| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134 |
- ### -------------------- LIBRARIES --------------------
- import datetime
- import time
- import json
- import yfinance as yf
- import pandas as pd
- import requests
- import config
- ### -------------------- FUNCTIONS --------------------
- # ---------------- #
- # HELPER FUNCTIONS #
- # ---------------- #
- # LOGGING / PRINTING TO TERMINAL
def logging(message="", logging_level="", new_line=True):
    """Print a colour-tagged log line if its level passes the configured threshold.

    ``config.logging_levels`` is treated as an ordered list in which a higher
    index means a less important level. A message is printed when the index of
    its level is not greater than the index of ``config.selected_logging_level``.

    Args:
        message: Text printed after the ``[level]`` tag.
        logging_level: Name of the message's level. Names that are not listed
            in ``config.logging_levels`` are never printed.
        new_line: ``True`` ends the output with a newline; ``False`` ends it
            with a single space and flushes, so a later call can continue on
            the same terminal line. Non-bool values fall back to ``True``.
    """
    levels = config.logging_levels
    # Index of the configured threshold within the ordered list of levels.
    threshold = levels.index(config.selected_logging_level)
    try:
        message_level = levels.index(logging_level)
    except ValueError:
        # Unknown level: rank it past the end of the list so it never prints.
        message_level = len(levels)

    # The previous check (`new_line is not bool`) compared the value with the
    # type object itself, was therefore always true and forced new_line to
    # True, which made the same-line output below unreachable.
    if not isinstance(new_line, bool):
        new_line = True

    if message_level > threshold:
        return

    # Colour escape code stored on config.log_colors under the level's name.
    log_color = getattr(config.log_colors, logging_level)
    log_text = f"{log_color}[{logging_level}] {config.log_colors.endcode}{message}"
    if new_line:
        print(log_text)
    else:
        print(log_text, end=" ", flush=True)
- # CALCULATE THE IRR
def calculate_irr(date_now, date_open, value_now, value_open):
    """Return the annualised growth factor between two valuations.

    The result is ``(value_now / value_open) ** (365 / days_held)``, i.e. a
    factor such as ``1.25`` rather than a percentage.

    Args:
        date_now: Valuation date (supports subtraction yielding ``.days``).
        date_open: Date the position was opened.
        value_now: Value on ``date_now``.
        value_open: Value on ``date_open``.

    Returns:
        The growth factor as a number. If the value ratio is negative, the
        factor is computed on its magnitude and returned with a negative sign.
        If the calculation fails (for example ``value_open`` is 0), a message
        is printed and ``True`` is returned instead of a number.
    """
    try:
        days_held = (date_now - date_open).days
        # On the day of the purchase the delta is 0 days; use 1 day instead so
        # that a factor can still be calculated.
        if days_held == 0:
            days_held = 1
        # Express the holding period as a fraction of a year to annualise.
        years_held = days_held / 365
        ratio = value_now / value_open

        # A negative ratio has no real fractional power: compute on the
        # magnitude and restore the sign afterwards.
        is_negative = ratio < 0
        magnitude = -ratio if is_negative else ratio
        irr = magnitude ** (1 / years_held)
        if is_negative:
            irr = -irr
    except Exception:
        # `except Exception` (not a bare except) keeps Ctrl+C / SystemExit
        # working while still covering every calculation error.
        print("[ERROR] Calculation of irr")
        return True
    return irr
- # GET THE DAY OF THE OLDEST TRADE
def get_date_open_oldest_trade(trades):
    """Return the earliest ``date_open`` of all trades, but never later than today.

    Args:
        trades: Mapping of trade id to a trade dict holding a ``date_open``.

    Returns:
        The smallest ``date_open`` found, or today's date when there are no
        trades or every trade opens in the future.
    """
    # Today is the first candidate, so it wins when no trade is older.
    candidates = [datetime.date.today()]
    candidates.extend(trades[trade_id]["date_open"] for trade_id in trades)
    return min(candidates)
- # CREATES LIST OF UNIQUE TICKERS
def filter_list_of_tickers(trades):
    """Collect every ticker that occurs in ``trades`` exactly once.

    Args:
        trades: Mapping of trade id to a trade dict holding a ``ticker``.

    Returns:
        List of tickers in order of first appearance, or ``False`` if reading
        the trades raised an exception (the error is logged).
    """
    unique_tickers = []
    try:
        for trade_id in trades:
            symbol = trades[trade_id]['ticker']
            # Skip tickers that were already collected from an earlier trade.
            if symbol in unique_tickers:
                continue
            unique_tickers.append(symbol)

        # Main logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(unique_tickers)} tickers found")
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
    return unique_tickers
- # CREATE LIST OF WEEKLY DATES
def create_list_filtered_dates(trades, days_seperation):
    """Build ISO date strings from today back to the oldest trade in fixed steps.

    Args:
        trades: Mapping of trade id to trade dict; only the oldest
            ``date_open`` is used, as the point where the list stops.
        days_seperation: Distance in days between two consecutive entries.
            Must be greater than 0.

    Returns:
        List of ISO formatted dates, oldest first and ending with today, or
        ``False`` if building the list failed (the error is logged).
    """
    stop_date = get_date_open_oldest_trade(trades)
    index_date = datetime.date.today()
    try:
        # A step of 0 or less never moves back in time, so the loop below
        # would never reach stop_date and would grow the list without bound.
        if days_seperation <= 0:
            raise ValueError("days_seperation must be greater than 0")
        step = datetime.timedelta(days=days_seperation)

        # Walk backwards from today (first entry is today, going back in time)
        list_filtered_dates = []
        while index_date >= stop_date:
            list_filtered_dates.append(index_date.isoformat())
            index_date = index_date - step
        # Reverse the list so that the first entry is the oldest one
        list_filtered_dates.reverse()

        # Main logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(list_filtered_dates)} dates in weekly list")
        return list_filtered_dates
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # FETCH THE LAST INDEX FROM A DICT
def fetch_last_key_from_dict(dict):
    """Return the last key of ``dict`` in iteration order.

    Raises:
        IndexError: If ``dict`` has no keys.
    """
    # Materialise the keys so the final one can be picked by index.
    return list(dict.keys())[-1]
