| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499 |
### --------------------------------------------- ---------------------- --------------------------------------------- ###
### --------------------------------------------- PROGRAM CONFIGURATION  --------------------------------------------- ###
### --------------------------------------------- ---------------------- --------------------------------------------- ###
# Program functionality switches
# NOTE(review): update_notion, update_TRMNL and calculate_benchmark are not read by any
# function visible in this part of the file; presumably the main routine checks them - confirm.
update_notion = True
update_TRMNL = True
calculate_benchmark = True
log_coloring = False # True makes logging() wrap the "[level]" tag in the ANSI codes from log_colors
# Program functionality configuration
programm_cooldown_time = 15 # Program cooldown timer in minutes (NOTE(review): the consumer is not visible here - confirm the unit)
api_cooldowm_time = 0.1 # API cooldown timer; passed unchanged to time.sleep(), which interprets the value as seconds (not minutes)
trmnl_granularity = 80 # Days in between two data points in the TRMNL chart
ticker_benchmark = "VGWL.DE" # Ticker to benchmark the trades against
# Program execution configuration
selected_logging_level = "warning" # must be one from the list below
logging_levels = ("none", "error", "success", "warning", "info", "debug") # ordered by amount of logs; logging() prints a message when its level is positioned at or before the selected level
class log_colors:
    """ANSI escape sequences used by logging() to colour the "[level]" tag.

    logging() looks the attribute up by the level name of the message, so the
    attribute names have to match the entries of ``logging_levels``. There is
    no attribute for the level "none".
    """
    error = '\033[91m'   # bright red
    warning = '\033[93m' # bright yellow
    success = '\033[92m' # bright green
    info = '\033[90m'    # grey (bright black)
    debug = '\033[4m'    # underlined
    endcode = '\033[0m'  # resets all attributes; appended after the coloured tag
### --------- API CONFIGURATION
# NOTION
# NOTE(review): the token below looks like a real credential stored in the source file.
# Consider loading it from an environment variable or secret store and rotating it.
notion_token = "secret_b7PiPL2FqC9QEikqkAEWOht7LmzPMIJMWTzUPWwbw4H"
# Headers sent with every Notion request issued by notion_get_pages() and notion_update_page()
notion_headers = {
    "Authorization": "Bearer " + notion_token,
    "Content-Type": "application/json",
    "Notion-Version": "2022-02-22"
}
# IDs of the Notion databases (trades, investment overview, performance)
notion_db_id_trades = "95f7a2b697a249d4892d60d855d31bda"
notion_db_id_investments = "2ba10a5f51bd8160ab9ee982bbef8cc3"
notion_db_id_performance = "1c010a5f51bd806f90d8e76a1286cfd4"
# TRMNL
# Headers and target URLs for push_trmnl_update_chart(); one URL per chart
# NOTE(review): the caller that selects these URLs is not visible in this part of the file.
trmnl_headers = {"Content-Type": "application/json"}
trmnl_url_chart_1 = "https://usetrmnl.com/api/custom_plugins/334ea2ed-1f20-459a-bea5-dca2c8cf7714"
trmnl_url_chart_2 = "https://usetrmnl.com/api/custom_plugins/72950759-38df-49eb-99fb-b3e2e67c385e"
trmnl_url_chart_3 = "https://usetrmnl.com/api/custom_plugins/a975543a-51dc-4793-b7fa-d6a101dc4025"
- ### -------------------- LIBRARIES
- import datetime
- import time
- import json
- import yfinance as yf
- import pandas as pd
- import requests
- ### ---------------------------------------------------- --------- ---------------------------------------------------- ###
- ### ---------------------------------------------------- FUNCTIONS ---------------------------------------------------- ###
- ### ---------------------------------------------------- --------- ---------------------------------------------------- ###
- # ---------------- #
- # HELPER FUNCTIONS #
- # ---------------- #
- # LOGGING / PRINTING TO TERMINAL
def logging(message = "", logging_level = "", new_line = True):
    """Print a tagged message to the terminal if its level is enabled.

    A message is printed when the position of ``logging_level`` inside
    ``logging_levels`` is less than or equal to the position of
    ``selected_logging_level``. Levels that are not part of ``logging_levels``
    are ranked behind every known level and are therefore never printed.

    Args:
        message: Text printed behind the "[level]" tag. May be empty, in which
            case only the tag is printed.
        logging_level: Level name of the message (an entry of ``logging_levels``).
        new_line: ``True`` ends the output with a newline, ``False`` ends it
            with a single space and flushes so follow-up output stays on the
            same line. Any non-bool value falls back to ``True``.
    """
    # Position of the configured level in the ordered list of levels
    config_logging_level = logging_levels.index(selected_logging_level)

    try:
        message_logging_level = logging_levels.index(logging_level)
    except ValueError:
        # Unknown level: rank it behind the least important known level
        message_logging_level = len(logging_levels)

    # The previous check `new_line is not bool` compared the value with the
    # type object itself, was always true and silently disabled new_line=False.
    if not isinstance(new_line, bool):
        new_line = True

    # Nothing to do when the message is less important than configured
    if message_logging_level > config_logging_level:
        return

    tag = f"[{logging_level}] "
    if log_coloring == True:
        # Levels without a colour entry (e.g. "none") are printed uncoloured
        log_color = getattr(log_colors, logging_level, "")
        log_text = f"{log_color}{tag}{log_colors.endcode}{message}"
    else:
        log_text = f"{tag}{message}"

    if new_line:
        print(log_text)
    else:
        print(log_text, end=" ", flush=True)
- # CALCULATE THE IRR
def calculate_irr(date_now, date_open, value_now, value_open):
    """Return the annualised growth factor between two valuations.

    The factor is ``(value_now / value_open) ** (365 / days_held)``, so 1.0
    means "no change" and 1.25 means +25 % per year.

    Args:
        date_now: Date of the current valuation (``datetime.date``).
        date_open: Date the position was opened (``datetime.date``).
        value_now: Current value of the position.
        value_open: Value of the position when it was opened.

    Returns:
        The growth factor as ``float``. For a negative value ratio the factor
        is computed on the absolute ratio and returned with a negative sign.
        If the calculation fails (e.g. ``value_open`` is 0 or the power
        overflows) an error line is printed and ``True`` is returned.
        Callers rely on this sentinel, so it is kept as is.
    """
    try:
        days_held = (date_now - date_open).days
        # On the day of the purchase itself the delta is 0 days;
        # use 1 day instead so a factor can still be calculated.
        if days_held == 0:
            days_held = 1
        years_held = days_held / 365  # fraction of a year, to get a yearly rate
        ratio = value_now / value_open

        # A fractional power of a negative number would turn complex,
        # so take the root of the absolute ratio and restore the sign.
        if ratio < 0:
            return -((-ratio) ** (1 / years_held))
        return ratio ** (1 / years_held)
    except Exception:
        # Narrowed from a bare except so Ctrl-C / SystemExit are not swallowed
        print("[ERROR] Calculation of irr")
        return True
