| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031 |
- ### -------------------- LIBRARIES --------------------
- import datetime
- import time
- import json
- import yfinance as yf
- import pandas as pd
- import requests
- import config
- ### -------------------- FUNCTIONS --------------------
- # ---------------- #
- # HELPER FUNCTIONS #
- # ---------------- #
- # LOGGING / PRINTING TO TERMINAL
def logging(message="", logging_level="", new_line=True):
    """Print a colored, level-tagged message if its level is enabled.

    A message is printed only when the position of ``logging_level`` inside
    ``config.logging_levels`` is less than or equal to the position of
    ``config.selected_logging_level``.

    Args:
        message: Text printed after the ``[level]`` tag.
        logging_level: Name of the level. Levels that are not listed in
            ``config.logging_levels`` are ranked behind every known level
            and are therefore never printed.
        new_line: ``True`` ends the output with a newline, ``False`` ends it
            with a single space and flushes. Any non-bool value is treated
            as ``True``.
    """
    levels = config.logging_levels
    # Index of the configured threshold inside the list of known levels
    threshold = levels.index(config.selected_logging_level)

    try:
        message_rank = levels.index(logging_level)
    except ValueError:
        # Unknown level: rank it behind the least important known level
        message_rank = len(levels)

    # Reject non-boolean values. The former check ``new_line is not bool``
    # compared against the *type object* and was therefore always true,
    # which made ``new_line=False`` impossible to use.
    if not isinstance(new_line, bool):
        new_line = True

    if message_rank > threshold:
        return

    # The color code is looked up by the level name on config.log_colors
    log_color = getattr(config.log_colors, logging_level)
    log_text = str(log_color + "[" + logging_level + "] " + config.log_colors.endcode + message)

    if new_line:
        print(log_text)
    else:
        print(log_text, end=" ", flush=True)
- # CALCULATE THE IRR
def calculate_irr(date_now, date_open, value_now, value_open):
    """Return the annualized growth factor between two valuations.

    The factor is ``(value_now / value_open) ** (1 / years)`` where ``years``
    is the number of days between the two dates divided by 365. A result of
    ``1.10`` therefore means growth by 10 % per 365 days.

    Args:
        date_now: Date of the current valuation.
        date_open: Date of the initial valuation.
        value_now: Current value.
        value_open: Initial value.

    Returns:
        The growth factor as a number. If the calculation fails (for example
        because ``value_open`` is 0), an error line is printed and ``True``
        is returned instead; callers rely on this legacy error signal.
    """
    try:
        elapsed_days = (date_now - date_open).days
        # On the day of the purchase itself the delta is 0 days; use 1 day
        # so that an IRR can still be calculated.
        if elapsed_days == 0:
            elapsed_days = 1
        # Convert to a fraction of a year to obtain a yearly rate
        years = elapsed_days / 365
        ratio = value_now / value_open

        if ratio < 0:
            # A negative ratio has no real root for arbitrary exponents:
            # take the root of the absolute value and restore the sign.
            irr = -((-ratio) ** (1 / years))
        else:
            # ratio ** (1 / years) is the years-th root of the ratio
            irr = ratio ** (1 / years)
    except Exception:
        print("[ERROR] Calculation of irr")
        return True

    return irr
- # GET THE DAY OF THE OLDEST TRADE
def get_date_open_oldest_trade(trades):
    """Return the earliest ``date_open`` among all trades.

    Today's date is the upper bound: it is returned when ``trades`` is empty
    or when no trade was opened before today.
    """
    today = datetime.date.today()
    open_dates = (trades[trade_id]["date_open"] for trade_id in trades)
    return min([today, *open_dates])
- # CREATES LIST OF UNIQUE TICKERS
def filter_list_of_tickers(trades):
    """Collect every distinct ticker of the given trades, in first-seen order.

    Returns:
        The list of unique tickers, or ``False`` if reading the trades
        raised an exception (the error is logged).
    """
    unique_tickers = []
    try:
        for trade_id in trades:
            symbol = trades[trade_id]['ticker']
            if symbol in unique_tickers:
                # Already known, nothing to add
                continue
            unique_tickers.append(symbol)

        # Main logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(unique_tickers)} tickers found")
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
    return unique_tickers