- # ADD BENCHMARK-TICKER TO TICKER-DICT
def add_benchmark_ticker(tickers, ticker_benchmarkt):
    """Make sure the benchmark ticker is part of the ticker list.

    The list is modified in place and also returned.

    Args:
        tickers: List of ticker symbols.
        ticker_benchmarkt: Ticker symbol of the benchmark.

    Returns:
        The same ``tickers`` list, containing the benchmark ticker.
    """
    # Only append when missing: a benchmark that is also held as a regular
    # position would otherwise appear twice, and every consumer that iterates
    # the list would process that ticker twice.
    if ticker_benchmarkt not in tickers:
        tickers.append(ticker_benchmarkt)
    logging(logging_level="success")
    return tickers
- # CREATE BENCHMARK TRADES
def _benchmark_close_on_or_after(prices, start_date):
    """Return the 'Close' value of the first row dated on or after ``start_date``.

    Days without a row (e.g. non-trading days) are skipped by moving forward
    one day at a time. The search stops one day after today, because no price
    can exist beyond that; a ``LookupError`` is raised in that case.
    """
    last_candidate = datetime.date.today() + datetime.timedelta(days=1)
    index_date = start_date
    while index_date <= last_candidate:
        try:
            return prices.at[index_date, 'Close']
        except KeyError:
            # No row for this day: try the next calendar day.
            index_date = index_date + datetime.timedelta(days=1)
    raise LookupError(f"no benchmark course found on or after {start_date}")


def create_benchmark_trades(trades, yf_data):
    """Mirror every trade as a trade in the benchmark ticker.

    For each trade the same amount of money (``units * course_open``) is
    treated as invested in ``config.ticker_benchmark`` at the benchmark's
    'Close' on the trade's open date (or the next day with data). ``units`` and
    ``course_open`` are recalculated accordingly; for trades with a
    ``date_close`` other than 0, ``course_close`` is replaced the same way.

    Args:
        trades: Mapping of trade id to trade dict. The input is not modified.
        yf_data: Mapping of ticker to a price table that supports
            ``.at[date, 'Close']``; must contain the benchmark ticker.

    Returns:
        Dict keyed ``"benchmark1"``, ``"benchmark2"``, ... with the adjusted
        trade copies, or ``False`` if anything failed (the error is logged).
    """
    benchmark_trades = {}
    try:
        for position, trade_id in enumerate(trades, start=1):
            benchmark_id = "benchmark" + str(position)
            benchmark_prices = yf_data[config.ticker_benchmark]

            # Work on a copy: assigning the trade dict itself would overwrite
            # ticker, units and course of the caller's original trade.
            benchmark_trade = dict(trades[trade_id])
            benchmark_trade["ticker"] = config.ticker_benchmark

            # Amount of money that went into the original trade
            amount_invested = benchmark_trade["units"] * benchmark_trade["course_open"]

            # Buy the benchmark for the same amount at its course on the open date
            course_open_new = _benchmark_close_on_or_after(
                benchmark_prices, benchmark_trade["date_open"]
            )
            benchmark_trade["course_open"] = course_open_new
            benchmark_trade["units"] = amount_invested / course_open_new

            # Closed trades also sell the benchmark at its course on the close date
            if benchmark_trade["date_close"] != 0:
                benchmark_trade["course_close"] = _benchmark_close_on_or_after(
                    benchmark_prices, benchmark_trade["date_close"]
                )

            benchmark_trades[benchmark_id] = benchmark_trade

        # Logging
        logging(logging_level="success")
        return benchmark_trades
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # MERGE BENCHMARK HISTORY & TICKER-HISTORY
def merge_histories(history_per_ticker, benchmark_history):
    """Copy the benchmark's daily values into the per-ticker history.

    For every date in ``history_per_ticker`` a ``"benchmark"`` entry is added
    that holds the values of ``config.ticker_benchmark`` from
    ``benchmark_history`` for the same date. ``history_per_ticker`` is
    modified in place.

    Args:
        history_per_ticker: Mapping of date to per-ticker values.
        benchmark_history: Mapping of date to per-ticker values that contains
            the benchmark ticker.

    Returns:
        ``history_per_ticker`` when every date could be merged, otherwise
        ``False`` (the number of failed dates is logged as a warning).
    """
    benchmark_ticker = config.ticker_benchmark
    copied_fields = (
        "current_invested",
        "total_dividends",
        "current_value",
        "current_irr",
        "total_performanance",
    )
    error_count = 0

    # Merging data
    for index_date in history_per_ticker:
        try:
            merged = {}
            history_per_ticker[index_date]["benchmark"] = merged
            for field in copied_fields:
                merged[field] = benchmark_history[index_date][benchmark_ticker][field]
        except Exception:
            # Count the failure and continue with the next date. `Exception`
            # instead of a bare except keeps Ctrl+C / SystemExit working.
            error_count += 1

    # Debugging: dump the merged history to disk
    if config.selected_logging_level == "debug":
        data = json.dumps(history_per_ticker, indent=2)
        with open("history_per_ticker_with_benchmark.json", "w", encoding="utf-8") as f:
            f.write(data)

    # Main logging
    if error_count == 0:
        logging(logging_level="success")
        return history_per_ticker

    logging(logging_level="warning")
    logging(logging_level="warning", message=f"Error merging benchmark-history into ticker-history in {error_count} cases")
    return False