- # GET THE DAY OF THE OLDEST TRADE
def get_date_open_oldest_trade(trades):
    """Return the earliest ``date_open`` of all trades, but never a date after today.

    Args:
        trades: Dict mapping a trade id to a trade dict with a ``"date_open"`` entry.

    Returns:
        The oldest opening date, or today's date if there are no trades or all
        trades open in the future.
    """
    candidates = [datetime.date.today()]
    candidates.extend(trades[trade_id]["date_open"] for trade_id in trades)
    return min(candidates)
- # CREATES LIST OF UNIQUE TICKERS
def filter_list_of_tickers(trades):
    """Collect every ticker that occurs in the trades, once each, in first-seen order.

    Args:
        trades: Dict mapping a trade id to a trade dict with a ``'ticker'`` entry.

    Returns:
        List of unique tickers, or ``False`` if reading the trades failed
        (the failure is logged).
    """
    unique_tickers = []
    try:
        for trade_id in trades:
            symbol = trades[trade_id]['ticker']
            if symbol in unique_tickers:
                continue
            unique_tickers.append(symbol)

        # Main Logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(unique_tickers)} tickers found")
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
    return unique_tickers
- # CREATE LIST OF EVENLY SPACED DATES (SPACING IN DAYS IS CONFIGURABLE)
def create_list_filtered_dates(trades, days_seperation):
    """Create ISO dates from today back to the oldest trade in fixed steps.

    The list is built backwards starting at today and then reversed, so the
    last entry is always today and the first entry is the oldest date that is
    not before the opening date of the oldest trade.

    Args:
        trades: Dict of trades as accepted by get_date_open_oldest_trade().
        days_seperation: Distance between two dates in days; must be > 0.

    Returns:
        List of ISO formatted date strings (oldest first), or ``False`` if the
        list could not be created (the failure is logged).
    """
    stop_date = get_date_open_oldest_trade(trades)
    index_date = datetime.date.today()
    try:
        step = datetime.timedelta(days=days_seperation)
        # A step of 0 would never reach the stop date (endless loop) and a
        # negative step walks into the future until the date type overflows.
        if step <= datetime.timedelta(0):
            raise ValueError(f"days_seperation must be greater than 0, got {days_seperation}")

        # Create reversed list (1st entry is today going back in time)
        list_filtered_dates = []
        while index_date >= stop_date:
            list_filtered_dates.append(index_date.isoformat())
            index_date = index_date - step
        # Reverse the list, so that the first entry is the oldest one
        list_filtered_dates.reverse()

        # Main Logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(list_filtered_dates)} dates in weekly list")
        return list_filtered_dates
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # FETCH THE LAST INDEX FROM A DICT
def fetch_last_key_from_dict(dict):
    """Return the last key of the mapping in iteration order.

    Raises:
        IndexError: If the mapping is empty.
    """
    # Unpack the key view into a list and take its final element
    return [*dict.keys()][-1]
- # ADD BENCHMARK-TICKER TO TICKER-DICT
def add_benchmark_ticker(tickers, ticker_benchmarkt):
    """Append the benchmark ticker to the given list and return that same list.

    The list is modified in place; the ticker is appended even if it is
    already contained in the list.
    """
    tickers.extend((ticker_benchmarkt,))
    logging(logging_level="success")
    return tickers
- # CREATE BENCHMARK TRADES
def _first_close_on_or_after(price_frame, start_date):
    """Return the first 'Close' value found on ``start_date`` or a later date.

    The search stops at the last date of the frame's index instead of walking
    forward until the date type overflows.
    Assumes the index holds ``datetime.date`` values, as produced by
    fetch_format_yf_data() - TODO confirm for other callers.

    Raises:
        LookupError: If the frame has no entry on or after ``start_date``.
    """
    last_date = price_frame.index.max()
    probe_date = start_date
    while probe_date <= last_date:
        try:
            return price_frame.at[probe_date, 'Close']
        except KeyError:
            # No entry for this day (e.g. weekend): try the next day
            probe_date = probe_date + datetime.timedelta(days=1)
    raise LookupError(f"no benchmark course available on or after {start_date}")


def create_benchmark_trades(trades, yf_data):
    """Mirror every trade as a trade of the same invested amount in the benchmark ticker.

    For each trade a copy is created whose ticker is ``ticker_benchmark``,
    whose opening course is the benchmark's closing course on the opening date
    (or the next day with data) and whose units are chosen so that the invested
    amount (units * opening course) stays the same. For closed trades
    (``date_close != 0``) the closing course is replaced the same way.

    Args:
        trades: Dict mapping a trade id to a trade dict. The trades are not
            modified (the previous implementation aliased them and overwrote
            ticker, units and courses of the original trades).
        yf_data: Dict mapping a ticker to its price frame; must contain
            ``ticker_benchmark``.

    Returns:
        Dict mapping "benchmark1", "benchmark2", ... to the benchmark trades,
        or ``False`` if creating them failed (the failure is logged).
    """
    benchmark_trades = {}

    try:
        for number, trade_id in enumerate(trades, start=1):
            benchmark_id = "benchmark" + str(number)
            benchmark_prices = yf_data[ticker_benchmark]

            # Work on a copy so the caller's trades stay untouched
            benchmark_trade = dict(trades[trade_id])
            benchmark_trade["ticker"] = ticker_benchmark

            # Amount that was invested in the real trade
            amount_invested = benchmark_trade["units"] * benchmark_trade["course_open"]

            # Buy the benchmark for the same amount on the opening date
            course_open_new = _first_close_on_or_after(benchmark_prices, benchmark_trade["date_open"])
            benchmark_trade["course_open"] = course_open_new
            benchmark_trade["units"] = amount_invested / course_open_new

            # Closed trades also sell the benchmark on the closing date
            if benchmark_trade["date_close"] != 0:
                benchmark_trade["course_close"] = _first_close_on_or_after(
                    benchmark_prices, benchmark_trade["date_close"]
                )

            benchmark_trades[benchmark_id] = benchmark_trade

        # Logging
        logging(logging_level="success")
        return benchmark_trades
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # MERGE BENCHMARK HISTORY & TICKER-HISTORY
def merge_histories(history_per_ticker, benchmark_history):
    """Copy the benchmark figures of every date into the ticker history.

    For every date in ``history_per_ticker`` a ``"benchmark"`` entry is
    created and filled with the values stored for ``ticker_benchmark`` on the
    same date in ``benchmark_history``. ``history_per_ticker`` is modified in
    place.

    Args:
        history_per_ticker: Dict mapping a date to the per-ticker figures.
        benchmark_history: Dict of the same layout that contains the
            benchmark ticker.

    Returns:
        The updated ``history_per_ticker``, or ``False`` if the benchmark
        figures were missing for at least one date. In that case the dates
        processed so far keep their (possibly incomplete) "benchmark" entry.
    """
    benchmark_ticker = ticker_benchmark
    copied_fields = (
        "current_invested",
        "total_dividends",
        "current_value",
        "current_irr",
        "total_performanance",
    )
    error_count = 0

    # Merging Data
    for index_date in history_per_ticker:
        try:
            merged_entry = {}
            history_per_ticker[index_date]["benchmark"] = merged_entry
            benchmark_entry = benchmark_history[index_date][benchmark_ticker]
            for field in copied_fields:
                merged_entry[field] = benchmark_entry[field]
        except Exception:
            # Narrowed from a bare except so Ctrl-C / SystemExit are not swallowed
            error_count = error_count + 1

    # Debugging: dump the merged history to disk
    if selected_logging_level == "debug":
        data = json.dumps(history_per_ticker, indent=2)
        with open("history_per_ticker_with_benchmark.json", "w") as f:
            f.write(data)

    # Main Logging
    if error_count == 0:
        logging(logging_level="success")
        return history_per_ticker

    logging(logging_level="warning")
    logging(logging_level="warning", message=f"Error merging benchmark-history into ticker-history in {error_count} cases")
    return False
