### --------------------------------------------- ---------------------- --------------------------------------------- ### ### --------------------------------------------- PROGRAMM CONFIGUARTION --------------------------------------------- ### ### --------------------------------------------- ---------------------- --------------------------------------------- ### # Program Functionality Switch update_notion = True update_TRMNL = True calculate_benchmark = True log_coloring = False # Program Functionality Configuration programm_cooldown_time = 15 # Programm cooldown timer in minutes api_cooldowm_time = 0.1 # API cooldown timer in minutes trmnl_granularity = 80 # Days in between two data points in the TRMNL chart ticker_benchmark = "VGWL.DE" # Ticker to benchmark the trades against # Programm Execution Configuration selected_logging_level = "warning" # must be one from the list below logging_levels = ("none", "error", "success", "warning", "info", "debug") # ordered by amount of logs class log_colors: error = '\033[91m' warning = '\033[93m' success = '\033[92m' info = '\033[90m' debug = '\033[4m' endcode = '\033[0m' ### --------- API CONFIGURATION # NOTION notion_token = "secret_b7PiPL2FqC9QEikqkAEWOht7LmzPMIJMWTzUPWwbw4H" notion_headers = { "Authorization": "Bearer " + notion_token, "Content-Type": "application/json", "Notion-Version": "2022-02-22" } notion_db_id_trades = "95f7a2b697a249d4892d60d855d31bda" notion_db_id_investments = "2ba10a5f51bd8160ab9ee982bbef8cc3" notion_db_id_performance = "1c010a5f51bd806f90d8e76a1286cfd4" # TRMNL trmnl_headers = {"Content-Type": "application/json"} trmnl_url_chart_1 = "https://usetrmnl.com/api/custom_plugins/334ea2ed-1f20-459a-bea5-dca2c8cf7714" trmnl_url_chart_2 = "https://usetrmnl.com/api/custom_plugins/72950759-38df-49eb-99fb-b3e2e67c385e" trmnl_url_chart_3 = "https://usetrmnl.com/api/custom_plugins/a975543a-51dc-4793-b7fa-d6a101dc4025" ### -------------------- LIBARIES import datetime import time import json import yfinance as yf import pandas as pd import requests ### ---------------------------------------------------- --------- ---------------------------------------------------- ### ### ---------------------------------------------------- FUNCTIONS ---------------------------------------------------- ### ### ---------------------------------------------------- --------- ---------------------------------------------------- ### # ---------------- # # HELPER FUNCTIONS # # ---------------- # # LOGGING / PRINTING TO TERMINAL def logging(message = "", logging_level = "", new_line = True): # Take the selected logging level in the config file # Look this up in the list of all available logging levels in the config file # Return the index number config_logging_level = logging_levels.index(selected_logging_level) try: # Take the logging level of the text to print # Look this up in the list of all available logging levels in the config file # Return the index number message_logging_level = logging_levels.index(logging_level) except: # Fallback to the least important logging level # Solved by checking the lenght of the available logging levels message_logging_level = len(logging_levels) # Check for false new_line entries if new_line is not bool: new_line = True # Check if the warning should be printed if message_logging_level <= config_logging_level: # Check, if colored logs are switched on if log_coloring == True: # Geting the log color log_color = getattr(log_colors, logging_level) # Construct the logging-text incl. color log_text = str(log_color + "[" + logging_level + "] " + log_colors.endcode + message) else: # Construct the logging-text incl. color log_text = str("[" + logging_level + "] " + message) # Check if the warning should end with a new-line # Printing the text if new_line == True: print(log_text) else: print(log_text, end=" ", flush=True) # CALCULATE THE IRR def calculate_irr(date_now, date_open, value_now, value_open): error = False irr = 0.0 try: # Count the number in days a = date_now - date_open a = a.days # Am Tag des Kaufs selbst, liegt das Delta in Tagen bei 0 # Um dennoch einen IRR kalkulieren zu können, wird das Delta auf 1 gsetzt if a == 0: a = 1 a = a / 365 # Umrechnung auf Jahresanteil, um auch den Jahreszinssatz zu bekommen b = value_now / value_open # Catch negative IRRs if b < 0: b = b * (-1) irr = b**(1/a) # matematisch identisch zur b-ten Wurzel von a irr = irr * (-1) else: irr = b**(1/a) # matematisch identisch zur b-ten Wurzel von a except: error = True # Return data if successful if error == True: print("[ERROR] Calculation of irr") return error else: return irr # GET THE DAY OF THE OLDEST TRADE def get_date_open_oldest_trade(trades): # Identify the open date for the oldest trade date_open_oldest_trade = datetime.date.today() for i in trades: if trades[i]["date_open"] < date_open_oldest_trade: date_open_oldest_trade = trades[i]["date_open"] return date_open_oldest_trade # CREATES LIST OF UNIQUE TICKERS def filter_list_of_tickers(trades): tickers = [] try: for i in trades: # Fetch ticker belonging to trade ticker = trades[i]['ticker'] # Add ticker to list, if not already present if ticker not in tickers: tickers.append(ticker) # Main Logging logging(logging_level="success") logging(logging_level="info", message=f"{len(tickers)} tickers found") return tickers except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False # CREATE LIST OF WEEKLY DATES def create_list_filtered_dates(trades, days_seperation): stop_date = get_date_open_oldest_trade(trades) index_date = datetime.date.today() try: # Create reversed list (1st entry is today going back in time) list_filtered_dates = [] while index_date >= stop_date: list_filtered_dates.append(index_date.isoformat()) index_date = index_date - datetime.timedelta(days=days_seperation) # Reverse the list, so that the frist entry is the oldest one list_filtered_dates.reverse() # Main Logging logging(logging_level="success") logging(logging_level="info", message=f"{len(list_filtered_dates)} dates in weekly list") return list_filtered_dates except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False # FETCH THE LAST INDEX FROM A DICT def fetch_last_key_from_dict(dict): key_list = list(dict.keys()) # Extract the keys and convert them to a list last_key = key_list[-1] # select the last entry from the list as it is the most current entry return last_key # ADD BENCHMARK-TICKER TO TICKER-DICT def add_benchmark_ticker(tickers, ticker_benchmarkt): tickers.append(ticker_benchmarkt) logging(logging_level="success") return tickers # CREATE BENCHMARK TRADES def create_benchmark_trades(trades, yf_data): # Prepertion benchmark_trades = {} i = 0 # Creating benchmark trades try: for trade_id in trades: # Benchmark-id i = i+1 benchmark_id = "benchmark" + str(i) # Copy raw trades benchmark_trades[benchmark_id] = trades[trade_id] benchmark_trades[benchmark_id]["ticker"] = ticker_benchmark # Calculate amount invested amount_invested = benchmark_trades[benchmark_id]["units"] * benchmark_trades[benchmark_id]["course_open"] # Change course-open for benchmark-ticker performance calculation success = False index_date = benchmark_trades[benchmark_id]["date_open"] while success == False: try: course_open_new = yf_data[ticker_benchmark].at[index_date, 'Close'] success = True except: index_date = index_date + datetime.timedelta(days=1) benchmark_trades[benchmark_id]["course_open"] = course_open_new # type: ignore # Change amount for benchmark-ticker performance calculation benchmark_trades[benchmark_id]["units"] = amount_invested / course_open_new # type: ignore # Change course-open for benchmark-ticker performance calculation, if relevant if trades[trade_id]["date_close"] != 0: success = False index_date = benchmark_trades[benchmark_id]["date_close"] while success == False: try: course_close_new = yf_data[ticker_benchmark].at[index_date, 'Close'] success = True except: index_date = index_date + datetime.timedelta(days=1) benchmark_trades[benchmark_id]["course_close"] = course_close_new # type: ignore # Logging logging(logging_level="success") return benchmark_trades except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False # MERGE BENCHMARK HISTORY & TICKER-HISTROY def merge_histories(history_per_ticker, benchmark_history): # Preperation benchmark_ticker = ticker_benchmark error_count = 0 # Merging Data for index_date in history_per_ticker: try: history_per_ticker[index_date]["benchmark"] = {} history_per_ticker[index_date]["benchmark"]["current_invested"] = benchmark_history[index_date][benchmark_ticker]["current_invested"] history_per_ticker[index_date]["benchmark"]["total_dividends"] = benchmark_history[index_date][benchmark_ticker]["total_dividends"] history_per_ticker[index_date]["benchmark"]["current_value"] = benchmark_history[index_date][benchmark_ticker]["current_value"] history_per_ticker[index_date]["benchmark"]["current_irr"] = benchmark_history[index_date][benchmark_ticker]["current_irr"] history_per_ticker[index_date]["benchmark"]["total_performanance"] = benchmark_history[index_date][benchmark_ticker]["total_performanance"] except: error_count = error_count +1 # Debugging if selected_logging_level == "debug": data = json.dumps(history_per_ticker, indent=2) # Converts a python-dictionary into a json with open("history_per_ticker_with_benchmark.json", "w") as f: f.write(data) # Main Logging if error_count == 0: logging(logging_level="success") return history_per_ticker else: logging(logging_level="warning") logging(logging_level="warning", message=f"Error merging benchmark-history into ticker-history in {error_count} cases") return False # -------------------------- # # NETWORK DOWNLOAD FUNCTIONS # # -------------------------- # # NOTION FETCH PAGES def notion_get_pages(db_id_trades, num_pages=None): try: # ------------------ FETCH THE FIRST 100 PAGES FROM A DB # Prepare Request url = f"https://api.notion.com/v1/databases/{db_id_trades}/query" get_all = num_pages is None # If num_pages is None, get all pages, otherwise just the defined number. page_size = 100 if get_all else num_pages payload = {"page_size": page_size} # Make Request raw_response = requests.post(url, json=payload, headers=notion_headers) # Process Reply parsed_response = raw_response.json() result = parsed_response["results"] # ------------------ FETCH 100 MORE PAGES AS OFTEN AS REQUIRED while parsed_response["has_more"] and get_all: # Prepare Request payload = {"page_size": page_size, "start_cursor": parsed_response["next_cursor"]} url = f"https://api.notion.com/v1/databases/{db_id_trades}/query" # Make Request raw_response = requests.post(url, json=payload, headers=notion_headers) # Process Reply parsed_response = raw_response.json() result.extend(parsed_response["results"]) # Logging return result except Exception: return True # Return True when there was an error # NOTION FETCH & FORMAT TRADES def fetch_format_notion_trades(db_id_trades): trades = {} fetch_error = False format_errors = 0 number_of_trades = 0 error_message = "" # Download data from notion data = notion_get_pages(db_id_trades) # Check, if cuccessfull if data is True: fetch_error = True else: # Format the recieved data for i in data: # Count for stratistics number_of_trades = number_of_trades + 1 # Each page is loaded as a dictionary notion_page = dict(i) # Handling desired missing entries try: date_close = notion_page["properties"]["Close"]["date"] date_close = date_close["start"] date_close = datetime.date(*map(int, date_close.split('-'))) except: date_close = 0 # Handeling non-desired missing entries (by skipping this trade) try: # Try extracting values trade = {} # Format date-open date_open = notion_page["properties"]["Open"]["date"] date_open = date_open["start"] date_open = datetime.date(*map(int, date_open.split('-'))) # Combine data into json structure trade = { 'ticker' : notion_page["properties"]["Ticker"]["select"]["name"], 'date_open' : date_open, 'date_close' : date_close, 'course_open' : notion_page["properties"]["Open (€)"]["number"], 'course_close' : notion_page["properties"]["Close (€)"]["number"], 'course_current' : notion_page["properties"]["Current (€)"]["number"], 'irr' : notion_page["properties"]["IRR (%)"]["number"], 'units' : notion_page["properties"]["Units"]["number"], 'dividends' : notion_page["properties"]["Dividends (€)"]["number"] } # Save values notion_page_id = notion_page["id"] # Use as key for the dictionary trades[notion_page_id] = trade except Exception as e: format_errors = format_errors + 1 error_message = e # Logging if fetch_error == True: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False else: # Writing Example File if selected_logging_level == "debug": with open("trades.json", "w") as f: f.write(str(trades)) # Logging if format_errors == 0: logging(logging_level="success") logging(logging_level="info", message=f"{number_of_trades} trades recieved and formated") return trades else: logging(logging_level="warning") logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_trades} skiped...maybe due to missing values?") return trades # NOTION FETCH & FORMAT INVESTMENT OVERVIEW def fetch_format_notion_investments(db_id_investments): investments = {} fetch_error = False format_errors = 0 number_of_investments = 0 # Download data & check for success data = notion_get_pages(db_id_investments) if data is True: error = True else: # Format recieved data for i in data: # Count up for statistics number_of_investments = number_of_investments + 1 try: # Each page is loaded as a dictionary notion_page = dict(i) # Extract values notion_page_id = notion_page["id"] # Use as key for the dictionary investments[notion_page_id] = {} investments[notion_page_id]["ticker"] = notion_page["properties"]["Ticker"]["select"]["name"] investments[notion_page_id]["total_dividends"] = notion_page["properties"]["Dividends (€)"]["number"] investments[notion_page_id]["current_value"] = notion_page["properties"]["Current (€)"]["number"] investments[notion_page_id]["current_irr"] = notion_page["properties"]["IRR (%)"]["number"] investments[notion_page_id]["total_performanance"] = notion_page["properties"]["Performance (€)"]["number"] # Skip this entry, if errors show up except: format_errors = format_errors + 1 # Main Logging if fetch_error == False & format_errors == 0: logging(logging_level="success") logging(logging_level="info", message=f"{number_of_investments} investments recieved and formated") return investments elif fetch_error == False & format_errors > 0: logging(logging_level="warning") logging(logging_level="warning", message=f"{format_errors} investments out of {number_of_investments} skiped...maybe due to missing values?") return investments else: logging(logging_level="error") return False # YFINANCE FETCH & FORMAT DATA def fetch_format_yf_data(tickers): yf_data = {} fetch_errors = 0 format_errors = 0 number_of_tickers = 0 # Download data for each ticker seperately for i in tickers: number_of_tickers = number_of_tickers +1 skip_formating = False # Helper varianbel (see flow logik) ticker = i # Catch errors during the download try: # Download data api = yf.Ticker(ticker) data = api.history(period="max", auto_adjust=False) except: # Store error for later logging fetch_errors = fetch_errors + 1 data = True # If the download was successfull: if skip_formating == False: # Try formating the data try: # Convert to Pandas DataFrame data = pd.DataFrame(data) # type: ignore # Delete the columns "Stock Splits", "High", "Low" and "Open" del data['Open'] del data['Low'] del data['High'] del data['Volume'] # Delete these 2 columns, if they exist if 'Stock Splits' in data.columns: del data['Stock Splits'] if 'Capital Gains' in data.columns: del data['Capital Gains'] # Get the Number of rows in data data_rows = data.shape[0] # Create new index without the time from the existing datetime64-index old_index = data.index new_index = [] x = 0 while x < data_rows: date = pd.Timestamp.date(old_index[x]) # Converts the "Pandas Timestamp"-object to a "date" object new_index.append(date) x+=1 # Add the new index to the dataframe and set it as the index data.insert(1, 'Date', new_index) data.set_index('Date', inplace=True) # Save the data-frame to the yf_data dict yf_data[ticker] = data # Handle formating errors except: format_errors = format_errors +1 # in case of an error the entry never get's added to the yf_data object # Wait for the API to cool down print(".", end="", flush=True) time.sleep(api_cooldowm_time) # Main Logging print(" ", end="", flush=True) if fetch_errors == 0 & format_errors == 0: logging(logging_level="success") logging(logging_level="info", message=f"{number_of_tickers} tickers recieved and formated") return yf_data elif fetch_errors == 0 & format_errors > 0: logging(logging_level="warning") logging(logging_level="warning", message=f"{format_errors} tickers out of {number_of_tickers} skiped") return yf_data else: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {number_of_tickers}") print("\n") return False # ------------------------ # # NETWORK UPLOAD FUNCTIONS # # ------------------------ # # NOTION UPDATE PAGES def notion_update_page(page_id: str, data: dict): url = f"https://api.notion.com/v1/pages/{page_id}" payload = {"properties": data} results = requests.patch(url, json=payload, headers=notion_headers) return results # UPDATE NOTION-TRADES-DATABASE def push_notion_trades_update(trades): # Logging error_count = 0 number_of_uploads = 0 for notion_page_id in trades: number_of_uploads = number_of_uploads+1 try: # The irr is stored in the format 1.2534 # Notion need the format 0,2534 irr_notion = trades[notion_page_id]['irr'] - 1 irr_notion = round(irr_notion, 4) # Construct Notion-Update-Object notion_update = { "Current (€)": { "number": trades[notion_page_id]['course_current'] }, "IRR (%)": { "number": irr_notion }, "Dividends (€)": { "number": trades[notion_page_id]['dividends'] } } # Update the properties of the corresponding notion-page notion_update_page(notion_page_id, notion_update) except: error_count = error_count + 1 # Wait for the API to cool off print(".", end="", flush=True) time.sleep(api_cooldowm_time) # Logging print(" ", end="", flush=True) if error_count == 0: logging(logging_level="success") elif error_count < number_of_uploads: logging(logging_level="warning") logging(logging_level="success", message=f"Updating notion trades failed for {error_count} out of {number_of_uploads} entries") else: logging(logging_level="error") logging(logging_level="success", message=f"Updating notion trades failed for all {error_count} entries") # UPDATE NOTION-INVESTMENT-OVERVIEW def push_notion_investment_update(investments): # Logging error_count = 0 number_of_uploads = 0 for notion_page_id in investments: number_of_uploads = number_of_uploads+1 # Try uploading an update try: # The irr is stored in the format 1.2534 # Notion need the format 0,2534 irr_notion = investments[notion_page_id]['current_irr'] - 1 irr_notion = round(irr_notion, 4) # Construct Notion-Update-Object notion_update = { "Current (€)": { "number": investments[notion_page_id]['current_value'] }, "IRR (%)": { "number": irr_notion }, "Performance (€)": { "number": investments[notion_page_id]['total_performanance'] }, "Dividends (€)": { "number": investments[notion_page_id]['total_dividends'] } } # Update the properties of the corresponding notion-page notion_update_page(notion_page_id, notion_update) except: error_count = error_count + 1 # Wait for the API to cool off print(".", end="", flush=True) time.sleep(api_cooldowm_time) # Logging print(" ", end="", flush=True) if error_count == 0: logging(logging_level="success") elif error_count < number_of_uploads: logging(logging_level="warning") logging(logging_level="success", message=f"Updating notion investments failed for {error_count} out of {number_of_uploads} entries") else: logging(logging_level="error") logging(logging_level="success", message=f"Updating notion investments failed for all {error_count} entries") # TRMNL UPDATE DIAGRAMMS def push_trmnl_update_chart(trmnl_update_object, trmnl_url, trmnl_headers): # Send the data to TRMNL try: data = json.dumps(trmnl_update_object, indent=2) # Converts a python-dictionary into a json reply = requests.post(trmnl_url, data=data, headers=trmnl_headers) # Logging if reply.status_code == 200: logging(logging_level="success") elif reply.status_code == 429: logging_level="success" logging(logging_level="warning") logging(logging_level="warning", message="Exceeded TRMNL's API rate limits") logging(logging_level="warning", message="Waiting some time should work") elif reply.status_code == 422: logging(logging_level="warning") logging(logging_level="warning", message="Upload successful, but data cannot be displayed correctly") logging(logging_level="warning", message="The payload is probably to large in size") else: logging(logging_level="error") logging(logging_level="error", message=f"Failed pushing data to TRMNL with server reply code: {reply.status_code}") logging(logging_level="debug", message=f"Complete server reply message: {reply}") except Exception as e: logging(logging_level="error") logging(logging_level="error", message=f"Failed pushing data to TRMNL with error code: {e}") # ----------------------------- # # HISTORY CALCULATION FUNCTIONS # # ----------------------------- # # CALC HISTORY PER TRADE def calc_history_per_trade(trades, yf_data): # Create support variables history_per_trade = {} total_dividends = 0 date_open_oldest_trade = get_date_open_oldest_trade(trades) # Logging & statistics missing_day_entrys = 0 days_formated = 0 number_of_trades = 0 # As this history is so important, it is okay if this functions fails in total if errors araise try: # ------------------ LOOP OVER ALL TRADES for trade_id in trades: # Statistics number_of_trades = number_of_trades +1 # ------------------ PREPARE FOR THE (NEXT) LOOP OVER ALL DAYS # Set / Reset the index-date to the oldest trade day # Resetting is required so that the calculations for the next trade start with day 1 index_date = date_open_oldest_trade # Set the initial value for the course on the previous day to 0 # Just in case the very first trade was made on a weekend somehow, where there is no yfinance data available previous_course = 0.0 # Check, if the trade was closed already # If it was not, set the closure date to the future (Trick 17) if trades[trade_id]["date_close"] == 0: date_close = datetime.date.today() + datetime.timedelta(days=1) else: date_close = trades[trade_id]["date_close"] date_open = trades[trade_id]["date_open"] # Keep ticker for connecting performance later ticker = trades[trade_id]['ticker'] # ------------------ DETERMINE THE COUSE PER DAY while index_date != datetime.date.today() + datetime.timedelta(days=1): # Statistics days_formated = days_formated +1 # Fetch course for the day & eventual dividends from yf_data try: current_course = yf_data[ticker].at[index_date, 'Close'] current_dividends_per_ticker = yf_data[ticker].at[index_date, 'Dividends'] # Catch missing yf-data (eg. for weekends) by reusing course from previous day except: current_course = previous_course current_dividends_per_ticker = 0.0 # there are never dividends on non-trading days missing_day_entrys = missing_day_entrys +1 # Increase the warning count # Catch the special case of the day when the trade was closed # In this case, the current course needs to be overwritten with the sell-value if date_close == index_date: current_course = trades[trade_id]['course_close'] # Save the result for the next iteration # This setup also makes it possible, that a previous course is passed down across mutiple days # This makes sense is case i.e. for a weekend previous_course = current_course # ------------------ CALCULATE PERFORMANCE IF REQUIRED if index_date >= date_open and index_date <= date_close: # Calculate performance values current_amount = trades[trade_id]['units'] current_invested = current_amount * trades[trade_id]['course_open'] total_dividends = total_dividends + current_amount * current_dividends_per_ticker current_value = current_amount * current_course current_value_with_dividends = current_value + total_dividends current_irr = calculate_irr(index_date, date_open, current_value_with_dividends, current_invested) total_performanance = current_value_with_dividends - current_invested if current_value_with_dividends == 0: print("0-value Error with ticker: {}".format(ticker)) else: # Write 0, if trade is not relevant for current timeframe current_amount = 0 current_invested = 0.00 total_dividends = 0.00 current_value = 0.00 current_irr = 0.00 total_performanance = 0.0 # ------------------ STORE RESULTS index_date_iso = index_date.isoformat() # Store all values into a dict dict_a = {} dict_a['current_amount'] = current_amount dict_a['current_invested'] = current_invested dict_a['total_dividends'] = total_dividends dict_a['current_value'] = current_value dict_a['current_irr'] = current_irr dict_a['current_course'] = current_course dict_a['total_performanance'] = total_performanance # Check if the date is already present if index_date_iso in history_per_trade: dict_b = history_per_trade[index_date_iso] else: dict_b = {} # Add the values to the trade_id value-pair dict_b[trade_id] = dict_a # Update the hostory_per_trade history_per_trade.update({index_date_iso : dict_b}) # ------------------ NEXT ITERATION index_date = index_date + datetime.timedelta(days=1) # ------------------ LOGGING & DEBUGING # Debug writing history to disk if selected_logging_level == "debug": data = json.dumps(history_per_trade, indent=2) # Converts a python-dictionary into a json with open("history_per_trade.json", "w") as f: f.write(data) # Logging logging if missing_day_entrys == 0: logging(logging_level="success") logging(logging_level="info", message=f"created a history with {days_formated} across all {number_of_trades} tickers o_O") else: logging(logging_level="warning") logging(logging_level="warning", message=f"No yf-data available in {missing_day_entrys} cases accross all {number_of_trades} tickers") logging(logging_level="warning", message="Probably reason is non-trading-days eg. weekends") logging(logging_level="warning", message="Used values from previous trade-day instead") # Return date return history_per_trade except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error message: {error_message}") return False # CALC THE HISTORY PER TRADE & OVERALL def calc_history_per_ticker(history_per_trade, tickers, trades): # ------------------ CREATE JSON OBJECT # Create the json-dict history_per_ticker = {} # Logging & statistics missing_day_entrys = 0 days_formated = 0 # As this history is so important, it is okay if this functions fails in total if errors araise try: # Loop over each date entry in the history for date_entry in history_per_trade: # Statistics days_formated = days_formated +1 # Create a dict to store the results per day and ticker dict_daily = {} for ticker in tickers: dict_daily[ticker] = {} dict_daily[ticker]["current_invested"] = 0 dict_daily[ticker]["total_dividends"] = 0 dict_daily[ticker]["current_value"] = 0 dict_daily[ticker]["current_irr"] = 0 dict_daily[ticker]["current_irr"] = 0 dict_daily[ticker]["total_performanance"] = 0 dict_daily[ticker]["current_amount"] = 0 # Added only for ticker entries, not for the "total" value dict_daily[ticker]["current_course"] = 0 # Added only for ticker entries, not for the "total" value dict_daily["total"] = {} dict_daily["total"]["current_invested"] = 0 dict_daily["total"]["total_dividends"] = 0 dict_daily["total"]["current_value"] = 0 dict_daily["total"]["current_irr"] = 0 dict_daily["total"]["total_performanance"] = 0 # Loop over each trade-entry for that day for trade_id in history_per_trade[date_entry]: # Extract data from the history_per_trade trade_amount = history_per_trade[date_entry][trade_id]['current_amount'] trade_invested = history_per_trade[date_entry][trade_id]['current_invested'] trade_dividends = history_per_trade[date_entry][trade_id]['total_dividends'] trade_value = history_per_trade[date_entry][trade_id]['current_value'] trade_irr = history_per_trade[date_entry][trade_id]['current_irr'] trade_course = history_per_trade[date_entry][trade_id]['current_course'] trade_performanance = history_per_trade[date_entry][trade_id]['total_performanance'] # Lookup the ticker by the trade-id ticker = trades[trade_id]["ticker"] # Extract data from the history_per_ticker ticker_amount = dict_daily[ticker]['current_amount'] ticker_invested = dict_daily[ticker]['current_invested'] ticker_dividends = dict_daily[ticker]['total_dividends'] ticker_value = dict_daily[ticker]['current_value'] ticker_irr = dict_daily[ticker]['current_irr'] ticker_performanance = dict_daily[ticker]['total_performanance'] # Overwrite the values in the history_per_ticker dict_daily[ticker]['current_amount'] = ticker_amount + trade_amount # Simple addition works dict_daily[ticker]['current_invested'] = ticker_invested + trade_invested dict_daily[ticker]['total_dividends'] = ticker_dividends + trade_dividends dict_daily[ticker]['current_value'] = ticker_value + trade_value dict_daily[ticker]['total_performanance'] = ticker_performanance + trade_performanance dict_daily[ticker]['current_course'] = trade_course # Simple overwrite is fine, as the course is the same for all trades if ticker_invested == 0 and trade_invested == 0: dict_daily[ticker]['current_irr'] = 0 # Catch 0 values else: dict_daily[ticker]['current_irr'] = (ticker_irr * ticker_invested + trade_irr * trade_invested) / (ticker_invested + trade_invested) # --> IRR is adjusted based on the trade values. This way a trade of 25% of the initial trade volume has only a 25% influence on the irr # Calculate the "total" entry after finishing with all the trades for ticker in tickers: # Same logic as above, but shortended code dict_daily["total"]['total_dividends'] = dict_daily["total"]['total_dividends'] + dict_daily[ticker]['total_dividends'] dict_daily["total"]['current_value'] = dict_daily["total"]['current_value'] + dict_daily[ticker]['current_value'] dict_daily["total"]['total_performanance'] = dict_daily["total"]['total_performanance'] + dict_daily[ticker]['total_performanance'] # Extracting the values before rewriting them, to preserve them for the IRR calculation total_invested = dict_daily["total"]['current_invested'] ticker_invested = dict_daily[ticker]['current_invested'] dict_daily["total"]['current_invested'] = total_invested + ticker_invested # Extracting the values before rewriting them, to preserve them for the IRR calculation if ticker_invested == 0 and total_invested == 0: dict_daily["total"]['current_irr'] = 0 else: total_irr = dict_daily["total"]['current_irr'] ticker_irr = dict_daily[ticker]['current_irr'] dict_daily["total"]['current_irr'] = (total_irr * total_invested + ticker_irr * ticker_invested) / (total_invested + ticker_invested) # Finally, write the results for this day-entry to the history_per_ticker history_per_ticker[date_entry] = dict_daily # ------------------ LOGGING & DEBUGING # Debugging if selected_logging_level == "debug": data = json.dumps(history_per_ticker, indent=2) # Converts a python-dictionary into a json with open("history_per_ticker.json", "w") as f: f.write(data) # Success Logging logging(logging_level="success") logging(logging_level="info", message=f"created a history with {days_formated} days formated o_O") return history_per_ticker # Error Logging except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error message: {error_message}") return False # --------------------------- # # HISTORY SELECTION FUNCTIONS # # --------------------------- # # FILTER ANY HISTORY OBJECT TO SELECTED DATES (With Weighted Averaging & Outlier Removal) def filter_history_by_list(history, dates_list): filtered_history = {} try: # Sort inputs to ensure chronologically correct processing sorted_target_dates = sorted(dates_list) all_history_dates = sorted(history.keys()) last_history_idx = 0 for i, target_date in enumerate(sorted_target_dates): # Check if this is the last date (Today) is_last_date = (i == len(sorted_target_dates) - 1) if is_last_date: # --- LOGIC FOR TODAY (Last Entry) --- # Do NOT average. Grab the specific data for this date. # Try to find exact match data_to_use = history.get(target_date) # If not found (e.g. running on weekend/holiday), use the very last entry available if not data_to_use and len(all_history_dates) > 0: last_available_date = all_history_dates[-1] data_to_use = history.get(last_available_date) if data_to_use: # Deep copy using JSON to avoid reference issues filtered_history[target_date] = json.loads(json.dumps(data_to_use)) else: # --- LOGIC FOR PAST DATES (Weighted Interval Averaging) --- current_history_idx = -1 # Find the index of the target date (or the closest date BEFORE it) for j in range(last_history_idx, len(all_history_dates)): h_date = all_history_dates[j] if h_date <= target_date: current_history_idx = j else: break # Optimization: stop searching once we pass the date if current_history_idx != -1 and current_history_idx >= last_history_idx: # Get interval dates (Python slice is exclusive at the end, so +1) interval_dates = all_history_dates[last_history_idx : current_history_idx + 1] if len(interval_dates) > 0: # Initialize Aggregation Dictionary aggregation = {} # Helper to init ticker if missing def get_agg(t): if t not in aggregation: aggregation[t] = { 'wSumInvested': 0.0, 'wSumValue': 0.0, 'wSumPerf': 0.0, 'wSumDiv': 0.0, 'totalWeight': 0.0, 'wSumIrr': 0.0, 'irrTotalWeight': 0.0, 'bSumInvested': 0.0, 'bSumValue': 0.0, 'bSumPerf': 0.0, 'bSumDiv': 0.0, 'bTotalWeight': 0.0, 'bSumIrr': 0.0, 'bIrrTotalWeight': 0.0 } return aggregation[t] for date_key in interval_dates: day_data = history.get(date_key) if not day_data: continue for ticker, entry in day_data.items(): agg = get_agg(ticker) # Access properties safely current_invested = entry.get('current_invested', 0) current_value = entry.get('current_value', 0) current_irr = entry.get('current_irr', 0) # Note: Keeping original typo 'performanance' to match your Notion keys total_performance = entry.get('total_performanance', 0) total_dividends = entry.get('total_dividends', 0) # Weighting: If invested < 1, treat as 0 to avoid dust-multiplication w = current_invested if current_invested > 1 else 0 # Outlier Detection: Ignore IRRs > 500% (5.0) is_outlier = abs(current_irr) > 5.0 if w > 0: agg['wSumInvested'] += current_invested * w agg['wSumValue'] += current_value * w agg['wSumPerf'] += total_performance * w agg['wSumDiv'] += total_dividends * w agg['totalWeight'] += w # Only add IRR if sane if not is_outlier: agg['wSumIrr'] += current_irr * w agg['irrTotalWeight'] += w # Benchmark Logic benchmark_entry = entry.get('benchmark') if benchmark_entry: b_invested = benchmark_entry.get('current_invested', 0) b_value = benchmark_entry.get('current_value', 0) b_irr = benchmark_entry.get('current_irr', 0) b_perf = benchmark_entry.get('total_performanance', 0) b_div = benchmark_entry.get('total_dividends', 0) bw = b_invested if b_invested > 1 else 0 is_bench_outlier = abs(b_irr) > 5.0 if bw > 0: agg['bSumInvested'] += b_invested * bw agg['bSumValue'] += b_value * bw agg['bSumPerf'] += b_perf * bw agg['bSumDiv'] += b_div * bw agg['bTotalWeight'] += bw if not is_bench_outlier: agg['bSumIrr'] += b_irr * bw agg['bIrrTotalWeight'] += bw # Construct Result for this Date result_daily = {} for ticker, agg in aggregation.items(): def safe_div(n, d): return n / d if d != 0 else 0 entry_result = { 'current_invested': safe_div(agg['wSumInvested'], agg['totalWeight']), 'current_value': safe_div(agg['wSumValue'], agg['totalWeight']), 'current_irr': safe_div(agg['wSumIrr'], agg['irrTotalWeight']), 'total_performanance': safe_div(agg['wSumPerf'], agg['totalWeight']), 'total_dividends': safe_div(agg['wSumDiv'], agg['totalWeight']), 'current_amount': 0, 'current_course': 0 } if agg['bTotalWeight'] > 0: entry_result['benchmark'] = { 'current_invested': safe_div(agg['bSumInvested'], agg['bTotalWeight']), 'current_value': safe_div(agg['bSumValue'], agg['bTotalWeight']), 'current_irr': safe_div(agg['bSumIrr'], agg['bIrrTotalWeight']), 'total_performanance': safe_div(agg['bSumPerf'], agg['bTotalWeight']), 'total_dividends': safe_div(agg['bSumDiv'], agg['bTotalWeight']), } result_daily[ticker] = entry_result filtered_history[target_date] = result_daily # Update start index for next iteration last_history_idx = current_history_idx + 1 # Main Logging logging(logging_level="success") return filtered_history except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False # SELECT CURRENT VALUES PER TRADE def select_current_value_per_trade(trades, history_per_trade): # Logging format_errors = 0 # Loop over all trades for trade_id in trades: try: # Determine, what values to fetch based on whether the trade was closed already date_closed = trades[trade_id]["date_close"] if date_closed == 0: # If trade still open, use performance data from today index_date_iso = datetime.date.today().isoformat() else: # If trade closed, use performance data from close-date index_date_iso = date_closed.isoformat() # Fetch data from history and save for this trade trades[trade_id]["course_current"] = history_per_trade[index_date_iso][trade_id]['current_course'] trades[trade_id]["irr"] = history_per_trade[index_date_iso][trade_id]['current_irr'] trades[trade_id]["dividends"] = history_per_trade[index_date_iso][trade_id]['total_dividends'] except: format_errors = format_errors + 1 # Logging logging if format_errors == 0: logging(logging_level="success") else: logging(logging_level="warning") logging(logging_level="warning", message=f"Failed updating the current value per trade in {format_errors} cases") return trades # SELECT CURRENT VALUES PER TICKER def select_current_value_per_ticker(investments, history_per_ticker): # Logging format_errors = 0 # Loop over all investments for investment_id in investments: try: # Generate the iso-date of today as the required index index_date_iso = datetime.date.today().isoformat() # Get the ticker corresponding to the investment ticker = investments[investment_id]["ticker"] # Select latest data from history and save for this investment investments[investment_id]["total_dividends"] = history_per_ticker[index_date_iso][ticker]['total_dividends'] investments[investment_id]["current_value"] = history_per_ticker[index_date_iso][ticker]['current_value'] investments[investment_id]["current_irr"] = history_per_ticker[index_date_iso][ticker]['current_irr'] investments[investment_id]["total_performanance"] = history_per_ticker[index_date_iso][ticker]['total_performanance'] except: format_errors = format_errors + 1 # Logging if format_errors == 0: logging(logging_level="success") else: logging(logging_level="warning") logging(logging_level="warning", message=f"Failed updating the current value per ticker in {format_errors} cases") return investments # TRMNL CREATE IRR-UPDATE def prep_trmnl_chart_udpate(history_to_show, series_to_show_1 = "total", data_to_show_1 = "current_value", series_to_show_2 = "bechnmark", data_to_show_2 = "current_value"): # default value = current invested # Setup dict_big_numbers = {} charts_data = [] chart_1 = {} chart_2 = {} try: # Fetch the latest date entry from the history index_date_iso = fetch_last_key_from_dict(history_to_show) # Select latest data from history for the big-numbers current_value = history_to_show[index_date_iso]["total"]["current_value"] total_performanance = history_to_show[index_date_iso]["total"]["total_performanance"] current_irr = history_to_show[index_date_iso]["total"]["current_irr"] current_irr = (current_irr -1) *100 # Round the nubers dict_big_numbers["current_value"] = str(round(current_value, 0)) dict_big_numbers["total_performanance"] = str(round(total_performanance, 0)) dict_big_numbers["current_irr"] = str(round(current_irr, 2)) # Catching false inputs for the series to show possible_series_to_show = list(history_to_show[index_date_iso].keys()) # Get a list of all the series values, that could be shown if series_to_show_1 not in possible_series_to_show: # checks, if the selected series is not part of the history-object sent to the function logging(logging_level="warning") logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid") series_to_show_1 = "total" if series_to_show_2 not in possible_series_to_show: # checks, if the selected series is not part of the history-object sent to the function logging(logging_level="warning") logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid") series_to_show_2 = "total" # Catching false inputs for the data to show possible_data_to_show = list(history_to_show[index_date_iso][series_to_show_1].keys()) if data_to_show_1 not in possible_data_to_show: logging(logging_level="warning") logging(logging_level="warning", message="Selecting 'current invested' as chart data, as the input was not valid") data_to_show_1 = "current_value" possible_data_to_show = list(history_to_show[index_date_iso][series_to_show_2].keys()) if data_to_show_2 not in possible_data_to_show: logging(logging_level="warning") logging(logging_level="warning", message="Selecting 'current invested' as chart data, as the input was not valid") data_to_show_2 = "current_value" # Create space for storing values chart_1["data"] = [] chart_2["data"] = [] # Format the chart data into the right data for date in history_to_show: # Extract the value to be stored value_to_show_1 = history_to_show[date][series_to_show_1][data_to_show_1] value_to_show_2 = history_to_show[date][series_to_show_2][data_to_show_2] # Catch the case irr and convert to percent if data_to_show_1 == "current_irr": value_to_show_1 = (value_to_show_1 -1) *100 if data_to_show_2 == "current_irr": value_to_show_2 = (value_to_show_2 -1) *100 # Round to 2 decimal values value_to_show_1 = round(value_to_show_1, 2) value_to_show_2 = round(value_to_show_2, 2) # Extend the date by a timestamp json_date = datetime.date.fromisoformat(date) # Convert ISO-String to python date-object json_date = datetime.datetime.combine(json_date, datetime.datetime.min.time()) # Combine the date with midnight (00:00:00) to create a datetime object json_date = json_date.isoformat() # Convert back to ISO-String, now including a time # Store the values together with the corresponding date value_1 = [json_date, value_to_show_1] value_2 = [json_date, value_to_show_2] # Add the value pair to the list of values for this chart chart_1["data"].append(value_1) chart_2["data"].append(value_2) # Add the two series to the list of series in the TRML object charts_data.append(chart_1) charts_data.append(chart_2) # Generating nicer series titels if series_to_show_1 == "total": series_to_show_1 = "Portfolio" if series_to_show_2 == "total": series_to_show_2 = "Portfolio" if series_to_show_1 == "benchmark": series_to_show_1 = "Benchmark: " + ticker_benchmark if series_to_show_2 == "benchmark": series_to_show_2 = "Benchmark: " + ticker_benchmark # Generating nicer data titels data_to_show_1 = data_to_show_1.replace("_", " ").capitalize() data_to_show_2 = data_to_show_2.replace("_", " ").capitalize() # Increase look of IRR even more # Funktioniert auch dann, wenn "irr" nicht vorkommt data_to_show_1 = data_to_show_1.replace("irr", "IRR") data_to_show_2 = data_to_show_2.replace("irr", "IRR") # Generate the chat names / desciptions chart_1["name"] = data_to_show_1 + " " + series_to_show_1 chart_2["name"] = data_to_show_2 + " " + series_to_show_2 # Construct the trmnl_object trmnl_update_object = {} trmnl_update_object["merge_variables"] = {} trmnl_update_object["merge_variables"]["big_numbers"] = dict_big_numbers trmnl_update_object["merge_variables"]["charts"] = charts_data # Debugging if selected_logging_level == "debug": data = json.dumps(trmnl_update_object, indent=2) # Converts a python-dictionary into a json with open("trmnl_update_object.json", "w") as f: f.write(data) # Main Logging logging(logging_level="success") return trmnl_update_object except Exception as error_message: logging(logging_level="error") logging(logging_level="error", message=f"Failed with error: {error_message}") return False ### ---------------------------------------------------- --------- ---------------------------------------------------- ### ### ---------------------------------------------------- MAIN CODE ---------------------------------------------------- ### ### ---------------------------------------------------- --------- ---------------------------------------------------- ### # ------------------------------------------- # # PART 1: Updating the notion trades database # # ------------------------------------------- # # Fetches the list of all trades stored in notion print("Fetching Data from Notion...", end=" ", flush=True) trades = fetch_format_notion_trades(notion_db_id_trades) # Generates a list with unique tickers and no duplicates to reduce workload for the yfinance api print("Creating a list of unique tickers...", end=" ", flush=True) tickers = filter_list_of_tickers(trades) # Configuration dependent execution: if calculate_benchmark == True: # Add the benchmark-ticker to the list of tickers to download data from yfinance from print("Adding benchmark-ticker...", end="", flush=True) tickers = add_benchmark_ticker(tickers, ticker_benchmark) # Fetches & formats the complete history per ticker from yfinance print("Fetching & formating yfinance data", end="", flush=True) yf_data = fetch_format_yf_data(tickers) # Calculates & stores a history per trade print("Calculating the history per trade...", end=" ", flush=True) history_per_trade = calc_history_per_trade(trades, yf_data) # Configuration dependent execution: if update_notion == True: # Selects the most current values from the history per trade and overwrites them in the "trades" feteched from notion print("Selecting the most current values...", end=" ", flush=True) trades = select_current_value_per_trade(trades, history_per_trade) # Updates the values in the notion database print("Updating the notion trades database", end="", flush=True) push_notion_trades_update(trades) # ------------------------------------------------ # # PART 2: Updating the notion investments database # # ------------------------------------------------ # # Fetches the list of entries in the investment-overview database stored in notion print("Fetching & formating notion investments...", end=" ", flush=True) investments = fetch_format_notion_investments(notion_db_id_investments) # Calculates & stores a history per ticker AND a total across all tickers indexed by the ticker name print("Calculating history per ticker...", end=" ", flush=True) history_per_ticker = calc_history_per_ticker(history_per_trade, tickers, trades) # Configuration dependent execution: if update_notion == True: # Selects the most current values from the history per ticker and overwrites them in the "investments" feteched from notion print("Calculating current value per ticker...", end=" ", flush=True) investments = select_current_value_per_ticker(investments, history_per_ticker) # Updates the values in the notion database print("Updating the notion ticker database", end="", flush=True) push_notion_investment_update(investments) # ----------------------------------------- # # PART 3: Calculating Benchmark performance # # ----------------------------------------- # # Configuration dependent execution: if calculate_benchmark == True: # Creating benchmark trades print("Creating 'benchmark trades'...", end="", flush=True) benchmark_trades = create_benchmark_trades(trades, yf_data) # Calculating benchmark trades print("Calculating the history per benchmark-trade...", end=" ", flush=True) history_per_benchmark_trade = calc_history_per_trade(benchmark_trades, yf_data) # Calculates & stores a history for the benchmark print("Calculating benchmark-history overall...", end=" ", flush=True) history_benchmark = calc_history_per_ticker(history_per_benchmark_trade, tickers, benchmark_trades) # Merging the benchmark_history into the ticker_history print("Merging the benchmark-history into the ticker-history...", end=" ", flush=True) history_per_ticker = merge_histories(history_per_ticker, history_benchmark) # --------------------------------- # # PART 4: Updating the TRMNL Screen # # --------------------------------- # # Configuration dependent execution: if update_TRMNL == True: # Creates a list containing one date per week print("Creating a list with one entry per week...", end=" ", flush=True) list_filtered_dates = create_list_filtered_dates(trades, trmnl_granularity) # Filter a weekly snapshot from the history per ticker print("Filtering the history per ticker to weekly values...", end=" ", flush=True) history_per_ticker_filtered = filter_history_by_list(history_per_ticker, list_filtered_dates) # Prepare a new TRMNL update print("Constructing a TERMNL update object...", end=" ", flush=True) trmnl_update_object = prep_trmnl_chart_udpate( history_per_ticker_filtered, series_to_show_1="total", data_to_show_1="current_value", series_to_show_2="benchmark", data_to_show_2="current_value" ) # Push the update to TRMNL print("Updating a TERMNL screen...", end=" ", flush=True) push_trmnl_update_chart(trmnl_update_object, trmnl_url_chart_1, trmnl_headers) # Prepare a new TRMNL update print("Constructing a TERMNL update object...", end=" ", flush=True) trmnl_update_object = prep_trmnl_chart_udpate( history_per_ticker_filtered, series_to_show_1="total", data_to_show_1="current_irr", series_to_show_2="benchmark", data_to_show_2="current_irr" ) # Push the update to TRMNL print("Updating a TERMNL screen...", end=" ", flush=True) push_trmnl_update_chart(trmnl_update_object, trmnl_url_chart_2, trmnl_headers) # Prepare a new TRMNL update print("Constructing a TERMNL update object...", end=" ", flush=True) trmnl_update_object = prep_trmnl_chart_udpate( history_per_ticker_filtered, series_to_show_1="total", data_to_show_1="total_performanance", series_to_show_2="benchmark", data_to_show_2="total_performanance" ) # Push the update to TRMNL print("Updating a TERMNL screen...", end=" ", flush=True) push_trmnl_update_chart(trmnl_update_object, trmnl_url_chart_3, trmnl_headers) # ----------------- # # PART 5: Finsch up # # ----------------- # # Logging print("--------------------------- SUCCESS! ---------------------------") print("Completed cycle at: {}".format(datetime.datetime.now())) print("--------------------------- SUCCESS! ---------------------------")