- # -------------------------- #
- # NETWORK DOWNLOAD FUNCTIONS #
- # -------------------------- #
- # NOTION FETCH PAGES
def notion_get_pages(db_id_trades, num_pages=None):
    """Download pages of a Notion database through the query endpoint.

    Args:
        db_id_trades: Id of the Notion database to query.
        num_pages: Maximum number of pages to download. ``None`` downloads
            all pages.

    Returns:
        List with the raw page objects from the ``"results"`` field of the
        replies. If anything goes wrong (network error, timeout, unexpected
        reply) ``True`` is returned instead of a list.
    """
    url = f"https://api.notion.com/v1/databases/{db_id_trades}/query"
    get_all = num_pages is None  # If num_pages is None, get all pages
    max_page_size = 100  # Largest page_size the Notion API accepts per request

    try:
        payload = {"page_size": max_page_size if get_all else min(num_pages, max_page_size)}
        result = []
        while True:
            # Without a timeout a stalled connection would block forever
            raw_response = requests.post(
                url, json=payload, headers=config.notion_headers, timeout=30
            )
            parsed_response = raw_response.json()
            result.extend(parsed_response["results"])

            if not parsed_response["has_more"]:
                break
            if not get_all:
                # Keep paging only until the requested number is reached
                remaining = num_pages - len(result)
                if remaining <= 0:
                    break
                payload["page_size"] = min(remaining, max_page_size)
            payload["start_cursor"] = parsed_response["next_cursor"]

        return result
    except Exception:
        return True  # Return True when there was an error
- # NOTION FETCH & FORMAT TRADES
def _notion_date(notion_page, property_name):
    """Return the start of a Notion date property as ``datetime.date``.

    Raises an exception if the property is missing, empty or not parseable.
    """
    start = notion_page["properties"][property_name]["date"]["start"]
    # The value may carry a time part after a "T" (ISO 8601 datetime);
    # only the calendar date is relevant here.
    date_part = start.split("T")[0]
    return datetime.date(*map(int, date_part.split('-')))


def fetch_format_notion_trades(db_id_trades):
    """Download all trades from Notion and convert them into plain dicts.

    A missing or empty "Close" date is expected (trade still open) and stored
    as ``0``. Trades where any other required value cannot be read are skipped
    and counted.

    Args:
        db_id_trades: Id of the Notion database holding the trades.

    Returns:
        Dict mapping the Notion page id to a trade dict with the keys
        ``ticker``, ``date_open``, ``date_close``, ``course_open``,
        ``course_close``, ``course_current``, ``irr``, ``units`` and
        ``dividends``; or ``False`` if the download failed.
    """
    # Download data from Notion; True signals a failed download
    data = notion_get_pages(db_id_trades)
    if data is True:
        logging(logging_level="error")
        logging(logging_level="error", message="Failed with error: could not download the pages from Notion")
        return False

    trades = {}
    format_errors = 0
    number_of_trades = 0

    for raw_page in data:
        # Count for statistics
        number_of_trades += 1

        # Each page is loaded as a dictionary
        notion_page = dict(raw_page)

        # Expected missing entry: an open trade has no close date yet
        try:
            date_close = _notion_date(notion_page, "Close")
        except Exception:
            date_close = 0

        # Unexpected missing entries: skip this trade
        try:
            properties = notion_page["properties"]
            trade = {
                'ticker': properties["Ticker"]["select"]["name"],
                'date_open': _notion_date(notion_page, "Open"),
                'date_close': date_close,
                'course_open': properties["Open (€)"]["number"],
                'course_close': properties["Close (€)"]["number"],
                'course_current': properties["Current (€)"]["number"],
                'irr': properties["IRR (%)"]["number"],
                'units': properties["Units"]["number"],
                'dividends': properties["Dividends (€)"]["number"],
            }
            # The Notion page id is the key of the trade
            trades[notion_page["id"]] = trade
        except Exception:
            format_errors += 1

    # Writing example file
    if config.selected_logging_level == "debug":
        with open("trades.json", "w") as f:
            f.write(str(trades))

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_trades} trades recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_trades} skiped...maybe due to missing values?")
    return trades
- # NOTION FETCH & FORMAT INVESTMENT OVERVIEW
def fetch_format_notion_investments(db_id_investments):
    """Download the investment overview from Notion and convert it into dicts.

    Pages where a required value cannot be read are skipped and counted.

    Args:
        db_id_investments: Id of the Notion database holding the overview.

    Returns:
        Dict mapping the Notion page id to a dict with the keys ``ticker``,
        ``total_dividends``, ``current_value``, ``current_irr`` and
        ``total_performanance``; or ``False`` if the download failed.
    """
    # Download data; True signals a failed download. (The previous version
    # stored this in an unused variable, so a failed download was reported as
    # success.)
    data = notion_get_pages(db_id_investments)
    if data is True:
        logging(logging_level="error")
        return False

    investments = {}
    format_errors = 0
    number_of_investments = 0

    for raw_page in data:
        # Count up for statistics
        number_of_investments += 1
        try:
            # Each page is loaded as a dictionary
            notion_page = dict(raw_page)
            properties = notion_page["properties"]
            # Build the entry completely before storing it, so that a page
            # with missing values is skipped instead of stored half-filled.
            investment = {
                "ticker": properties["Ticker"]["select"]["name"],
                "total_dividends": properties["Dividends (€)"]["number"],
                "current_value": properties["Current (€)"]["number"],
                "current_irr": properties["IRR (%)"]["number"],
                "total_performanance": properties["Performance (€)"]["number"],
            }
            # The Notion page id is the key of the entry
            investments[notion_page["id"]] = investment
        except Exception:
            # Skip this entry, if errors show up
            format_errors += 1

    # Main logging. A plain comparison is used here: the previous
    # `a == False & b == 0` was parsed as a chained comparison around
    # `False & b` and could never reach the warning branch.
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_investments} investments recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} investments out of {number_of_investments} skiped...maybe due to missing values?")
    return investments