- # -------------------------- #
- # NETWORK DOWNLOAD FUNCTIONS #
- # -------------------------- #
- # NOTION FETCH PAGES
def notion_get_pages(db_id_trades, num_pages=None):
    """Query a Notion database and return its pages.

    Args:
        db_id_trades: ID of the Notion database to query.
        num_pages: Number of pages to request. ``None`` (default) fetches all
            pages in batches of 100 by following the pagination cursor.

    Returns:
        List of page objects as returned in the "results" field of the
        response. If anything goes wrong (network error, timeout, unexpected
        response body) ``True`` is returned - callers check ``data is True``
        to detect the failure.
    """
    url = f"https://api.notion.com/v1/databases/{db_id_trades}/query"
    get_all = num_pages is None
    payload = {"page_size": 100 if get_all else num_pages}

    try:
        result = []
        while True:
            # A timeout prevents the program from hanging forever on a stalled connection
            raw_response = requests.post(url, json=payload, headers=notion_headers, timeout=30)
            parsed_response = raw_response.json()
            result.extend(parsed_response["results"])

            # Stop after the first batch unless all pages were requested
            if not (parsed_response["has_more"] and get_all):
                return result

            # Continue where the previous batch ended
            payload["start_cursor"] = parsed_response["next_cursor"]
    except Exception:
        return True  # Return True when there was an error
- # NOTION FETCH & FORMAT TRADES
def _parse_notion_date(raw_date):
    """Convert a Notion date string ("YYYY-MM-DD...") into a ``datetime.date``.

    Only the first ten characters are used so that values carrying a time
    component are accepted as well.
    """
    return datetime.date(*map(int, raw_date[:10].split('-')))


def fetch_format_notion_trades(db_id_trades):
    """Download all trades from Notion and convert them into plain dicts.

    Every page of the database becomes one trade with the keys ``ticker``,
    ``date_open``, ``date_close``, ``course_open``, ``course_close``,
    ``course_current``, ``irr``, ``units`` and ``dividends``. A missing or
    unreadable close date is stored as ``0`` (trade still open). Pages with
    any other missing value are skipped and counted.

    Args:
        db_id_trades: ID of the Notion database holding the trades.

    Returns:
        Dict mapping the Notion page id to the trade dict, or ``False`` if the
        download failed. Skipped pages only cause a warning.
    """
    # Download data from notion
    data = notion_get_pages(db_id_trades)
    if data is True:
        logging(logging_level="error")
        # The previous message interpolated a variable that was always empty here
        logging(logging_level="error", message="Failed with error: download of the Notion pages failed")
        return False

    trades = {}
    format_errors = 0
    number_of_trades = 0

    for raw_page in data:
        number_of_trades = number_of_trades + 1

        # Each page is loaded as a dictionary
        notion_page = dict(raw_page)

        # An open trade has no close date: that is expected, not an error
        try:
            date_close = _parse_notion_date(notion_page["properties"]["Close"]["date"]["start"])
        except Exception:
            date_close = 0

        # Any other missing entry makes the trade unusable: skip it
        try:
            properties = notion_page["properties"]
            date_open = _parse_notion_date(properties["Open"]["date"]["start"])
            trade = {
                'ticker' : properties["Ticker"]["select"]["name"],
                'date_open' : date_open,
                'date_close' : date_close,
                'course_open' : properties["Open (€)"]["number"],
                'course_close' : properties["Close (€)"]["number"],
                'course_current' : properties["Current (€)"]["number"],
                'irr' : properties["IRR (%)"]["number"],
                'units' : properties["Units"]["number"],
                'dividends' : properties["Dividends (€)"]["number"]
            }
            # The page id is the key of the trade
            trades[notion_page["id"]] = trade
        except Exception:
            format_errors = format_errors + 1

    # Writing Example File
    if selected_logging_level == "debug":
        with open("trades.json", "w") as f:
            f.write(str(trades))

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_trades} trades recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_trades} skiped...maybe due to missing values?")
    return trades
- # NOTION FETCH & FORMAT INVESTMENT OVERVIEW
def fetch_format_notion_investments(db_id_investments):
    """Download the investment overview from Notion and convert it into plain dicts.

    Every page becomes one entry with the keys ``ticker``, ``total_dividends``,
    ``current_value``, ``current_irr`` and ``total_performanance``. Pages with
    missing values are skipped completely and counted.

    Fixes compared to the previous version:
    * a failed download set an unused variable instead of ``fetch_error`` and
      was therefore reported as success with an empty result,
    * ``a == False & b == 0`` was evaluated as a chained comparison because
      ``&`` binds tighter than ``==``, so the warning branch was unreachable,
    * skipped pages left a half-filled entry in the result.

    Args:
        db_id_investments: ID of the Notion database holding the investments.

    Returns:
        Dict mapping the Notion page id to the investment dict, or ``False``
        if the download failed.
    """
    # Download data & check for success
    data = notion_get_pages(db_id_investments)
    if data is True:
        logging(logging_level="error")
        return False

    investments = {}
    format_errors = 0
    number_of_investments = 0

    for raw_page in data:
        number_of_investments = number_of_investments + 1
        try:
            # Each page is loaded as a dictionary
            notion_page = dict(raw_page)
            properties = notion_page["properties"]
            investment = {
                "ticker": properties["Ticker"]["select"]["name"],
                "total_dividends": properties["Dividends (€)"]["number"],
                "current_value": properties["Current (€)"]["number"],
                "current_irr": properties["IRR (%)"]["number"],
                "total_performanance": properties["Performance (€)"]["number"],
            }
            # Only store the entry once all values were read successfully
            investments[notion_page["id"]] = investment
        except Exception:
            format_errors = format_errors + 1

    # Main Logging
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_investments} investments recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} investments out of {number_of_investments} skiped...maybe due to missing values?")
    return investments
- # YFINANCE FETCH & FORMAT DATA
def _format_yf_history(raw_history):
    """Reduce a yfinance history to the needed columns and index it by plain dates.

    Removes 'Open', 'Low', 'High' and 'Volume' (these must exist) as well as
    'Stock Splits' and 'Capital Gains' (if present) and replaces the timestamp
    index by an index of ``datetime.date`` objects named 'Date'.

    Raises:
        KeyError: If one of the mandatory columns is missing.
    """
    frame = pd.DataFrame(raw_history)
    for column in ('Open', 'Low', 'High', 'Volume'):
        del frame[column]
    for column in ('Stock Splits', 'Capital Gains'):
        if column in frame.columns:
            del frame[column]

    # Drop the time part so rows can be looked up with datetime.date keys
    frame.index = pd.Index([pd.Timestamp.date(stamp) for stamp in frame.index], name='Date')
    return frame