- # CREATE LIST OF WEEKLY DATES
def create_list_filtered_dates(trades, days_seperation):
    """Create ISO date strings spaced ``days_seperation`` days apart.

    The series starts at today and steps back in time until the open date of
    the oldest trade would be passed; the result is ordered oldest first.

    Returns:
        The list of ISO-formatted dates, or ``False`` if an exception was
        raised while building it (the error is logged).
    """
    oldest_open = get_date_open_oldest_trade(trades)
    cursor = datetime.date.today()
    try:
        step = datetime.timedelta(days=days_seperation)

        # Walk backwards from today, collecting the newest date first
        newest_first = []
        while cursor >= oldest_open:
            newest_first.append(cursor.isoformat())
            cursor = cursor - step

        # Flip the order so that the first entry is the oldest one
        list_filtered_dates = newest_first[::-1]

        # Main logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(list_filtered_dates)} dates in weekly list")
        return list_filtered_dates
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # FETCH THE LAST INDEX FROM A DICT
def fetch_last_key_from_dict(dict):
    """Return the last key of ``dict`` in its iteration order.

    Raises:
        IndexError: If ``dict`` has no keys.
    """
    # The last key in iteration order is treated as the most current entry
    return list(dict.keys())[-1]
- # -------------------------- #
- # NETWORK DOWNLOAD FUNCTIONS #
- # -------------------------- #
- # NOTION FETCH PAGES
def notion_get_pages(db_id_trades, num_pages=None):
    """Query a Notion database and return its pages.

    Args:
        db_id_trades: ID of the Notion database to query.
        num_pages: Page size of the single request to make. When ``None``,
            requests of 100 pages are repeated, following ``next_cursor``,
            for as long as the reply reports ``has_more``.

    Returns:
        The list of page objects taken from the ``results`` field of the
        replies, or ``True`` if any exception occurred.
    """
    try:
        endpoint = f"https://api.notion.com/v1/databases/{db_id_trades}/query"
        fetch_everything = num_pages is None
        batch_size = 100 if fetch_everything else num_pages

        pages = []
        request_body = {"page_size": batch_size}
        while True:
            reply = requests.post(endpoint, json=request_body, headers=config.notion_headers).json()
            pages.extend(reply["results"])

            # Stop after the first batch unless all pages were requested
            if not (reply["has_more"] and fetch_everything):
                break

            # Continue where the previous batch ended
            request_body = {"page_size": batch_size, "start_cursor": reply["next_cursor"]}

        return pages
    except Exception:
        return True  # Return True when there was an error
- # NOTION FETCH & FORMAT TRADES
def fetch_format_notion_trades(db_id_trades):
    """Download all trades from a Notion database and convert them to dicts.

    Every page becomes one entry keyed by its Notion page id with the keys
    ``ticker``, ``date_open``, ``date_close``, ``course_open``,
    ``course_close``, ``course_current``, ``irr``, ``units`` and
    ``dividends``. A missing or unparsable close date is stored as ``0``.
    Pages for which any other value cannot be read are skipped and counted.

    Returns:
        The dict of trades (possibly incomplete, in which case a warning is
        logged), or ``False`` if the download itself failed.
    """
    trades = {}
    format_errors = 0
    number_of_trades = 0
    error_message = ""

    # Download data from Notion; notion_get_pages signals failure with True
    data = notion_get_pages(db_id_trades)
    if data is True:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False

    for raw_page in data:
        # Count for statistics
        number_of_trades += 1

        # Each page is loaded as a dictionary
        notion_page = dict(raw_page)

        # An absent close date is expected and is recorded as 0
        try:
            date_close = notion_page["properties"]["Close"]["date"]["start"]
            date_close = datetime.date(*map(int, date_close.split('-')))
        except (KeyError, TypeError, ValueError, AttributeError):
            date_close = 0

        # Any other missing value is not expected: skip this trade
        try:
            properties = notion_page["properties"]
            date_open = properties["Open"]["date"]["start"]
            date_open = datetime.date(*map(int, date_open.split('-')))

            trade = {
                'ticker': properties["Ticker"]["select"]["name"],
                'date_open': date_open,
                'date_close': date_close,
                'course_open': properties["Open (€)"]["number"],
                'course_close': properties["Close (€)"]["number"],
                'course_current': properties["Current (€)"]["number"],
                'irr': properties["IRR (%)"]["number"],
                'units': properties["Units"]["number"],
                'dividends': properties["Dividends (€)"]["number"],
            }
            # The Notion page id is the key of the trade
            trades[notion_page["id"]] = trade
        except Exception:
            format_errors += 1

    # Main logging. The previous condition
    # ``fetch_error == False & format_errors == 0`` bound ``&`` before ``==``
    # and reported success even when trades had been skipped.
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_trades} trades recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_trades} skiped...maybe due to missing values?")
    return trades