- # YFINANCE FETCH & FORMAT DATA
def _format_yf_history(data):
    """Reduce a yfinance history table and index it by plain ``date`` objects.

    Removes the columns 'Open', 'Low', 'High' and 'Volume' (raises if one is
    missing) as well as 'Stock Splits' and 'Capital Gains' when present, then
    replaces the timestamp index by an index named 'Date' holding the date of
    each timestamp.
    """
    frame = pd.DataFrame(data)
    for column in ('Open', 'Low', 'High', 'Volume'):
        del frame[column]
    # These 2 columns are only deleted if they exist
    for column in ('Stock Splits', 'Capital Gains'):
        if column in frame.columns:
            del frame[column]

    # New index without the time part of the existing timestamp index
    dates = [timestamp.date() for timestamp in frame.index]
    frame.insert(1, 'Date', dates)
    frame.set_index('Date', inplace=True)
    return frame


def fetch_format_yf_data(tickers):
    """Download the full price history of every ticker from yfinance.

    Each ticker is downloaded separately, followed by a pause of
    ``config.api_cooldowm_time`` seconds. A dot is printed per ticker as a
    progress indicator.

    Args:
        tickers: Iterable of ticker symbols.

    Returns:
        Dict mapping ticker to its formatted price table. Tickers whose data
        could not be formatted are left out (logged as a warning). ``False``
        is returned if at least one download raised an exception.
    """
    yf_data = {}
    fetch_errors = 0
    format_errors = 0
    number_of_tickers = 0

    # Download data for each ticker separately
    for ticker in tickers:
        number_of_tickers += 1

        try:
            data = yf.Ticker(ticker).history(period="max", auto_adjust=False)
        except Exception:
            # Store the error for later logging; there is nothing to format
            fetch_errors += 1
        else:
            try:
                yf_data[ticker] = _format_yf_history(data)
            except Exception:
                # In case of an error the entry never gets added to yf_data
                format_errors += 1

        # Wait for the API to cool down
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Main logging. `and` instead of `&`: the previous `a == 0 & b == 0` was
    # parsed as a chained comparison around `0 & b`, so format errors were
    # never reported.
    print(" ", end="", flush=True)
    if fetch_errors == 0 and format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_tickers} tickers recieved and formated")
        return yf_data
    if fetch_errors == 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} tickers out of {number_of_tickers} skiped")
        return yf_data

    logging(logging_level="error")
    logging(logging_level="error", message=f"Download failed for {fetch_errors} out of {number_of_tickers} tickers")
    print("\n")
    return False
- # ------------------------ #
- # NETWORK UPLOAD FUNCTIONS #
- # ------------------------ #
- # NOTION UPDATE PAGES
def notion_update_page(page_id: str, data: dict):
    """Send a PATCH request that updates the properties of one Notion page.

    Args:
        page_id: Id of the Notion page to update.
        data: Property update object, sent as the ``"properties"`` field.

    Returns:
        The ``requests`` response object. The HTTP status is not checked here.

    Raises:
        requests.exceptions.RequestException: On network errors or when the
            server does not answer within the timeout.
    """
    url = f"https://api.notion.com/v1/pages/{page_id}"
    payload = {"properties": data}
    # Without a timeout a stalled connection would block the caller forever
    results = requests.patch(url, json=payload, headers=config.notion_headers, timeout=30)
    return results