def fetch_format_yf_data(tickers):
    """Download the complete price history of every ticker from yfinance.

    Args:
        tickers: Iterable of ticker symbols.

    Returns:
        Dict mapping each ticker to its formatted price frame (see
        _format_yf_history()). Tickers whose data could not be formatted are
        left out and reported with a warning. If the download itself failed
        for at least one ticker, ``False`` is returned.

    Fixes compared to the previous version:
    * a failed download is no longer passed on to the formatting step (and
      counted a second time as formatting error),
    * ``a == 0 & b == 0`` was evaluated as a chained comparison because ``&``
      binds tighter than ``==``, so formatting errors were reported as success,
    * the error message printed the number of tickers as "error".
    """
    yf_data = {}
    fetch_errors = 0
    format_errors = 0
    number_of_tickers = 0

    # Download data for each ticker separately
    for ticker in tickers:
        number_of_tickers = number_of_tickers + 1

        try:
            history = yf.Ticker(ticker).history(period="max", auto_adjust=False)
        except Exception:
            fetch_errors = fetch_errors + 1
            history = None

        # Only format what was downloaded successfully
        if history is not None:
            try:
                yf_data[ticker] = _format_yf_history(history)
            except Exception:
                # The ticker is simply not added to yf_data
                format_errors = format_errors + 1

        # Progress indicator & wait for the API to cool down
        print(".", end="", flush=True)
        time.sleep(api_cooldowm_time)

    # Main Logging
    print(" ", end="", flush=True)
    if fetch_errors == 0 and format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_tickers} tickers recieved and formated")
        return yf_data
    if fetch_errors == 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} tickers out of {number_of_tickers} skiped")
        return yf_data

    logging(logging_level="error")
    logging(logging_level="error", message=f"Download failed for {fetch_errors} out of {number_of_tickers} tickers")
    print("\n")
    return False
- # ------------------------ #
- # NETWORK UPLOAD FUNCTIONS #
- # ------------------------ #
- # NOTION UPDATE PAGES
def notion_update_page(page_id: str, data: dict):
    """Send a PATCH request that updates the properties of one Notion page.

    Args:
        page_id: ID of the Notion page to update.
        data: Property update object, sent as the "properties" field.

    Returns:
        The ``requests`` response object. The HTTP status is not evaluated
        here; connection errors and timeouts raise a ``requests`` exception.
    """
    url = f"https://api.notion.com/v1/pages/{page_id}"
    payload = {"properties": data}
    # A timeout prevents the program from hanging forever on a stalled connection
    return requests.patch(url, json=payload, headers=notion_headers, timeout=30)
- # UPDATE NOTION-TRADES-DATABASE
def push_notion_trades_update(trades):
    """Write current course, IRR and dividends of every trade back to Notion.

    One request is sent per trade, followed by a progress dot and the API
    cooldown. Failures are counted and summarised in the log. A request now
    also counts as failed when Notion answers with an HTTP error status;
    before, such answers were silently treated as success.

    Args:
        trades: Dict mapping the Notion page id to a trade dict with the keys
            ``irr``, ``course_current`` and ``dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in trades:
        number_of_uploads = number_of_uploads + 1
        try:
            trade = trades[notion_page_id]
            # The irr is stored as growth factor (1.2534),
            # Notion expects the rate (0.2534)
            irr_notion = round(trade['irr'] - 1, 4)

            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {"number": trade['course_current']},
                "IRR (%)": {"number": irr_notion},
                "Dividends (€)": {"number": trade['dividends']},
            }

            # Update the properties of the corresponding notion-page
            response = notion_update_page(notion_page_id, notion_update)
            response.raise_for_status()
        except Exception:
            error_count = error_count + 1

        # Progress indicator & wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(api_cooldowm_time)

    # Logging (the detail lines used the level "success" before)
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion trades failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion trades failed for all {error_count} entries")
- # UPDATE NOTION-INVESTMENT-OVERVIEW
def push_notion_investment_update(investments):
    """Write value, IRR, performance and dividends of every investment back to Notion.

    One request is sent per investment, followed by a progress dot and the API
    cooldown. Failures are counted and summarised in the log. A request now
    also counts as failed when Notion answers with an HTTP error status;
    before, such answers were silently treated as success.

    Args:
        investments: Dict mapping the Notion page id to an investment dict
            with the keys ``current_irr``, ``current_value``,
            ``total_performanance`` and ``total_dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in investments:
        number_of_uploads = number_of_uploads + 1
        try:
            investment = investments[notion_page_id]
            # The irr is stored as growth factor (1.2534),
            # Notion expects the rate (0.2534)
            irr_notion = round(investment['current_irr'] - 1, 4)

            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {"number": investment['current_value']},
                "IRR (%)": {"number": irr_notion},
                "Performance (€)": {"number": investment['total_performanance']},
                "Dividends (€)": {"number": investment['total_dividends']},
            }

            # Update the properties of the corresponding notion-page
            response = notion_update_page(notion_page_id, notion_update)
            response.raise_for_status()
        except Exception:
            error_count = error_count + 1

        # Progress indicator & wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(api_cooldowm_time)

    # Logging (the detail lines used the level "success" before)
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion investments failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion investments failed for all {error_count} entries")
- # TRMNL UPDATE DIAGRAMMS
def push_trmnl_update_chart(trmnl_update_object, trmnl_url, trmnl_headers):
    """Send one chart update to TRMNL and log the outcome.

    Args:
        trmnl_update_object: JSON-serialisable object with the chart data.
        trmnl_url: URL the data is posted to.
        trmnl_headers: HTTP headers for the request.

    The function never raises: HTTP status 200 is logged as success, 429 and
    422 as warnings, every other status and any exception as error.
    """
    try:
        # Compact encoding keeps the request body as small as possible
        # (a 422 reply is reported below as "payload probably too large")
        data = json.dumps(trmnl_update_object, separators=(",", ":"))
        reply = requests.post(trmnl_url, data=data, headers=trmnl_headers, timeout=30)
        status_code = reply.status_code

        # Logging
        if status_code == 200:
            logging(logging_level="success")
        elif status_code == 429:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Exceeded TRMNL's API rate limits")
            logging(logging_level="warning", message="Waiting some time should work")
        elif status_code == 422:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Upload successful, but data cannot be displayed correctly")
            logging(logging_level="warning", message="The payload is probably to large in size")
        else:
            logging(logging_level="error")
            logging(logging_level="error", message=f"Failed pushing data to TRMNL with server reply code: {status_code}")
            # Log the body of the reply, not just the repr of the response object
            logging(logging_level="debug", message=f"Complete server reply message: {reply.text}")
    except Exception as e:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed pushing data to TRMNL with error code: {e}")