- # NOTION FETCH & FORMAT INVESTMENT OVERVIEW
def fetch_format_notion_investments(db_id_investments):
    """Download the investment overview from Notion and convert it to dicts.

    Every page becomes one entry keyed by its Notion page id with the keys
    ``ticker``, ``total_dividends``, ``current_value``, ``current_irr`` and
    ``total_performanance``. Pages for which a value cannot be read are
    skipped completely and counted.

    Returns:
        The dict of investments (possibly incomplete, in which case a
        warning is logged), or ``False`` if the download itself failed.
    """
    investments = {}
    format_errors = 0
    number_of_investments = 0

    # Download data; notion_get_pages signals failure with True.
    # (The failure used to be stored in an unrelated variable and was
    # therefore never reported.)
    data = notion_get_pages(db_id_investments)
    if data is True:
        logging(logging_level="error")
        return False

    for raw_page in data:
        # Count up for statistics
        number_of_investments += 1
        try:
            # Each page is loaded as a dictionary
            notion_page = dict(raw_page)
            properties = notion_page["properties"]

            # Build the entry completely before storing it, so that a page
            # with missing values leaves no half-filled entry behind
            investment = {
                "ticker": properties["Ticker"]["select"]["name"],
                "total_dividends": properties["Dividends (€)"]["number"],
                "current_value": properties["Current (€)"]["number"],
                "current_irr": properties["IRR (%)"]["number"],
                "total_performanance": properties["Performance (€)"]["number"],
            }
            # The Notion page id is the key of the investment
            investments[notion_page["id"]] = investment
        except Exception:
            # Skip this entry if errors show up
            format_errors += 1

    # Main logging. ``&`` binds tighter than ``==``; the old condition
    # reported success even when entries had been skipped.
    if format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_investments} investments recieved and formated")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} investments out of {number_of_investments} skiped...maybe due to missing values?")
    return investments
- # YFINANCE FETCH & FORMAT DATA
def fetch_format_yf_data(tickers):
    """Download the full price history of every ticker from yfinance.

    For each ticker the history is reduced by dropping the columns ``Open``,
    ``Low``, ``High`` and ``Volume`` (plus ``Stock Splits`` and
    ``Capital Gains`` when present) and re-indexed by plain ``date`` objects
    under the index name ``Date``. After every ticker the function sleeps
    for ``config.api_cooldowm_time`` seconds.

    Args:
        tickers: Iterable of ticker symbols.

    Returns:
        A dict mapping ticker to its formatted DataFrame. Tickers whose data
        could not be formatted are left out (a warning is logged). ``False``
        is returned if the download raised for at least one ticker.
    """
    yf_data = {}
    fetch_errors = 0
    format_errors = 0
    number_of_tickers = 0

    # Download data for each ticker separately
    for ticker in tickers:
        number_of_tickers += 1

        try:
            data = yf.Ticker(ticker).history(period="max")
        except Exception:
            # Remember the failed download for the final logging
            fetch_errors += 1
        else:
            try:
                data = pd.DataFrame(data)
                # These columns are required to exist; a missing one counts
                # as a formatting error
                data = data.drop(columns=['Open', 'Low', 'High', 'Volume'])
                # These two columns are only removed if they exist
                data = data.drop(columns=['Stock Splits', 'Capital Gains'], errors='ignore')

                # Replace the timestamp index by an index of date objects
                plain_dates = [stamp.date() for stamp in data.index]
                data.insert(1, 'Date', plain_dates)
                data.set_index('Date', inplace=True)

                yf_data[ticker] = data
            except Exception:
                # In case of an error the entry never gets added to yf_data
                format_errors += 1

        # Wait for the API to cool down
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Main logging. ``&`` binds tighter than ``==``; the old conditions
    # could never reach the warning branch.
    print(" ", end="", flush=True)
    if fetch_errors > 0:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Download failed for {fetch_errors} out of {number_of_tickers} tickers")
        print("\n")
        return False
    if format_errors > 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} tickers out of {number_of_tickers} skiped")
        return yf_data
    logging(logging_level="success")
    logging(logging_level="info", message=f"{number_of_tickers} tickers recieved and formated")
    return yf_data
- # ------------------------ #
- # NETWORK UPLOAD FUNCTIONS #
- # ------------------------ #
- # NOTION UPDATE PAGES
def notion_update_page(page_id: str, data: dict):
    """Send a PATCH request that sets ``data`` as the properties of a Notion page.

    Returns:
        The response object returned by ``requests.patch``.
    """
    return requests.patch(
        f"https://api.notion.com/v1/pages/{page_id}",
        json={"properties": data},
        headers=config.notion_headers,
    )