- # UPDATE NOTION-TRADES-DATABASE
def push_notion_trades_update(trades):
    """Upload current course, IRR and dividends of every trade to Notion.

    One request is sent per trade, followed by a pause of
    ``config.api_cooldowm_time`` seconds. A dot is printed per trade as a
    progress indicator. Failed uploads are counted and reported in the log.

    Args:
        trades: Mapping of Notion page id to a trade dict with the keys
            ``irr``, ``course_current`` and ``dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in trades:
        number_of_uploads += 1
        try:
            trade = trades[notion_page_id]
            # The irr is stored in the format 1.2534
            # Notion needs the format 0.2534
            irr_notion = round(trade['irr'] - 1, 4)

            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {"number": trade['course_current']},
                "IRR (%)": {"number": irr_notion},
                "Dividends (€)": {"number": trade['dividends']},
            }

            # Update the properties of the corresponding notion-page.
            # A reply with an HTTP error status counts as a failed upload too.
            response = notion_update_page(notion_page_id, notion_update)
            response.raise_for_status()
        except Exception:
            error_count += 1

        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Logging: the detail message uses the same level as its headline
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion trades failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion trades failed for all {error_count} entries")
- # UPDATE NOTION-INVESTMENT-OVERVIEW
def push_notion_investment_update(investments):
    """Upload value, IRR, performance and dividends of every investment to Notion.

    One request is sent per investment, followed by a pause of
    ``config.api_cooldowm_time`` seconds. A dot is printed per investment as a
    progress indicator. Failed uploads are counted and reported in the log.

    Args:
        investments: Mapping of Notion page id to a dict with the keys
            ``current_irr``, ``current_value``, ``total_performanance`` and
            ``total_dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in investments:
        number_of_uploads += 1
        try:
            investment = investments[notion_page_id]
            # The irr is stored in the format 1.2534
            # Notion needs the format 0.2534
            irr_notion = round(investment['current_irr'] - 1, 4)

            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {"number": investment['current_value']},
                "IRR (%)": {"number": irr_notion},
                "Performance (€)": {"number": investment['total_performanance']},
                "Dividends (€)": {"number": investment['total_dividends']},
            }

            # Update the properties of the corresponding notion-page.
            # A reply with an HTTP error status counts as a failed upload too.
            response = notion_update_page(notion_page_id, notion_update)
            response.raise_for_status()
        except Exception:
            error_count += 1

        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Logging: the detail message uses the same level as its headline
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion investments failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion investments failed for all {error_count} entries")
- # TRMNL UPDATE DIAGRAMS
def push_trmnl_update_chart(trmnl_update_object, trmnl_url, trmnl_headers):
    """Send the chart data as JSON to TRMNL and log the outcome.

    Args:
        trmnl_update_object: JSON-serialisable object with the chart data.
        trmnl_url: URL the data is posted to.
        trmnl_headers: HTTP headers sent with the request.

    Nothing is returned and no exception leaves the function: every failure
    is written to the log.
    """
    try:
        # Compact separators: whitespace only adds to the payload size, and
        # the 422 branch below already points at oversized payloads.
        data = json.dumps(trmnl_update_object, separators=(",", ":"))
        # Without a timeout a stalled connection would block forever
        reply = requests.post(trmnl_url, data=data, headers=trmnl_headers, timeout=30)

        # Logging
        if reply.status_code == 200:
            logging(logging_level="success")
        elif reply.status_code == 429:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Exceeded TRMNL's API rate limits")
            logging(logging_level="warning", message="Waiting some time should work")
        elif reply.status_code == 422:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Upload successful, but data cannot be displayed correctly")
            logging(logging_level="warning", message="The payload is probably to large in size")
        else:
            logging(logging_level="error")
            logging(logging_level="error", message=f"Failed pushing data to TRMNL with server reply code: {reply.status_code}")
            # Log the reply body; the response object itself only prints its status
            logging(logging_level="debug", message=f"Complete server reply message: {reply.text}")
    except Exception as e:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed pushing data to TRMNL with error code: {e}")
- # ----------------------------- #
- # HISTORY CALCULATION FUNCTIONS #
- # ----------------------------- #
- # CALC HISTORY PER TRADE
def calc_history_per_trade(trades, yf_data):
    """Calculate the daily values of every trade from the oldest open date until today.

    For each trade and each calendar day the course is read from ``yf_data``.
    Days without data reuse the course of the previous day (0.0 if there is
    none yet). On the close date the course is replaced by the trade's
    ``course_close``. Days outside of the trade's open/close range are stored
    with all values set to 0 except ``current_course``.

    Args:
        trades: Mapping of trade id to trade dict with the keys ``ticker``,
            ``date_open``, ``date_close`` (0 for open trades), ``units``,
            ``course_open`` and ``course_close``.
        yf_data: Mapping of ticker to a price table that supports
            ``.at[date, 'Close']`` and ``.at[date, 'Dividends']``.

    Returns:
        Dict mapping ISO date -> trade id -> dict with the keys
        ``current_amount``, ``current_invested``, ``total_dividends``,
        ``current_value``, ``current_irr``, ``current_course`` and
        ``total_performanance``; or ``False`` if the calculation failed.
    """
    history_per_trade = {}
    date_open_oldest_trade = get_date_open_oldest_trade(trades)

    # Logging & statistics
    missing_day_entrys = 0
    days_formated = 0
    number_of_trades = 0

    # As this history is so important, the function fails as a whole if errors arise
    try:
        # The day after today is the exclusive end of the day loop and also
        # serves as the close date of trades that are still open.
        day_after_today = datetime.date.today() + datetime.timedelta(days=1)

        # ------------------ LOOP OVER ALL TRADES
        for trade_id in trades:
            number_of_trades += 1
            trade = trades[trade_id]

            # ------------------ PREPARE FOR THE LOOP OVER ALL DAYS
            # Every trade starts again at the oldest trade day
            index_date = date_open_oldest_trade
            # Course of the previous day; 0 in case the very first day has no data
            previous_course = 0.0
            # Dividends are accumulated per trade. Without this reset a trade
            # that opens on the oldest date would start with the dividends
            # collected by the trade processed before it.
            total_dividends = 0

            if trade["date_close"] == 0:
                date_close = day_after_today
            else:
                date_close = trade["date_close"]
            date_open = trade["date_open"]
            # Keep ticker for connecting performance later
            ticker = trade['ticker']

            # ------------------ DETERMINE THE COURSE PER DAY
            while index_date < day_after_today:
                days_formated += 1

                # Fetch course and eventual dividends for the day from yf_data
                try:
                    current_course = yf_data[ticker].at[index_date, 'Close']
                    current_dividends_per_ticker = yf_data[ticker].at[index_date, 'Dividends']
                except Exception:
                    # Missing yf-data (e.g. weekends): reuse the previous course
                    current_course = previous_course
                    current_dividends_per_ticker = 0.0
                    missing_day_entrys += 1

                # On the day the trade was closed the sell value is the course
                if date_close == index_date:
                    current_course = trade['course_close']

                # Pass the course on, also across several days without data
                previous_course = current_course

                # ------------------ CALCULATE PERFORMANCE IF REQUIRED
                if date_open <= index_date <= date_close:
                    current_amount = trade['units']
                    current_invested = current_amount * trade['course_open']
                    total_dividends = total_dividends + current_amount * current_dividends_per_ticker
                    current_value = current_amount * current_course
                    current_value_with_dividends = current_value + total_dividends
                    current_irr = calculate_irr(index_date, date_open, current_value_with_dividends, current_invested)
                    total_performanance = current_value_with_dividends - current_invested
                    if current_value_with_dividends == 0:
                        print("0-value Error with ticker: {}".format(ticker))
                else:
                    # Write 0, if the trade is not relevant for this day
                    current_amount = 0
                    current_invested = 0.00
                    total_dividends = 0.00
                    current_value = 0.00
                    current_irr = 0.00
                    total_performanance = 0.0

                # ------------------ STORE RESULTS
                day_values = {
                    'current_amount': current_amount,
                    'current_invested': current_invested,
                    'total_dividends': total_dividends,
                    'current_value': current_value,
                    'current_irr': current_irr,
                    'current_course': current_course,
                    'total_performanance': total_performanance,
                }
                # Add the values to the entry of this date (created on first use)
                history_per_trade.setdefault(index_date.isoformat(), {})[trade_id] = day_values

                # ------------------ NEXT ITERATION
                index_date = index_date + datetime.timedelta(days=1)

        # ------------------ LOGGING & DEBUGGING
        # Debug: write the history to disk
        if config.selected_logging_level == "debug":
            data = json.dumps(history_per_trade, indent=2)
            with open("history_per_trade.json", "w") as f:
                f.write(data)

        if missing_day_entrys == 0:
            logging(logging_level="success")
            logging(logging_level="info", message=f"created a history with {days_formated} across all {number_of_trades} tickers o_O")
        else:
            logging(logging_level="warning")
            logging(logging_level="warning", message=f"No yf-data available in {missing_day_entrys} cases accross all {number_of_trades} tickers")
            logging(logging_level="warning", message="Probably reason is non-trading-days eg. weekends")
            logging(logging_level="warning", message="Used values from previous trade-day instead")

        return history_per_trade

    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