- # ----------------------------- #
- # HISTORY CALCULATION FUNCTIONS #
- # ----------------------------- #
- # CALC HISTORY PER TRADE
def calc_history_per_trade(trades, yf_data):
    """Calculate the daily figures of every trade from the oldest trade until today.

    For every day between the opening date of the oldest trade and today
    (inclusive) and for every trade one record is created. While the trade is
    active (opening date <= day <= closing date) the record holds amount,
    invested sum, accumulated dividends, value, IRR, course and performance;
    outside that period all figures except the course are 0. Days without
    price data (e.g. weekends) reuse the course of the previous day. On the
    closing day the course is the trade's ``course_close``.

    Args:
        trades: Dict mapping a trade id to a trade dict (keys ``ticker``,
            ``date_open``, ``date_close`` (0 = still open), ``course_open``,
            ``course_close``, ``units``).
        yf_data: Dict mapping a ticker to a price frame with the columns
            'Close' and 'Dividends', indexed by ``datetime.date``.

    Returns:
        Dict of the form ``{iso_date: {trade_id: record}}``, or ``False`` if
        the calculation failed (the failure is logged).

    Fix compared to the previous version: the accumulated dividends are reset
    for every trade. Before, a trade that was active on the very first day
    started with the dividends left over from the previously processed trade.
    """
    history_per_trade = {}
    date_open_oldest_trade = get_date_open_oldest_trade(trades)
    # First day that is NOT part of the history anymore; determined once so
    # all trades cover the same range even if the run crosses midnight
    end_date = datetime.date.today() + datetime.timedelta(days=1)

    # Logging & statistics
    missing_day_entrys = 0
    days_formated = 0
    number_of_trades = 0

    # This history is the base for everything else, so any error aborts the whole function
    try:
        # ------------------ LOOP OVER ALL TRADES
        for trade_id in trades:
            number_of_trades = number_of_trades + 1
            trade = trades[trade_id]

            # ------------------ PREPARE FOR THE (NEXT) LOOP OVER ALL DAYS
            # Every trade starts again at the first day of the history
            index_date = date_open_oldest_trade
            # Course used for days without data before the first known course
            previous_course = 0.0
            # Dividends are accumulated per trade
            total_dividends = 0

            # An open trade gets a closing date in the future, so it stays
            # active until the end of the history
            if trade["date_close"] == 0:
                date_close = end_date
            else:
                date_close = trade["date_close"]
            date_open = trade["date_open"]
            ticker = trade['ticker']

            # ------------------ DETERMINE THE COURSE PER DAY
            while index_date < end_date:
                days_formated = days_formated + 1

                # Fetch course & dividends of the day from yf_data
                try:
                    current_course = yf_data[ticker].at[index_date, 'Close']
                    current_dividends_per_ticker = yf_data[ticker].at[index_date, 'Dividends']
                except Exception:
                    # No data for this day (e.g. weekend): reuse the previous course
                    current_course = previous_course
                    current_dividends_per_ticker = 0.0  # there are never dividends on non-trading days
                    missing_day_entrys = missing_day_entrys + 1

                # On the day the trade was closed the sell course counts
                if date_close == index_date:
                    current_course = trade['course_close']

                # Remember the course so it can be passed down across several
                # days without data
                previous_course = current_course

                # ------------------ CALCULATE PERFORMANCE IF REQUIRED
                if date_open <= index_date <= date_close:
                    current_amount = trade['units']
                    current_invested = current_amount * trade['course_open']
                    total_dividends = total_dividends + current_amount * current_dividends_per_ticker
                    current_value = current_amount * current_course
                    current_value_with_dividends = current_value + total_dividends
                    current_irr = calculate_irr(index_date, date_open, current_value_with_dividends, current_invested)
                    total_performanance = current_value_with_dividends - current_invested
                    if current_value_with_dividends == 0:
                        print("0-value Error with ticker: {}".format(ticker))
                else:
                    # The trade is not active on this day
                    current_amount = 0
                    current_invested = 0.00
                    total_dividends = 0.00
                    current_value = 0.00
                    current_irr = 0.00
                    total_performanance = 0.0

                # ------------------ STORE RESULTS
                record = {
                    'current_amount': current_amount,
                    'current_invested': current_invested,
                    'total_dividends': total_dividends,
                    'current_value': current_value,
                    'current_irr': current_irr,
                    'current_course': current_course,
                    'total_performanance': total_performanance,
                }
                # Add the record to the entry of the day (created on first use)
                history_per_trade.setdefault(index_date.isoformat(), {})[trade_id] = record

                # ------------------ NEXT ITERATION
                index_date = index_date + datetime.timedelta(days=1)

        # ------------------ LOGGING & DEBUGGING
        # Debug: write the history to disk
        if selected_logging_level == "debug":
            data = json.dumps(history_per_trade, indent=2)
            with open("history_per_trade.json", "w") as f:
                f.write(data)

        if missing_day_entrys == 0:
            logging(logging_level="success")
            logging(logging_level="info", message=f"created a history with {days_formated} across all {number_of_trades} tickers o_O")
        else:
            logging(logging_level="warning")
            logging(logging_level="warning", message=f"No yf-data available in {missing_day_entrys} cases accross all {number_of_trades} tickers")
            logging(logging_level="warning", message="Probably reason is non-trading-days eg. weekends")
            logging(logging_level="warning", message="Used values from previous trade-day instead")

        return history_per_trade

    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