- # UPDATE NOTION-TRADES-DATABASE
def push_notion_trades_update(trades):
    """Upload current course, IRR and dividends of every trade to Notion.

    The IRR is stored as a growth factor (e.g. 1.2534) and is uploaded as a
    rate (0.2534), rounded to four decimals. After every upload the function
    sleeps for ``config.api_cooldowm_time`` seconds. Failed uploads are
    counted and summarized in the log; nothing is returned.

    Args:
        trades: Dict mapping Notion page id to a trade dict providing
            ``irr``, ``course_current`` and ``dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in trades:
        number_of_uploads += 1
        try:
            trade = trades[notion_page_id]
            # Convert the factor (1.2534) into the rate Notion needs (0.2534)
            irr_notion = round(trade['irr'] - 1, 4)

            notion_update = {
                "Current (€)": {"number": trade['course_current']},
                "IRR (%)": {"number": irr_notion},
                "Dividends (€)": {"number": trade['dividends']},
            }
            # Update the properties of the corresponding Notion page and
            # treat HTTP error replies as failures instead of ignoring them
            notion_update_page(notion_page_id, notion_update).raise_for_status()
        except Exception:
            error_count += 1

        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Logging: the failure summaries are logged at the level of the failure
    # (they used to be logged as "success")
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion trades failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion trades failed for all {error_count} entries")
- # UPDATE NOTION-INVESTMENT-OVERVIEW
def push_notion_investment_update(investments):
    """Upload value, IRR, performance and dividends of every investment to Notion.

    The IRR is stored as a growth factor (e.g. 1.2534) and is uploaded as a
    rate (0.2534), rounded to four decimals. After every upload the function
    sleeps for ``config.api_cooldowm_time`` seconds. Failed uploads are
    counted and summarized in the log; nothing is returned.

    Args:
        investments: Dict mapping Notion page id to an investment dict
            providing ``current_irr``, ``current_value``,
            ``total_performanance`` and ``total_dividends``.
    """
    error_count = 0
    number_of_uploads = 0

    for notion_page_id in investments:
        number_of_uploads += 1
        try:
            investment = investments[notion_page_id]
            # Convert the factor (1.2534) into the rate Notion needs (0.2534)
            irr_notion = round(investment['current_irr'] - 1, 4)

            notion_update = {
                "Current (€)": {"number": investment['current_value']},
                "IRR (%)": {"number": irr_notion},
                "Performance (€)": {"number": investment['total_performanance']},
                "Dividends (€)": {"number": investment['total_dividends']},
            }
            # Update the properties of the corresponding Notion page and
            # treat HTTP error replies as failures instead of ignoring them
            notion_update_page(notion_page_id, notion_update).raise_for_status()
        except Exception:
            error_count += 1

        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)

    # Logging: the failure summaries are logged at the level of the failure
    # (they used to be logged as "success")
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion investments failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion investments failed for all {error_count} entries")
- # TRMNL UPDATE DIAGRAMMS
def push_trmnl_update_chart(trmnl_update_object, trmnl_url, trmnl_headers):
    """POST an update object as JSON to TRMNL and log the outcome.

    Status 200 is logged as success; 429 and 422 are logged as warnings with
    an explanation; every other status and any raised exception is logged as
    an error. Nothing is returned.

    Args:
        trmnl_update_object: JSON-serializable data to send.
        trmnl_url: Target URL of the request.
        trmnl_headers: HTTP headers of the request.
    """
    try:
        # Convert the Python dictionary into JSON text
        data = json.dumps(trmnl_update_object, indent=2)
        reply = requests.post(trmnl_url, data=data, headers=trmnl_headers)

        status = reply.status_code
        if status == 200:
            logging(logging_level="success")
        elif status == 429:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Exceeded TRMNL's API rate limits")
            logging(logging_level="warning", message="Waiting some time should work")
        elif status == 422:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Upload successful, but data cannot be displayed correctly")
            logging(logging_level="warning", message="The payload is probably to large in size")
        else:
            logging(logging_level="error")
            logging(logging_level="error", message=f"Failed pushing data to TRMNL with server reply code: {status}")
            # Log the reply body; formatting the response object itself only
            # yields its short representation
            logging(logging_level="debug", message=f"Complete server reply message: {reply.text}")
    except Exception as e:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed pushing data to TRMNL with error code: {e}")
- # ----------------------------- #
- # HISTORY CALCULATION FUNCTIONS #
- # ----------------------------- #
- # CALC HISTORY PER TRADE
def calc_history_per_trade(trades, yf_data):
    """Build a day-by-day history for every trade.

    For every trade, each calendar day from the open date of the oldest
    trade until today gets an entry
    ``history[iso_date][trade_id]`` with the keys ``current_amount``,
    ``current_invested``, ``total_dividends``, ``current_value``,
    ``current_irr``, ``current_course`` and ``total_performanance``.

    On days outside the trade's open/close range all values except
    ``current_course`` are zero. On days without yfinance data the course of
    the previous day is reused and no dividends are assumed. On the close
    date the course is replaced by the trade's ``course_close``.

    When ``config.selected_logging_level`` is ``"debug"`` the history is also
    written to ``history_per_trade.json``.

    Args:
        trades: Dict mapping trade id to a trade dict providing ``ticker``,
            ``date_open``, ``date_close`` (``0`` while open), ``units``,
            ``course_open`` and ``course_close``.
        yf_data: Dict mapping ticker to a DataFrame indexed by date with the
            columns ``Close`` and ``Dividends``.

    Returns:
        The history dict, or ``False`` if any exception occurred (this
        function deliberately fails as a whole).
    """
    history_per_trade = {}
    date_open_oldest_trade = get_date_open_oldest_trade(trades)

    # Logging & statistics
    missing_day_entrys = 0
    days_formated = 0
    number_of_trades = 0

    try:
        one_day = datetime.timedelta(days=1)
        today = datetime.date.today()
        tomorrow = today + one_day

        # ------------------ LOOP OVER ALL TRADES
        for trade_id in trades:
            trade = trades[trade_id]
            number_of_trades += 1

            # Every trade starts its walk at the oldest trade day
            index_date = date_open_oldest_trade
            # Fallback course in case the very first day has no yfinance data
            previous_course = 0.0
            # Dividends accumulate per trade. Without this reset a trade that
            # was opened on the oldest day inherited the dividends
            # accumulated by the previously processed trade.
            total_dividends = 0

            # A trade that is still open gets a close date in the future
            date_close = tomorrow if trade["date_close"] == 0 else trade["date_close"]
            date_open = trade["date_open"]
            ticker = trade['ticker']

            # ------------------ DETERMINE THE COURSE PER DAY
            while index_date <= today:
                days_formated += 1

                # Fetch course & possible dividends for the day from yf_data
                try:
                    current_course = yf_data[ticker].at[index_date, 'Close']
                    current_dividends_per_ticker = yf_data[ticker].at[index_date, 'Dividends']
                except Exception:
                    # Missing yf-data (e.g. weekends): reuse the previous course
                    current_course = previous_course
                    current_dividends_per_ticker = 0.0
                    missing_day_entrys += 1

                # On the day the trade was closed the sell value is used
                if date_close == index_date:
                    current_course = trade['course_close']

                # Remember the course so that it can be passed down across
                # several days without data
                previous_course = current_course

                # ------------------ CALCULATE PERFORMANCE IF REQUIRED
                if date_open <= index_date <= date_close:
                    current_amount = trade['units']
                    current_invested = current_amount * trade['course_open']
                    total_dividends = total_dividends + current_amount * current_dividends_per_ticker
                    current_value = current_amount * current_course
                    current_value_with_dividends = current_value + total_dividends
                    current_irr = calculate_irr(index_date, date_open, current_value_with_dividends, current_invested)
                    total_performanance = current_value_with_dividends - current_invested
                    if current_value_with_dividends == 0:
                        print("0-value Error with ticker: {}".format(ticker))
                else:
                    # The trade is not relevant for this day
                    current_amount = 0
                    current_invested = 0.00
                    total_dividends = 0.00
                    current_value = 0.00
                    current_irr = 0.00
                    total_performanance = 0.0

                # ------------------ STORE RESULTS
                day_entry = history_per_trade.setdefault(index_date.isoformat(), {})
                day_entry[trade_id] = {
                    'current_amount': current_amount,
                    'current_invested': current_invested,
                    'total_dividends': total_dividends,
                    'current_value': current_value,
                    'current_irr': current_irr,
                    'current_course': current_course,
                    'total_performanance': total_performanance,
                }

                # ------------------ NEXT ITERATION
                index_date = index_date + one_day

        # ------------------ LOGGING & DEBUGGING
        # Debug: write the history to disk
        if config.selected_logging_level == "debug":
            data = json.dumps(history_per_trade, indent=2)
            with open("history_per_trade.json", "w") as f:
                f.write(data)

        if missing_day_entrys == 0:
            logging(logging_level="success")
            logging(logging_level="info", message=f"created a history with {days_formated} across all {number_of_trades} tickers o_O")
        else:
            logging(logging_level="warning")
            logging(logging_level="warning", message=f"No yf-data available in {missing_day_entrys} cases accross all {number_of_trades} tickers")
            logging(logging_level="warning", message="Probably reason is non-trading-days eg. weekends")
            logging(logging_level="warning", message="Used values from previous trade-day instead")

        return history_per_trade

    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
- # CALC THE HISTORY PER TICKER & OVERALL
def calc_history_per_ticker(history_per_trade, tickers, trades):
    """Aggregate the per-trade history into per-ticker values and a total.

    For every date of ``history_per_trade`` the values of all trades are
    summed up per ticker; the IRR of a ticker is the average of its trades'
    IRRs weighted by the invested amount. The additional entry ``"total"``
    aggregates all tickers in the same way (without amount and course).

    When ``config.selected_logging_level`` is ``"debug"`` the result is also
    written to ``history_per_ticker.json``.

    Returns:
        Dict ``date -> ticker/"total" -> values``, or ``False`` if any
        exception occurred (the error is logged).
    """
    history_per_ticker = {}
    days_formated = 0

    try:
        for date_entry in history_per_trade:
            days_formated += 1

            # Start every day with zeroed buckets for all tickers ...
            dict_daily = {
                symbol: {
                    "current_invested": 0,
                    "total_dividends": 0,
                    "current_value": 0,
                    "current_irr": 0,
                    "total_performanance": 0,
                    "current_amount": 0,   # ticker entries only
                    "current_course": 0,   # ticker entries only
                }
                for symbol in tickers
            }
            # ... and for the overall sum
            dict_daily["total"] = {
                "current_invested": 0,
                "total_dividends": 0,
                "current_value": 0,
                "current_irr": 0,
                "total_performanance": 0,
            }

            # Add every trade of the day to the bucket of its ticker
            for trade_id, snapshot in history_per_trade[date_entry].items():
                trade_amount = snapshot['current_amount']
                trade_invested = snapshot['current_invested']
                trade_dividends = snapshot['total_dividends']
                trade_value = snapshot['current_value']
                trade_irr = snapshot['current_irr']
                trade_course = snapshot['current_course']
                trade_performanance = snapshot['total_performanance']

                bucket = dict_daily[trades[trade_id]["ticker"]]

                # Keep the values from before this trade for the IRR weighting
                invested_before = bucket['current_invested']
                irr_before = bucket['current_irr']

                bucket['current_amount'] = bucket['current_amount'] + trade_amount
                bucket['current_invested'] = invested_before + trade_invested
                bucket['total_dividends'] = bucket['total_dividends'] + trade_dividends
                bucket['current_value'] = bucket['current_value'] + trade_value
                bucket['total_performanance'] = bucket['total_performanance'] + trade_performanance
                # The course is the same for all trades of a ticker
                bucket['current_course'] = trade_course

                if invested_before == 0 and trade_invested == 0:
                    # Nothing invested yet: avoid dividing by zero
                    bucket['current_irr'] = 0
                else:
                    # Weight the IRR by volume: a trade with 25% of the
                    # volume has only 25% influence on the IRR
                    bucket['current_irr'] = (irr_before * invested_before + trade_irr * trade_invested) / (invested_before + trade_invested)

            # Calculate the "total" entry after finishing with all trades
            total = dict_daily["total"]
            for symbol in tickers:
                bucket = dict_daily[symbol]
                total['total_dividends'] = total['total_dividends'] + bucket['total_dividends']
                total['current_value'] = total['current_value'] + bucket['current_value']
                total['total_performanance'] = total['total_performanance'] + bucket['total_performanance']

                # Read the invested sums before updating them, as the IRR
                # weighting needs the previous values
                total_invested = total['current_invested']
                ticker_invested = bucket['current_invested']
                total['current_invested'] = total_invested + ticker_invested

                if ticker_invested == 0 and total_invested == 0:
                    total['current_irr'] = 0
                else:
                    total['current_irr'] = (total['current_irr'] * total_invested + bucket['current_irr'] * ticker_invested) / (total_invested + ticker_invested)

            # Finally, store the results of this day
            history_per_ticker[date_entry] = dict_daily

        # ------------------ LOGGING & DEBUGGING
        if config.selected_logging_level == "debug":
            data = json.dumps(history_per_ticker, indent=2)
            with open("history_per_ticker.json", "w") as f:
                f.write(data)

        logging(logging_level="success")
        logging(logging_level="info", message=f"created a history with {days_formated} days formated o_O")
        return history_per_ticker

    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
- # --------------------------- #
- # HISTORY SELECTION FUNCTIONS #
- # --------------------------- #
- # FILTER ANY HISTORY OBJECT TO SELECTED DATES
def filter_history_by_list(history, dates_list):
    """Reduce a history object to the dates contained in ``dates_list``.

    Returns:
        A new dict with the entries of ``history`` whose key is in
        ``dates_list``, or ``False`` if an exception occurred (the error is
        logged).
    """
    try:
        # Keep only the day-entries whose date is part of the filter list
        filtered_history = {
            day: history[day]
            for day in history
            if day in dates_list
        }
        # Main logging
        logging(logging_level="success")
        return filtered_history
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
- # SELECT CURRENT VALUES PER TRADE
def select_current_value_per_trade(trades, history_per_trade):
    """Copy the relevant history values into every trade.

    Open trades (``date_close == 0``) take the values of today, closed
    trades those of their close date. The keys ``course_current``, ``irr``
    and ``dividends`` of each trade are overwritten in place. A trade whose
    values cannot be looked up is left unchanged and counted as a failure.

    Returns:
        The (mutated) ``trades`` dict.
    """
    format_errors = 0

    for trade_id in trades:
        try:
            trade = trades[trade_id]
            date_closed = trade["date_close"]
            if date_closed == 0:
                # Trade still open: use the performance data from today
                index_date_iso = datetime.date.today().isoformat()
            else:
                # Trade closed: use the performance data from the close date
                index_date_iso = date_closed.isoformat()

            # Read all values first, so that a missing value cannot leave
            # the trade half-updated
            snapshot = history_per_trade[index_date_iso][trade_id]
            course = snapshot['current_course']
            irr = snapshot['current_irr']
            dividends = snapshot['total_dividends']
        except Exception:
            format_errors += 1
            continue

        trade["course_current"] = course
        trade["irr"] = irr
        trade["dividends"] = dividends

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per trade in {format_errors} cases")
    return trades
- # SELECT CURRENT VALUES PER TICKER
def select_current_value_per_ticker(investments, history_per_ticker):
    """Copy today's per-ticker figures from the history onto every investment.

    For each investment, the entry of its ticker in today's history day is
    read and the values ``total_dividends``, ``current_value``,
    ``current_irr`` and ``total_performanance`` are written to the
    investment under the same keys.

    Args:
        investments: Mapping of investment id to investment dict. Each
            investment needs a ``ticker`` entry. The mapping is updated in
            place.
        history_per_ticker: Nested mapping indexed by ISO date string and
            then by ticker.

    Returns:
        The same ``investments`` mapping. Investments whose data could not
        be looked up are counted and reported with a warning; they are left
        as they were, apart from values already copied before the failing
        lookup.
    """
    # Logging
    format_errors = 0

    # Generate the iso-date of today as the required index (once, so every
    # investment is read from the same history day)
    index_date_iso = datetime.date.today().isoformat()

    # Loop over all investments
    for investment_id in investments:
        try:
            investment = investments[investment_id]

            # Get the ticker corresponding to the investment
            ticker = investment["ticker"]

            # Select latest data from history and save for this investment
            history_entry = history_per_ticker[index_date_iso][ticker]
            investment["total_dividends"] = history_entry['total_dividends']
            investment["current_value"] = history_entry['current_value']
            investment["current_irr"] = history_entry['current_irr']
            investment["total_performanance"] = history_entry['total_performanance']

        except Exception:
            # Best effort: count the failure and continue with the next
            # investment. Only Exception is caught, so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            format_errors += 1

    # Logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per ticker in {format_errors} cases")
    return investments
- # TRMNL CREATE IRR-UPDATE
def prep_trmnl_chart_udpate(history_to_show, series_to_show_1 = "total", data_to_show_1 = "current_value", series_to_show_2 = "bechnmark", data_to_show_2 = "current_value"):
    """Build the TRMNL update object: three big numbers plus two chart series.

    The big numbers (current value, total performance, current IRR in
    percent) are taken from the ``"total"`` series of the latest history
    entry. Each chart series holds one ``[timestamp, value]`` pair per
    history day.

    Args:
        history_to_show: Mapping of ISO date string -> series name -> data
            name -> value.
        series_to_show_1: Series plotted as first chart. Falls back to
            ``"total"`` (with a warning) if it is missing in the latest entry.
        data_to_show_1: Data field plotted for the first series. Falls back
            to ``"current_value"`` (with a warning) if the series lacks it.
        series_to_show_2: Same as ``series_to_show_1`` for the second chart.
        data_to_show_2: Same as ``data_to_show_1`` for the second chart.

    Returns:
        ``{"merge_variables": {"big_numbers": ..., "charts": [...]}}`` or
        ``False`` if any step failed.

    NOTE(review): the default ``"bechnmark"`` looks like a misspelling of
    "benchmark". If the history carries no series with exactly this name the
    second chart silently falls back to ``"total"`` - confirm against the
    code that builds the history.
    """
    try:
        # Fetch the latest date entry from the history
        latest_key = fetch_last_key_from_dict(history_to_show)
        latest_entry = history_to_show[latest_key]

        # ---- Big numbers, taken from the portfolio total of the latest day
        latest_total = latest_entry["total"]
        value_now = latest_total["current_value"]
        performance_now = latest_total["total_performanance"]
        irr_now_percent = (latest_total["current_irr"] - 1) * 100  # IRR factor -> percent

        big_numbers = {
            "current_value": str(round(value_now, 0)),
            "total_performanance": str(round(performance_now, 0)),
            "current_irr": str(round(irr_now_percent, 2)),
        }

        # ---- Input validation against what the latest entry offers
        known_series = tuple(latest_entry.keys())

        def _valid_series(requested):
            # Unknown series are replaced by the portfolio total
            if requested in known_series:
                return requested
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            return "total"

        def _valid_data(series_name, requested):
            # Unknown data fields are replaced by the current value
            known_fields = tuple(latest_entry[series_name].keys())
            if requested in known_fields:
                return requested
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current invested' as chart data, as the input was not valid")
            return "current_value"

        series_to_show_1 = _valid_series(series_to_show_1)
        series_to_show_2 = _valid_series(series_to_show_2)
        data_to_show_1 = _valid_data(series_to_show_1, data_to_show_1)
        data_to_show_2 = _valid_data(series_to_show_2, data_to_show_2)

        # ---- Chart points: one [timestamp, value] pair per day and series
        first_is_irr = data_to_show_1 == "current_irr"
        second_is_irr = data_to_show_2 == "current_irr"
        points_1 = []
        points_2 = []

        for iso_day in history_to_show:
            day_entry = history_to_show[iso_day]
            point_value_1 = day_entry[series_to_show_1][data_to_show_1]
            point_value_2 = day_entry[series_to_show_2][data_to_show_2]

            # IRR is stored as a factor; charts show it in percent
            if first_is_irr:
                point_value_1 = (point_value_1 - 1) * 100
            if second_is_irr:
                point_value_2 = (point_value_2 - 1) * 100

            # Two decimals are enough for the chart
            point_value_1 = round(point_value_1, 2)
            point_value_2 = round(point_value_2, 2)

            # Turn the plain ISO date into an ISO datetime at midnight (00:00:00)
            day = datetime.date.fromisoformat(iso_day)
            timestamp = datetime.datetime.combine(day, datetime.datetime.min.time()).isoformat()

            points_1.append([timestamp, point_value_1])
            points_2.append([timestamp, point_value_2])

        # ---- Human readable chart names
        series_title_1 = "Portfolio" if series_to_show_1 == "total" else series_to_show_1
        series_title_2 = "Portfolio" if series_to_show_2 == "total" else series_to_show_2

        # "current_irr" -> "Current irr" -> "Current IRR"; the last replace is
        # simply a no-op when "irr" does not occur
        data_title_1 = data_to_show_1.replace("_", " ").capitalize().replace("irr", "IRR")
        data_title_2 = data_to_show_2.replace("_", " ").capitalize().replace("irr", "IRR")

        chart_1 = {"data": points_1, "name": data_title_1 + " " + series_title_1}
        chart_2 = {"data": points_2, "name": data_title_2 + " " + series_title_2}

        # ---- Construct the trmnl_object
        trmnl_update_object = {
            "merge_variables": {
                "big_numbers": big_numbers,
                "charts": [chart_1, chart_2],
            },
        }

        # Debugging: dump the object next to the script when debug logging is selected
        if config.selected_logging_level == "debug":
            dumped = json.dumps(trmnl_update_object, indent=2)  # Serialize before touching the file
            with open("trmnl_update_object.json", "w") as debug_file:
                debug_file.write(dumped)

        # Main Logging
        logging(logging_level="success")
        return trmnl_update_object

    except Exception as failure:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {failure}")
        return False
|