- # CALC THE HISTORY PER TICKER & OVERALL
- def calc_history_per_ticker(history_per_trade, tickers, trades):
-
- # ------------------ CREATE JSON OBJECT
- # Create the json-dict
- history_per_ticker = {}
- # Logging & statistics
- missing_day_entrys = 0
- days_formated = 0
- # As this history is so important, it is okay if this functions fails in total if errors araise
- try:
- # Loop over each date entry in the history
- for date_entry in history_per_trade:
-
- # Statistics
- days_formated = days_formated +1
- # Create a dict to store the results per day and ticker
- dict_daily = {}
- for ticker in tickers:
- dict_daily[ticker] = {}
- dict_daily[ticker]["current_invested"] = 0
- dict_daily[ticker]["total_dividends"] = 0
- dict_daily[ticker]["current_value"] = 0
- dict_daily[ticker]["current_irr"] = 0
- dict_daily[ticker]["current_irr"] = 0
- dict_daily[ticker]["total_performanance"] = 0
- dict_daily[ticker]["current_amount"] = 0 # Added only for ticker entries, not for the "total" value
- dict_daily[ticker]["current_course"] = 0 # Added only for ticker entries, not for the "total" value
- dict_daily["total"] = {}
- dict_daily["total"]["current_invested"] = 0
- dict_daily["total"]["total_dividends"] = 0
- dict_daily["total"]["current_value"] = 0
- dict_daily["total"]["current_irr"] = 0
- dict_daily["total"]["total_performanance"] = 0
- # Loop over each trade-entry for that day
- for trade_id in history_per_trade[date_entry]:
- # Extract data from the history_per_trade
- trade_amount = history_per_trade[date_entry][trade_id]['current_amount']
- trade_invested = history_per_trade[date_entry][trade_id]['current_invested']
- trade_dividends = history_per_trade[date_entry][trade_id]['total_dividends']
- trade_value = history_per_trade[date_entry][trade_id]['current_value']
- trade_irr = history_per_trade[date_entry][trade_id]['current_irr']
- trade_course = history_per_trade[date_entry][trade_id]['current_course']
- trade_performanance = history_per_trade[date_entry][trade_id]['total_performanance']
- # Lookup the ticker by the trade-id
- ticker = trades[trade_id]["ticker"]
- # Extract data from the history_per_ticker
- ticker_amount = dict_daily[ticker]['current_amount']
- ticker_invested = dict_daily[ticker]['current_invested']
- ticker_dividends = dict_daily[ticker]['total_dividends']
- ticker_value = dict_daily[ticker]['current_value']
- ticker_irr = dict_daily[ticker]['current_irr']
- ticker_performanance = dict_daily[ticker]['total_performanance']
- # Overwrite the values in the history_per_ticker
- dict_daily[ticker]['current_amount'] = ticker_amount + trade_amount # Simple addition works
- dict_daily[ticker]['current_invested'] = ticker_invested + trade_invested
- dict_daily[ticker]['total_dividends'] = ticker_dividends + trade_dividends
- dict_daily[ticker]['current_value'] = ticker_value + trade_value
- dict_daily[ticker]['total_performanance'] = ticker_performanance + trade_performanance
- dict_daily[ticker]['current_course'] = trade_course # Simple overwrite is fine, as the course is the same for all trades
- if ticker_invested == 0 and trade_invested == 0:
- dict_daily[ticker]['current_irr'] = 0
- # Catch 0 values
- else:
- dict_daily[ticker]['current_irr'] = (ticker_irr * ticker_invested + trade_irr * trade_invested) / (ticker_invested + trade_invested)
- # --> IRR is adjusted based on the trade values. This way a trade of 25% of the initial trade volume has only a 25% influence on the irr
- # Calculate the "total" entry after finishing with all the trades
- for ticker in tickers:
- # Same logic as above, but shortended code
- dict_daily["total"]['total_dividends'] = dict_daily["total"]['total_dividends'] + dict_daily[ticker]['total_dividends']
- dict_daily["total"]['current_value'] = dict_daily["total"]['current_value'] + dict_daily[ticker]['current_value']
- dict_daily["total"]['total_performanance'] = dict_daily["total"]['total_performanance'] + dict_daily[ticker]['total_performanance']
- # Extracting the values before rewriting them, to preserve them for the IRR calculation
- total_invested = dict_daily["total"]['current_invested']
- ticker_invested = dict_daily[ticker]['current_invested']
- dict_daily["total"]['current_invested'] = total_invested + ticker_invested
-
- # Extracting the values before rewriting them, to preserve them for the IRR calculation
- if ticker_invested == 0 and total_invested == 0:
- dict_daily["total"]['current_irr'] = 0
- else:
- total_irr = dict_daily["total"]['current_irr']
- ticker_irr = dict_daily[ticker]['current_irr']
- dict_daily["total"]['current_irr'] = (total_irr * total_invested + ticker_irr * ticker_invested) / (total_invested + ticker_invested)
-
- # Finally, write the results for this day-entry to the history_per_ticker
- history_per_ticker[date_entry] = dict_daily
- # ------------------ LOGGING & DEBUGING
- # Debugging
- if config.selected_logging_level == "debug":
- data = json.dumps(history_per_ticker, indent=2) # Converts a python-dictionary into a json
- with open("history_per_ticker.json", "w") as f:
- f.write(data)
- # Success Logging
- logging(logging_level="success")
- logging(logging_level="info", message=f"created a history with {days_formated} days formated o_O")
- return history_per_ticker
-
- # Error Logging
- except Exception as error_message:
- logging(logging_level="error")
- logging(logging_level="error", message=f"Failed with error message: {error_message}")
- return False
- # --------------------------- #
- # HISTORY SELECTION FUNCTIONS #
- # --------------------------- #
- # FILTER ANY HISTORY OBJECT TO SELECTED DATES
def filter_history_by_list(history, dates_list):
    """Reduce a history object to the date-entries named in ``dates_list``.

    Args:
        history: Mapping keyed by date; it is walked in its own key order.
        dates_list: Container of date keys that should be kept.

    Returns:
        A new dict holding the matching entries (the values are the very
        same objects as in ``history``, not copies), or ``False`` when
        anything raised while filtering.
    """
    try:
        # Keep a date-entry only when the filter-list asks for it
        selection = {
            day: history[day]
            for day in history
            if day in dates_list
        }
        # Main Logging
        logging(logging_level="success")
        return selection
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # SELECT CURRENT VALUES PER TRADE
def select_current_value_per_trade(trades, history_per_trade):
    """Copy the most relevant history figures onto every trade.

    For a trade that is still open (``date_close == 0``) the history entry
    of today is used; for a closed trade the entry of its close date
    (``date_close.isoformat()``) is used. The values are stored on the
    trade under the keys ``"course_current"``, ``"irr"`` and
    ``"dividends"``.

    A trade whose history entry cannot be read is left untouched and
    counted; the count is reported with a warning at the end.

    Args:
        trades: Dict keyed by trade id; each value is a dict with at least
            a ``"date_close"`` entry. Mutated in place.
        history_per_trade: Nested mapping ``iso-date -> trade id -> values``
            providing ``'current_course'``, ``'current_irr'`` and
            ``'total_dividends'``.

    Returns:
        The same ``trades`` object that was passed in.
    """
    format_errors = 0

    # Resolve "today" once, so all open trades read the same history day
    # even if the loop happens to run across midnight.
    today_iso = datetime.date.today().isoformat()

    # Loop over all trades
    for trade_id in trades:
        try:
            trade = trades[trade_id]

            # Determine which day to fetch, based on whether the trade was closed already
            date_closed = trade["date_close"]
            if date_closed == 0:
                # Trade still open: use performance data from today
                index_date_iso = today_iso
            else:
                # Trade closed: use performance data from the close-date
                index_date_iso = date_closed.isoformat()

            # Read everything first, so a missing key cannot leave the
            # trade half-updated.
            history_entry = history_per_trade[index_date_iso][trade_id]
            course_current = history_entry['current_course']
            irr = history_entry['current_irr']
            dividends = history_entry['total_dividends']
        except Exception:
            # Best-effort per trade; unlike a bare ``except`` this lets
            # KeyboardInterrupt / SystemExit propagate.
            format_errors = format_errors + 1
            continue

        trade["course_current"] = course_current
        trade["irr"] = irr
        trade["dividends"] = dividends

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per trade in {format_errors} cases")
    return trades