- # CALC THE HISTORY PER TICKER & OVERALL
def calc_history_per_ticker(history_per_trade, tickers, trades):
    """Condense the history per trade into a history per ticker plus a "total" series.

    For every date in history_per_trade, the values of each trade are added to
    the ticker the trade belongs to (resolved via trades[trade_id]["ticker"]).
    Once all trades of a date are processed, the ticker entries are added up
    into the additional "total" entry of that date. IRR values are not summed
    but averaged, weighted by the invested amounts.

    Args:
        history_per_trade: dict mapping a date key to a dict of trade_id -> values.
        tickers: iterable of tickers; each one gets an entry for every date.
        trades: dict indexed by trade_id; only the "ticker" field is read.

    Returns:
        The history as dict {date: {ticker: {...}, "total": {...}}}, or False
        if any error occurred (the error is logged).
    """
    # Fields of the "total" entry; ticker entries carry two more fields
    total_fields = ("current_invested", "total_dividends", "current_value", "current_irr", "total_performanance")
    ticker_fields = total_fields + ("current_amount", "current_course")
    # Fields read from every trade entry, in the order they are looked up
    trade_fields = ('current_amount', 'current_invested', 'total_dividends', 'current_value',
                    'current_irr', 'current_course', 'total_performanance')

    def blend_irr(irr_a, invested_a, irr_b, invested_b):
        # Mean of two IRR values weighted by their invested amounts.
        # This way a trade with 25% of the volume has only 25% influence on the IRR.
        if invested_a == 0 and invested_b == 0:
            return 0
        return (irr_a * invested_a + irr_b * invested_b) / (invested_a + invested_b)

    history_per_ticker = {}
    days_formated = 0

    # This history is essential, so the function is allowed to fail as a whole on any error
    try:
        for days_formated, date_entry in enumerate(history_per_trade, start=1):

            # ------------------ START EVERY SERIES OF THE DAY AT ZERO
            dict_daily = {ticker: dict.fromkeys(ticker_fields, 0) for ticker in tickers}
            dict_daily["total"] = dict.fromkeys(total_fields, 0)

            # ------------------ ADD EVERY TRADE TO ITS TICKER
            for trade_id in history_per_trade[date_entry]:
                trade_entry = history_per_trade[date_entry][trade_id]
                (amount, invested, dividends, value,
                 irr, course, performanance) = (trade_entry[field] for field in trade_fields)

                ticker_entry = dict_daily[trades[trade_id]["ticker"]]
                # Remember the state before the update, it is needed for the IRR weighting
                invested_so_far = ticker_entry['current_invested']
                irr_so_far = ticker_entry['current_irr']

                ticker_entry['current_amount'] += amount
                ticker_entry['current_invested'] += invested
                ticker_entry['total_dividends'] += dividends
                ticker_entry['current_value'] += value
                ticker_entry['total_performanance'] += performanance
                # A plain overwrite is fine, the course is the same for all trades of a ticker
                ticker_entry['current_course'] = course
                ticker_entry['current_irr'] = blend_irr(irr_so_far, invested_so_far, irr, invested)

            # ------------------ ADD EVERY TICKER TO THE TOTAL
            total_entry = dict_daily["total"]
            for ticker in tickers:
                ticker_entry = dict_daily[ticker]
                total_entry['total_dividends'] += ticker_entry['total_dividends']
                total_entry['current_value'] += ticker_entry['current_value']
                total_entry['total_performanance'] += ticker_entry['total_performanance']

                # Read both invested values before the total is rewritten (IRR weighting)
                total_invested = total_entry['current_invested']
                ticker_invested = ticker_entry['current_invested']
                total_entry['current_invested'] = total_invested + ticker_invested
                total_entry['current_irr'] = blend_irr(
                    total_entry['current_irr'], total_invested,
                    ticker_entry['current_irr'], ticker_invested,
                )

            # Store the finished day
            history_per_ticker[date_entry] = dict_daily

        # ------------------ LOGGING & DEBUGGING
        if selected_logging_level == "debug":
            # Serialise first, so the file is only touched if the dump succeeds
            data = json.dumps(history_per_ticker, indent=2)
            with open("history_per_ticker.json", "w") as f:
                f.write(data)

        logging(logging_level="success")
        logging(logging_level="info", message=f"created a history with {days_formated} days formated o_O")
        return history_per_ticker

    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
- # --------------------------- #
- # HISTORY SELECTION FUNCTIONS #
- # --------------------------- #
- # FILTER ANY HISTORY OBJECT TO SELECTED DATES (With Weighted Averaging & Outlier Removal)
def filter_history_by_list(history, dates_list):
    """Reduce a history object to the given dates, averaging the days in between.

    The target dates are processed in sorted order:

    * The last target date is not averaged. It receives a deep copy of the
      history entry of exactly that date, or of the newest history entry if
      there is no (non-empty) entry for that date.
    * Every earlier target date receives, per series, averages over all
      history dates after the previously consumed interval up to and including
      the target date. The averages are weighted by "current_invested"; entries
      with an invested value of 1 or less get no weight, and IRR values with an
      absolute value above 5.0 are left out of the IRR average. "current_amount"
      and "current_course" are written as 0 for averaged entries.

    Args:
        history: dict mapping a date key to a dict of series -> values.
        dates_list: iterable of the date keys to keep.

    Returns:
        dict with one entry per target date that could be filled, or False if
        an error occurred (the error is logged).
    """
    # Accumulator keys as (sum invested, sum value, sum performance, sum dividends,
    # total weight, sum irr, irr weight) for the series itself and its nested benchmark
    series_keys = ('wSumInvested', 'wSumValue', 'wSumPerf', 'wSumDiv',
                   'totalWeight', 'wSumIrr', 'irrTotalWeight')
    benchmark_keys = ('bSumInvested', 'bSumValue', 'bSumPerf', 'bSumDiv',
                      'bTotalWeight', 'bSumIrr', 'bIrrTotalWeight')

    def new_accumulator():
        # Fresh accumulator with every weighted sum and weight at zero
        return {
            'wSumInvested': 0.0, 'wSumValue': 0.0, 'wSumPerf': 0.0, 'wSumDiv': 0.0,
            'totalWeight': 0.0,
            'wSumIrr': 0.0, 'irrTotalWeight': 0.0,
            'bSumInvested': 0.0, 'bSumValue': 0.0, 'bSumPerf': 0.0, 'bSumDiv': 0.0,
            'bTotalWeight': 0.0,
            'bSumIrr': 0.0, 'bIrrTotalWeight': 0.0
        }

    def accumulate(accumulator, values, keys):
        # Add one day of values to the weighted sums stored under the given keys
        k_invested, k_value, k_perf, k_div, k_weight, k_irr, k_irr_weight = keys
        invested = values.get('current_invested', 0)
        value = values.get('current_value', 0)
        irr = values.get('current_irr', 0)
        # The misspelled key 'total_performanance' is the one used throughout the data
        performance = values.get('total_performanance', 0)
        dividends = values.get('total_dividends', 0)

        # Invested amounts of 1 or less count as dust and get no weight
        weight = invested if invested > 1 else 0
        # IRR values beyond +-5.0 are treated as outliers
        is_outlier = abs(irr) > 5.0
        if weight > 0:
            accumulator[k_invested] += invested * weight
            accumulator[k_value] += value * weight
            accumulator[k_perf] += performance * weight
            accumulator[k_div] += dividends * weight
            accumulator[k_weight] += weight
            if not is_outlier:
                accumulator[k_irr] += irr * weight
                accumulator[k_irr_weight] += weight

    def ratio(numerator, denominator):
        # Division that yields 0 instead of failing on a zero denominator
        return numerator / denominator if denominator != 0 else 0

    def averages(accumulator, keys):
        # Turn the weighted sums stored under the given keys into averages
        k_invested, k_value, k_perf, k_div, k_weight, k_irr, k_irr_weight = keys
        return {
            'current_invested': ratio(accumulator[k_invested], accumulator[k_weight]),
            'current_value': ratio(accumulator[k_value], accumulator[k_weight]),
            'current_irr': ratio(accumulator[k_irr], accumulator[k_irr_weight]),
            'total_performanance': ratio(accumulator[k_perf], accumulator[k_weight]),
            'total_dividends': ratio(accumulator[k_div], accumulator[k_weight]),
        }

    filtered_history = {}

    try:
        # Sorting both sides allows a single chronological pass
        target_dates = sorted(dates_list)
        history_dates = sorted(history.keys())
        final_position = len(target_dates) - 1

        # Index of the first history date that no interval has consumed yet
        window_start = 0
        for position, target_date in enumerate(target_dates):

            # --- LAST TARGET DATE: plain snapshot, no averaging ---
            if position == final_position:
                snapshot = history.get(target_date)
                # E.g. when running on a weekend: fall back to the newest entry available
                if not snapshot and len(history_dates) > 0:
                    snapshot = history.get(history_dates[-1])
                if snapshot:
                    # JSON round trip as deep copy, so the result shares nothing with the input
                    filtered_history[target_date] = json.loads(json.dumps(snapshot))
                continue

            # --- EARLIER TARGET DATES: weighted average over the interval ---
            # Find the last history date that is not after the target date
            window_end = -1
            for idx in range(window_start, len(history_dates)):
                if history_dates[idx] <= target_date:
                    window_end = idx
                else:
                    break  # the dates are sorted, nothing later can match
            if window_end == -1 or window_end < window_start:
                continue

            interval_dates = history_dates[window_start:window_end + 1]
            if len(interval_dates) > 0:
                aggregation = {}
                for date_key in interval_dates:
                    day_data = history.get(date_key)
                    if not day_data:
                        continue
                    for ticker, entry in day_data.items():
                        if ticker not in aggregation:
                            aggregation[ticker] = new_accumulator()
                        accumulator = aggregation[ticker]

                        accumulate(accumulator, entry, series_keys)
                        # A benchmark nested inside the entry is averaged independently
                        benchmark_entry = entry.get('benchmark')
                        if benchmark_entry:
                            accumulate(accumulator, benchmark_entry, benchmark_keys)

                # Build the averaged entry for this target date
                result_daily = {}
                for ticker, accumulator in aggregation.items():
                    entry_result = averages(accumulator, series_keys)
                    entry_result['current_amount'] = 0
                    entry_result['current_course'] = 0
                    if accumulator['bTotalWeight'] > 0:
                        entry_result['benchmark'] = averages(accumulator, benchmark_keys)
                    result_daily[ticker] = entry_result
                filtered_history[target_date] = result_daily

            # The next interval starts right behind this one
            window_start = window_end + 1

        # Main Logging
        logging(logging_level="success")
        return filtered_history
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # SELECT CURRENT VALUES PER TRADE
def select_current_value_per_trade(trades, history_per_trade):
    """Copy the most current course, IRR and dividends from the history into each trade.

    Open trades (date_close == 0) are read from today's history entry, closed
    trades from the entry of their close date. A trade whose values cannot be
    read is counted as a failure and left completely untouched.

    Args:
        trades: dict indexed by trade_id; each trade is updated in place with
            the keys "course_current", "irr" and "dividends".
        history_per_trade: dict of the shape {iso_date: {trade_id: {...}}}.

    Returns:
        The same trades dict that was passed in.
    """
    # Logging
    format_errors = 0
    # Determine the date once, so all open trades are read from the same day
    today_iso = datetime.date.today().isoformat()

    for trade_id, trade in trades.items():
        try:
            date_closed = trade["date_close"]
            # Open trades use today's values, closed trades the values of the close date
            index_date_iso = today_iso if date_closed == 0 else date_closed.isoformat()

            # Read everything before writing, so a missing value cannot leave
            # the trade half-updated
            snapshot = history_per_trade[index_date_iso][trade_id]
            course_current = snapshot['current_course']
            irr = snapshot['current_irr']
            dividends = snapshot['total_dividends']
        except Exception:
            # Not a bare except: KeyboardInterrupt / SystemExit must still stop the script
            format_errors += 1
            continue

        trade["course_current"] = course_current
        trade["irr"] = irr
        trade["dividends"] = dividends

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per trade in {format_errors} cases")
    return trades
- # SELECT CURRENT VALUES PER TICKER
def select_current_value_per_ticker(investments, history_per_ticker):
    """Copy today's values of each investment's ticker from the history into the investment.

    An investment whose values cannot be read (e.g. no history entry for today
    or for its ticker) is counted as a failure and left completely untouched.

    Args:
        investments: dict indexed by investment_id; each investment is updated
            in place with the keys "total_dividends", "current_value",
            "current_irr" and "total_performanance".
        history_per_ticker: dict of the shape {iso_date: {ticker: {...}}}.

    Returns:
        The same investments dict that was passed in.
    """
    # Values that are copied 1:1 from the history into the investment
    copied_fields = ("total_dividends", "current_value", "current_irr", "total_performanance")

    # Logging
    format_errors = 0
    # The index is the same for all investments, so it is generated only once
    index_date_iso = datetime.date.today().isoformat()

    for investment in investments.values():
        try:
            # Get the ticker corresponding to the investment and its latest history entry
            ticker = investment["ticker"]
            snapshot = history_per_ticker[index_date_iso][ticker]
            # Read everything before writing, so a missing value cannot leave
            # the investment half-updated
            latest_values = {field: snapshot[field] for field in copied_fields}
        except Exception:
            # Not a bare except: KeyboardInterrupt / SystemExit must still stop the script
            format_errors += 1
            continue

        investment.update(latest_values)

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per ticker in {format_errors} cases")
    return investments