- # SELECT CURRENT VALUES PER TICKER
def select_current_value_per_ticker(investments, history_per_ticker):
    """Copy today's per-ticker history figures onto every investment.

    For each investment the ticker is looked up and the values
    ``'total_dividends'``, ``'current_value'``, ``'current_irr'`` and
    ``'total_performanance'`` of today's history entry are stored on the
    investment under the same keys.

    An investment whose history entry cannot be read is left untouched and
    counted; the count is reported with a warning at the end.

    Args:
        investments: Dict keyed by investment id; each value is a dict with
            at least a ``"ticker"`` entry. Mutated in place.
        history_per_ticker: Nested mapping ``iso-date -> ticker -> values``.

    Returns:
        The same ``investments`` object that was passed in.
    """
    copied_keys = ('total_dividends', 'current_value', 'current_irr', 'total_performanance')
    format_errors = 0

    # Generate the iso-date of today once; it is the index for every
    # investment and must not change if the loop runs across midnight.
    index_date_iso = datetime.date.today().isoformat()

    # Loop over all investments
    for investment_id in investments:
        try:
            investment = investments[investment_id]
            # Get the ticker corresponding to the investment
            ticker = investment["ticker"]
            # Collect the latest data first, so a missing key cannot leave
            # the investment half-updated.
            history_entry = history_per_ticker[index_date_iso][ticker]
            latest_values = {key: history_entry[key] for key in copied_keys}
        except Exception:
            # Best-effort per investment; unlike a bare ``except`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            format_errors = format_errors + 1
            continue

        for key in copied_keys:
            investment[key] = latest_values[key]

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per ticker in {format_errors} cases")
    return investments