- # TRMNL CREATE IRR-UPDATE
def prep_trmnl_chart_udpate(history_to_show, series_to_show_1 = "total", data_to_show_1 = "current_value", series_to_show_2 = "benchmark", data_to_show_2 = "current_value"):
    """Build the TRMNL update object: three big numbers plus two chart series.

    The big numbers (current value, total performance, IRR in percent) are
    always taken from the "total" series of the newest history entry. Each
    chart holds one [timestamp, value] pair per date of the history.

    Args:
        history_to_show: dict of the shape {iso_date: {series: {data: value}}}.
        series_to_show_1: series of the first chart; falls back to "total" if
            it is not part of the newest history entry.
        data_to_show_1: value of the first chart; falls back to
            "current_value" if the selected series does not contain it.
        series_to_show_2: same as series_to_show_1, for the second chart.
        data_to_show_2: same as data_to_show_1, for the second chart.

    Returns:
        dict {"merge_variables": {"big_numbers": {...}, "charts": [...]}}, or
        False if an error occurred (the error is logged).
    """

    def to_display_value(value, data_name):
        # IRR values are shown in percent: (irr - 1) * 100; everything is rounded to 2 decimals
        if data_name == "current_irr":
            value = (value - 1) * 100
        return round(value, 2)

    def to_series_title(series_name):
        # Nicer titles for the two well-known series, every other name is kept
        if series_name == "total":
            return "Portfolio"
        if series_name == "benchmark":
            return "Benchmark: " + ticker_benchmark
        return series_name

    def to_data_title(data_name):
        # "current_irr" -> "Current IRR"; the last replace is a no-op without "irr"
        return data_name.replace("_", " ").capitalize().replace("irr", "IRR")

    try:
        # ------------------ BIG NUMBERS
        # Fetch the latest date entry from the history
        index_date_iso = fetch_last_key_from_dict(history_to_show)
        latest_entry = history_to_show[index_date_iso]
        latest_total = latest_entry["total"]

        current_value = latest_total["current_value"]
        total_performanance = latest_total["total_performanance"]
        current_irr = (latest_total["current_irr"] - 1) * 100

        dict_big_numbers = {
            "current_value": str(round(current_value, 0)),
            "total_performanance": str(round(total_performanance, 0)),
            "current_irr": str(round(current_irr, 2)),
        }

        # ------------------ VALIDATE THE SELECTION
        # A series is valid if the newest history entry contains it
        if series_to_show_1 not in latest_entry:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_1 = "total"
        if series_to_show_2 not in latest_entry:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_2 = "total"
        # A data name is valid if the (validated) series contains it
        if data_to_show_1 not in latest_entry[series_to_show_1]:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current value' as chart data, as the input was not valid")
            data_to_show_1 = "current_value"
        if data_to_show_2 not in latest_entry[series_to_show_2]:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current value' as chart data, as the input was not valid")
            data_to_show_2 = "current_value"

        # ------------------ CHART DATA
        chart_1 = {"data": []}
        chart_2 = {"data": []}
        for date in history_to_show:
            day_entry = history_to_show[date]
            value_to_show_1 = day_entry[series_to_show_1][data_to_show_1]
            value_to_show_2 = day_entry[series_to_show_2][data_to_show_2]

            # Extend the ISO date by a timestamp (midnight)
            json_date = datetime.datetime.combine(
                datetime.date.fromisoformat(date), datetime.datetime.min.time()
            ).isoformat()

            chart_1["data"].append([json_date, to_display_value(value_to_show_1, data_to_show_1)])
            chart_2["data"].append([json_date, to_display_value(value_to_show_2, data_to_show_2)])

        # ------------------ CHART NAMES
        chart_1["name"] = to_data_title(data_to_show_1) + " " + to_series_title(series_to_show_1)
        chart_2["name"] = to_data_title(data_to_show_2) + " " + to_series_title(series_to_show_2)

        # ------------------ CONSTRUCT THE TRMNL OBJECT
        trmnl_update_object = {
            "merge_variables": {
                "big_numbers": dict_big_numbers,
                "charts": [chart_1, chart_2],
            }
        }

        # Debugging
        if selected_logging_level == "debug":
            data = json.dumps(trmnl_update_object, indent=2)
            with open("trmnl_update_object.json", "w") as f:
                f.write(data)

        # Main Logging
        logging(logging_level="success")
        return trmnl_update_object
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
-
### ---------------------------------------------------- --------- ---------------------------------------------------- ###
### ---------------------------------------------------- MAIN CODE ---------------------------------------------------- ###
### ---------------------------------------------------- --------- ---------------------------------------------------- ###
# Every step first announces itself without a line break; the step function
# is then expected to finish that line through its own logging output.

# ------------------------------------------- #
# PART 1: Updating the notion trades database #
# ------------------------------------------- #
# Load all trades stored in notion
print("Fetching Data from Notion...", end=" ", flush=True)
trades = fetch_format_notion_trades(notion_db_id_trades)

# De-duplicate the tickers to reduce the workload for the yfinance api
print("Creating a list of unique tickers...", end=" ", flush=True)
tickers = filter_list_of_tickers(trades)

# Only if configured: the benchmark ticker has to be downloaded from yfinance as well
if calculate_benchmark == True:
    print("Adding benchmark-ticker...", end="", flush=True)
    tickers = add_benchmark_ticker(tickers, ticker_benchmark)

# Download & format the complete history per ticker from yfinance
print("Fetching & formating yfinance data", end="", flush=True)
yf_data = fetch_format_yf_data(tickers)

# Calculate the history per trade
print("Calculating the history per trade...", end=" ", flush=True)
history_per_trade = calc_history_per_trade(trades, yf_data)

# Only if configured: write the most current values per trade back to notion
if update_notion == True:
    print("Selecting the most current values...", end=" ", flush=True)
    trades = select_current_value_per_trade(trades, history_per_trade)

    print("Updating the notion trades database", end="", flush=True)
    push_notion_trades_update(trades)

# ------------------------------------------------ #
# PART 2: Updating the notion investments database #
# ------------------------------------------------ #
# Load the entries of the investment-overview database stored in notion
print("Fetching & formating notion investments...", end=" ", flush=True)
investments = fetch_format_notion_investments(notion_db_id_investments)

# Calculate the history per ticker AND the total across all tickers
print("Calculating history per ticker...", end=" ", flush=True)
history_per_ticker = calc_history_per_ticker(history_per_trade, tickers, trades)

# Only if configured: write the most current values per ticker back to notion
if update_notion == True:
    print("Calculating current value per ticker...", end=" ", flush=True)
    investments = select_current_value_per_ticker(investments, history_per_ticker)

    print("Updating the notion ticker database", end="", flush=True)
    push_notion_investment_update(investments)

# ----------------------------------------- #
# PART 3: Calculating Benchmark performance #
# ----------------------------------------- #
# Only if configured: run the same calculations again for the benchmark trades
if calculate_benchmark == True:
    print("Creating 'benchmark trades'...", end="", flush=True)
    benchmark_trades = create_benchmark_trades(trades, yf_data)

    print("Calculating the history per benchmark-trade...", end=" ", flush=True)
    history_per_benchmark_trade = calc_history_per_trade(benchmark_trades, yf_data)

    print("Calculating benchmark-history overall...", end=" ", flush=True)
    history_benchmark = calc_history_per_ticker(history_per_benchmark_trade, tickers, benchmark_trades)

    # From here on the ticker-history also contains the benchmark
    print("Merging the benchmark-history into the ticker-history...", end=" ", flush=True)
    history_per_ticker = merge_histories(history_per_ticker, history_benchmark)

# --------------------------------- #
# PART 4: Updating the TRMNL Screen #
# --------------------------------- #
# Only if configured: push one chart per metric to TRMNL
if update_TRMNL == True:
    # Reduce the history to the dates selected by the configured granularity
    print("Creating a list with one entry per week...", end=" ", flush=True)
    list_filtered_dates = create_list_filtered_dates(trades, trmnl_granularity)

    print("Filtering the history per ticker to weekly values...", end=" ", flush=True)
    history_per_ticker_filtered = filter_history_by_list(history_per_ticker, list_filtered_dates)

    # Every chart compares the portfolio ("total") with the benchmark for one metric
    for chart_metric, chart_url in (
        ("current_value", trmnl_url_chart_1),
        ("current_irr", trmnl_url_chart_2),
        ("total_performanance", trmnl_url_chart_3),
    ):
        print("Constructing a TERMNL update object...", end=" ", flush=True)
        trmnl_update_object = prep_trmnl_chart_udpate(
            history_per_ticker_filtered,
            series_to_show_1="total",
            data_to_show_1=chart_metric,
            series_to_show_2="benchmark",
            data_to_show_2=chart_metric
        )

        print("Updating a TERMNL screen...", end=" ", flush=True)
        push_trmnl_update_chart(trmnl_update_object, chart_url, trmnl_headers)

# ----------------- #
# PART 5: Finish up #
# ----------------- #
print("--------------------------- SUCCESS! ---------------------------")
print(f"Completed cycle at: {datetime.datetime.now()}")
print("--------------------------- SUCCESS! ---------------------------")
|