- # TRMNL CREATE IRR-UPDATE
def prep_trmnl_chart_udpate(history_to_show, series_to_show_1="total", data_to_show_1="current_value", series_to_show_2="benchmark", data_to_show_2="current_value"):
    """Build the TRMNL update object (big numbers plus two chart series).

    The big numbers are taken from the ``"total"`` series of the latest
    history day: current value and total performance rounded to 0 digits,
    and the IRR converted from a factor to percent (``(irr - 1) * 100``)
    rounded to 2 digits. All three are stored as strings.

    Each chart is a dict with ``"data"`` (a list of ``[iso-datetime, value]``
    pairs, one per history day, values rounded to 2 digits) and ``"name"``.

    Args:
        history_to_show: Nested mapping ``iso-date -> series -> values``.
            Must contain a ``"total"`` series on the latest day.
        series_to_show_1: Series key for the first chart. An unknown key
            falls back to ``"total"`` with a warning.
        data_to_show_1: Value key for the first chart. An unknown key falls
            back to ``"current_value"`` with a warning.
        series_to_show_2: Series key for the second chart; defaults to
            ``"benchmark"`` (the original default was misspelled and could
            therefore never match). Same fallback as ``series_to_show_1``.
        data_to_show_2: Value key for the second chart. Same fallback as
            ``data_to_show_1``.

    Returns:
        ``{"merge_variables": {"big_numbers": ..., "charts": [...]}}`` on
        success, ``False`` if anything raised along the way.
    """
    try:
        # Fetch the latest date entry from the history
        # NOTE(review): presumably the helper returns the most recent date key - confirm
        index_date_iso = fetch_last_key_from_dict(history_to_show)
        latest_entry = history_to_show[index_date_iso]

        # Select latest data from history for the big-numbers
        current_value = latest_entry["total"]["current_value"]
        total_performanance = latest_entry["total"]["total_performanance"]
        current_irr = (latest_entry["total"]["current_irr"] - 1) * 100

        # Round the numbers
        dict_big_numbers = {
            "current_value": str(round(current_value, 0)),
            "total_performanance": str(round(total_performanance, 0)),
            "current_irr": str(round(current_irr, 2)),
        }

        # Catching false inputs for the series to show (validated against the latest day)
        if series_to_show_1 not in latest_entry:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_1 = "total"
        if series_to_show_2 not in latest_entry:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_2 = "total"

        # Catching false inputs for the data to show
        if data_to_show_1 not in latest_entry[series_to_show_1]:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current_value' as chart data, as the input was not valid")
            data_to_show_1 = "current_value"
        if data_to_show_2 not in latest_entry[series_to_show_2]:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current_value' as chart data, as the input was not valid")
            data_to_show_2 = "current_value"

        # Format the chart data into [timestamp, value] pairs
        points_1 = []
        points_2 = []
        for date, day_entry in history_to_show.items():
            # Extract both values before converting, so a day lacking either
            # series aborts the whole update
            raw_value_1 = day_entry[series_to_show_1][data_to_show_1]
            raw_value_2 = day_entry[series_to_show_2][data_to_show_2]

            # Extend the date by a timestamp: ISO date -> midnight datetime -> ISO string
            json_date = datetime.datetime.combine(
                datetime.date.fromisoformat(date),
                datetime.time.min,
            ).isoformat()

            points_1.append([json_date, _trmnl_chart_value(raw_value_1, data_to_show_1)])
            points_2.append([json_date, _trmnl_chart_value(raw_value_2, data_to_show_2)])

        # "data" is inserted before "name" to keep the emitted key order
        charts_data = [
            {"data": points_1, "name": _trmnl_chart_name(series_to_show_1, data_to_show_1)},
            {"data": points_2, "name": _trmnl_chart_name(series_to_show_2, data_to_show_2)},
        ]

        # Construct the trmnl_object
        trmnl_update_object = {
            "merge_variables": {
                "big_numbers": dict_big_numbers,
                "charts": charts_data,
            }
        }

        # Debugging: dump the object next to the script
        if config.selected_logging_level == "debug":
            data = json.dumps(trmnl_update_object, indent=2)  # Converts a python-dictionary into a json
            with open("trmnl_update_object.json", "w") as f:
                f.write(data)

        # Main Logging
        logging(logging_level="success")
        return trmnl_update_object
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False


def _trmnl_chart_value(value, data_key):
    """Convert an IRR factor to percent if needed and round to 2 digits."""
    if data_key == "current_irr":
        value = (value - 1) * 100
    return round(value, 2)


def _trmnl_chart_name(series_key, data_key):
    """Build the human-readable chart name '<Data title> <Series title>'."""
    # Nicer series titles
    if series_key == "total":
        series_title = "Portfolio"
    elif series_key == "benchmark":
        series_title = "Benchmark: " + config.ticker_benchmark
    else:
        series_title = series_key
    # Nicer data titles; the IRR replacement is a no-op when "irr" does not occur
    data_title = data_key.replace("_", " ").capitalize().replace("irr", "IRR")
    return data_title + " " + series_title
|