# Notion-Performance-Tracker/functions.py
### -------------------- LIBARIES --------------------
import copy
import datetime
import json
import time

import pandas as pd
import requests
import yfinance as yf

import config
### -------------------- FUNCTIONS --------------------
# ---------------- #
# HELPER FUNCTIONS #
# ---------------- #
# LOGGING / PRINTING TO TERMINAL
def logging(message = "", logging_level = "", new_line = True):
    """Print *message* to the terminal, color-coded by *logging_level*.

    A message is only printed when its level is at least as important as
    the level selected in config (lower index in config.logging_levels
    means more important).

    Args:
        message: Text to print after the "[level]" prefix.
        logging_level: One of config.logging_levels; unknown values are
            silently suppressed.
        new_line: When False, the line ends with a space instead of a
            newline (useful for progress dots).
    """
    # Index of the level selected in the config file (lower = more important)
    config_logging_level = config.logging_levels.index(config.selected_logging_level)
    try:
        # Index of this message's level within the configured level list
        message_logging_level = config.logging_levels.index(logging_level)
    except ValueError:
        # Unknown level: fall back to one past the least important index,
        # so the message is never printed
        message_logging_level = len(config.logging_levels)
    # Guard against non-boolean new_line arguments.
    # BUGFIX: the original wrote "if new_line is not bool", which compares a
    # value against the type object and is always true — new_line was forced
    # to True and the no-newline branch below could never run.
    if not isinstance(new_line, bool):
        new_line = True
    # Only print messages that are important enough
    if message_logging_level <= config_logging_level:
        # Fetch the ANSI color code for this level
        log_color = getattr(config.log_colors, logging_level)
        # Construct the colored log line
        log_text = str(log_color + "[" + logging_level + "] " + config.log_colors.endcode + message)
        if new_line:
            print(log_text)
        else:
            print(log_text, end=" ", flush=True)
# CALCULATE THE IRR
def calculate_irr(date_now, date_open, value_now, value_open):
    """Calculate the annualized internal rate of return of a position.

    Args:
        date_now: Evaluation date (datetime.date).
        date_open: Date the position was opened (datetime.date).
        value_now: Current value of the position.
        value_open: Value invested at date_open.

    Returns:
        The IRR as a growth factor (e.g. 1.25 for +25% p.a.); negative
        when the value ratio is negative; True on a calculation error
        (error-flag return kept for backward compatibility with callers).
    """
    try:
        # Holding period in days. On the purchase day itself the delta is 0,
        # so force a minimum of 1 day to keep the IRR computable.
        days_held = (date_now - date_open).days
        if days_held == 0:
            days_held = 1
        # Fraction of a year, so the result is an annualized rate
        years_held = days_held / 365
        ratio = value_now / value_open
        if ratio < 0:
            # A negative ratio has no real root: annualize the magnitude,
            # then re-apply the sign
            irr = -((-ratio) ** (1 / years_held))
        else:
            # Mathematically identical to the years_held-th root of ratio
            irr = ratio ** (1 / years_held)
    except (ZeroDivisionError, TypeError, AttributeError, OverflowError):
        print("[ERROR] Calculation of irr")
        return True  # error flag, kept for backward compatibility
    return irr
# GET THE DAY OF THE OLDEST TRADE
def get_date_open_oldest_trade(trades):
    """Return the earliest 'date_open' across all trades.

    Falls back to today's date when the trades dict is empty or no trade
    opened before today.
    """
    candidate_dates = [entry["date_open"] for entry in trades.values()]
    candidate_dates.append(datetime.date.today())
    return min(candidate_dates)
# CREATES LIST OF UNIQUE TICKERS
def filter_list_of_tickers(trades):
    """Collect the unique ticker symbols referenced by the given trades.

    Returns the tickers as a list in first-seen order, or False when
    extraction fails.
    """
    unique_tickers = []
    try:
        for trade_id in trades:
            # Ticker belonging to this trade; append only on first sight
            symbol = trades[trade_id]['ticker']
            if symbol not in unique_tickers:
                unique_tickers.append(symbol)
        # Main Logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(unique_tickers)} tickers found")
        return unique_tickers
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
# CREATE LIST OF WEEKLY DATES
def create_list_filtered_dates(trades, days_seperation):
    """Build a list of ISO dates from the oldest trade's open day to today.

    Dates are spaced *days_seperation* days apart (counted backwards from
    today) and returned oldest-first, or False when generation fails.
    """
    stop_date = get_date_open_oldest_trade(trades)
    cursor = datetime.date.today()
    try:
        sampled_dates = []
        # Walk backwards from today until the oldest trade day is passed
        while cursor >= stop_date:
            sampled_dates.append(cursor.isoformat())
            cursor -= datetime.timedelta(days=days_seperation)
        # Oldest entry first
        sampled_dates.reverse()
        # Main Logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"{len(sampled_dates)} dates in weekly list")
        return sampled_dates
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
# FETCH THE LAST INDEX FROM A DICT
def fetch_last_key_from_dict(mapping):
    """Return the last (most recently inserted) key of *mapping*.

    The parameter was renamed from ``dict`` to avoid shadowing the builtin.
    Raises IndexError when *mapping* is empty, exactly like the original.
    """
    return list(mapping.keys())[-1]
# ADD BENCHMARK-TICKER TO TICKER-DICT
def add_benchmark_ticker(tickers, ticker_benchmarkt):
    """Append the benchmark ticker to the ticker list (mutates in place)."""
    tickers += [ticker_benchmarkt]
    logging(logging_level="success")
# CREATE BENCHMARK TRADES
def create_benchmark_trades(trades):
    """Return a copy of *trades* with every ticker replaced by the benchmark.

    BUGFIX: the original assigned ``benchmark_trades = trades``, which
    aliased the input dict — replacing the tickers therefore silently
    overwrote the real tickers of the caller's trades as well. A deep copy
    keeps the caller's data intact.

    Returns the benchmark trades dict, or False on error.
    """
    try:
        benchmark_trades = copy.deepcopy(trades)
        for trade_id in benchmark_trades:
            benchmark_trades[trade_id]['ticker'] = config.ticker_benchmark
        logging(logging_level="success")
        return benchmark_trades
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
# -------------------------- #
# NETWORK DOWNLOAD FUNCTIONS #
# -------------------------- #
# NOTION FETCH PAGES
def notion_get_pages(db_id_trades, num_pages=None):
    """Query a Notion database and return its pages.

    Fetches all pages in batches of 100 when *num_pages* is None, otherwise
    only the requested number. Returns the list of page objects, or True
    when the request failed (error-flag return kept as in the original).
    """
    try:
        url = f"https://api.notion.com/v1/databases/{db_id_trades}/query"
        # None means "everything"; otherwise fetch exactly num_pages
        get_all = num_pages is None
        page_size = 100 if get_all else num_pages
        # First batch
        parsed_response = requests.post(
            url, json={"page_size": page_size}, headers=config.notion_headers
        ).json()
        result = parsed_response["results"]
        # Keep paginating with the cursor while Notion reports more pages
        while get_all and parsed_response["has_more"]:
            payload = {"page_size": page_size, "start_cursor": parsed_response["next_cursor"]}
            parsed_response = requests.post(
                url, json=payload, headers=config.notion_headers
            ).json()
            result.extend(parsed_response["results"])
        return result
    except Exception:
        return True  # Return True when there was an error
# NOTION FETCH & FORMAT TRADES
def fetch_format_notion_trades(db_id_trades):
    """Download all trades from the Notion trades database and format them.

    Returns a dict keyed by Notion page id with the parsed trade values,
    or False when the download itself failed. Individual trades with
    missing mandatory fields are skipped and counted as format errors.
    """
    trades = {}
    fetch_error = False
    format_errors = 0
    number_of_trades = 0
    error_message = ""
    # Download data from notion
    data = notion_get_pages(db_id_trades)
    # notion_get_pages signals a failed download by returning True
    if data is True:
        fetch_error = True
    else:
        # Format the received data
        for i in data:
            # Count for statistics
            number_of_trades = number_of_trades + 1
            # Each page is loaded as a dictionary
            notion_page = dict(i)
            # "Close" may legitimately be missing (trade still open) -> 0
            try:
                date_close = notion_page["properties"]["Close"]["date"]
                date_close = date_close["start"]
                date_close = datetime.date(*map(int, date_close.split('-')))
            except (KeyError, TypeError, AttributeError, ValueError):
                date_close = 0
            # Any other missing value makes the trade unusable -> skip it
            try:
                # Format date-open
                date_open = notion_page["properties"]["Open"]["date"]
                date_open = date_open["start"]
                date_open = datetime.date(*map(int, date_open.split('-')))
                # Combine data into one trade record
                trade = {
                    'ticker' : notion_page["properties"]["Ticker"]["select"]["name"],
                    'date_open' : date_open,
                    'date_close' : date_close,
                    'course_open' : notion_page["properties"]["Open (€)"]["number"],
                    'course_close' : notion_page["properties"]["Close (€)"]["number"],
                    'course_current' : notion_page["properties"]["Current (€)"]["number"],
                    'irr' : notion_page["properties"]["IRR (%)"]["number"],
                    'units' : notion_page["properties"]["Units"]["number"],
                    'dividends' : notion_page["properties"]["Dividends (€)"]["number"]
                }
                # The Notion page id is the dictionary key
                trades[notion_page["id"]] = trade
            except Exception as e:
                format_errors = format_errors + 1
                error_message = e
    # Main Logging
    # BUGFIX: the original used "fetch_error == False & format_errors == 0";
    # "&" binds tighter than "==", making the warning branch unreachable.
    if not fetch_error and format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_trades} trades recieved and formated")
        return trades
    elif not fetch_error and format_errors > 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_trades} skiped...maybe due to missing values?")
        return trades
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
# NOTION FETCH & FORMAT INVESTMENT OVERVIEW
def fetch_format_notion_investments(db_id_investments):
    """Download the investment-overview pages from Notion and format them.

    Returns a dict keyed by Notion page id, or False when the download
    failed. Pages with missing values are skipped and counted.
    """
    investments = {}
    fetch_error = False
    format_errors = 0
    number_of_investments = 0
    # Download data & check for success
    data = notion_get_pages(db_id_investments)
    if data is True:
        # BUGFIX: the original assigned an unused local 'error' here, so a
        # failed download was never flagged and the error branch below
        # could not fire
        fetch_error = True
    else:
        # Format received data
        for i in data:
            # Count up for statistics
            number_of_investments = number_of_investments + 1
            try:
                # Each page is loaded as a dictionary
                notion_page = dict(i)
                # The Notion page id is the dictionary key
                notion_page_id = notion_page["id"]
                investments[notion_page_id] = {}
                investments[notion_page_id]["ticker"] = notion_page["properties"]["Ticker"]["select"]["name"]
                investments[notion_page_id]["total_dividends"] = notion_page["properties"]["Dividends (€)"]["number"]
                investments[notion_page_id]["current_value"] = notion_page["properties"]["Current (€)"]["number"]
                investments[notion_page_id]["current_irr"] = notion_page["properties"]["IRR (%)"]["number"]
                investments[notion_page_id]["total_performanance"] = notion_page["properties"]["Performance (€)"]["number"]
            # Skip this entry, if errors show up
            except Exception:
                format_errors = format_errors + 1
    # Main Logging
    # BUGFIX: 'and' instead of the original '&', whose precedence made the
    # warning branch unreachable
    if not fetch_error and format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_investments} trades recieved and formated")
        return investments
    elif not fetch_error and format_errors > 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} trades out of {number_of_investments} skiped...maybe due to missing values?")
        return investments
    else:
        logging(logging_level="error")
        return False
# YFINANCE FETCH & FORMAT DATA
def fetch_format_yf_data(tickers):
    """Download the full price history for every ticker via yfinance.

    Keeps only the 'Close' and 'Dividends' columns and replaces the
    datetime index with plain dates. Returns a dict ticker -> DataFrame;
    returns False when at least one download failed (error branch).
    """
    yf_data = {}
    fetch_errors = 0
    format_errors = 0
    number_of_tickers = 0
    # Download data for each ticker seperately
    for ticker in tickers:
        number_of_tickers = number_of_tickers + 1
        skip_formating = False  # set when the download failed
        # Catch errors during the download
        try:
            api = yf.Ticker(ticker)
            data = api.history(period="max")
        except Exception:
            fetch_errors = fetch_errors + 1
            skip_formating = True
        # If the download was successful, try formatting the data
        if skip_formating == False:
            try:
                data = pd.DataFrame(data)
                # Keep only 'Close' and 'Dividends'
                del data['Open']
                del data['Low']
                del data['High']
                del data['Volume']
                # These two columns only exist for some tickers
                if 'Stock Splits' in data.columns:
                    del data['Stock Splits']
                if 'Capital Gains' in data.columns:
                    del data['Capital Gains']
                # Replace the datetime index with plain dates (drop the time)
                new_index = [pd.Timestamp.date(ts) for ts in data.index]
                data.insert(1, 'Date', new_index)
                data.set_index('Date', inplace=True)
                # Save the data-frame to the yf_data dict
                yf_data[ticker] = data
            except Exception:
                # On error the ticker is simply not added to yf_data
                format_errors = format_errors + 1
        # Wait for the API to cool down
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)
    # Main Logging
    # BUGFIX: the original conditions used '&' where 'and' was intended;
    # operator precedence made the warning branch unreachable.
    print(" ", end="", flush=True)
    if fetch_errors == 0 and format_errors == 0:
        logging(logging_level="success")
        logging(logging_level="info", message=f"{number_of_tickers} tickers recieved and formated")
        return yf_data
    elif fetch_errors == 0 and format_errors > 0:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"{format_errors} tickers out of {number_of_tickers} skiped")
        return yf_data
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {number_of_tickers}")
        print("\n")
        return False
# ------------------------ #
# NETWORK UPLOAD FUNCTIONS #
# ------------------------ #
# NOTION UPDATE PAGES
def notion_update_page(page_id: str, data: dict):
    """PATCH the given properties onto a Notion page; return the response."""
    endpoint = f"https://api.notion.com/v1/pages/{page_id}"
    return requests.patch(endpoint, json={"properties": data}, headers=config.notion_headers)
# UPDATE NOTION-TRADES-DATABASE
def push_notion_trades_update(trades):
    """Upload current value, IRR and dividends of every trade to Notion.

    Failures for individual pages are counted, not raised; a summary is
    logged at the end.
    """
    error_count = 0
    number_of_uploads = 0
    for notion_page_id in trades:
        number_of_uploads = number_of_uploads + 1
        try:
            # The irr is stored as a growth factor (1.2534);
            # Notion expects the rate part only (0.2534)
            irr_notion = round(trades[notion_page_id]['irr'] - 1, 4)
            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {
                    "number": trades[notion_page_id]['course_current']
                },
                "IRR (%)": {
                    "number": irr_notion
                },
                "Dividends (€)": {
                    "number": trades[notion_page_id]['dividends']
                }
            }
            # Update the properties of the corresponding notion-page
            notion_update_page(notion_page_id, notion_update)
        except Exception:
            error_count = error_count + 1
        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)
    # Summary logging
    # BUGFIX: the failure messages were logged at "success" level in the
    # original, mismatching the warning/error headline just above them
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion trades failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion trades failed for all {error_count} entries")
# UPDATE NOTION-INVESTMENT-OVERVIEW
def push_notion_investment_update(investments):
    """Upload value, IRR, performance and dividends per investment to Notion.

    Failures for individual pages are counted, not raised; a summary is
    logged at the end.
    """
    error_count = 0
    number_of_uploads = 0
    for notion_page_id in investments:
        number_of_uploads = number_of_uploads + 1
        # Try uploading an update
        try:
            # The irr is stored as a growth factor (1.2534);
            # Notion expects the rate part only (0.2534)
            irr_notion = round(investments[notion_page_id]['current_irr'] - 1, 4)
            # Construct Notion-Update-Object
            notion_update = {
                "Current (€)": {
                    "number": investments[notion_page_id]['current_value']
                },
                "IRR (%)": {
                    "number": irr_notion
                },
                "Performance (€)": {
                    "number": investments[notion_page_id]['total_performanance']
                },
                "Dividends (€)": {
                    "number": investments[notion_page_id]['total_dividends']
                }
            }
            # Update the properties of the corresponding notion-page
            notion_update_page(notion_page_id, notion_update)
        except Exception:
            error_count = error_count + 1
        # Wait for the API to cool off
        print(".", end="", flush=True)
        time.sleep(config.api_cooldowm_time)
    # Summary logging
    # BUGFIX: the failure messages were logged at "success" level in the
    # original, mismatching the warning/error headline just above them
    print(" ", end="", flush=True)
    if error_count == 0:
        logging(logging_level="success")
    elif error_count < number_of_uploads:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Updating notion investments failed for {error_count} out of {number_of_uploads} entries")
    else:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Updating notion investments failed for all {error_count} entries")
# TRMNL UPDATE DIAGRAMMS
def push_trmnl_update_chart(trmnl_update_object, trmnl_url, trmnl_headers):
    """POST the prepared chart payload to TRMNL and log the outcome.

    Handles rate-limit (429) and payload-too-large (422) replies with
    dedicated warnings; any other non-200 status is logged as an error.
    """
    # Send the data to TRMNL
    try:
        data = json.dumps(trmnl_update_object, indent=2)  # dict -> json string
        reply = requests.post(trmnl_url, data=data, headers=trmnl_headers)
        # Logging
        if reply.status_code == 200:
            logging(logging_level="success")
        elif reply.status_code == 429:
            # (removed a stray dead statement 'logging_level="success"'
            # that the original had at the top of this branch)
            logging(logging_level="warning")
            logging(logging_level="warning", message="Exceeded TRMNL's API rate limits")
            logging(logging_level="warning", message="Waiting some time should work")
        elif reply.status_code == 422:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Upload successful, but data cannot be displayed correctly")
            logging(logging_level="warning", message="The payload is probably to large in size")
        else:
            logging(logging_level="error")
            logging(logging_level="error", message=f"Failed pushing data to TRMNL with server reply code: {reply.status_code}")
            logging(logging_level="debug", message=f"Complete server reply message: {reply}")
    except Exception as e:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed pushing data to TRMNL with error code: {e}")
# ----------------------------- #
# HISTORY CALCULATION FUNCTIONS #
# ----------------------------- #
# CALC HISTORY PER TRADE
def calc_history_per_trade(trades, yf_data):
    """Build a day-by-day performance history for every single trade.

    Walks every calendar day from the oldest trade's open date up to today
    and records, per trade and day: amount, invested capital, accumulated
    dividends, value, IRR and total performance (zeroes while the trade is
    not open).

    Args:
        trades: dict trade_id -> trade dict ('ticker', 'date_open',
            'date_close' (0 = still open), 'course_open', 'course_close',
            'units', ...).
        yf_data: dict ticker -> DataFrame with 'Close' and 'Dividends'
            columns indexed by datetime.date (see fetch_format_yf_data).

    Returns:
        dict iso-date -> {trade_id: value dict}, or False on any error.
    """
    # Create support variables
    history_per_trade = {}
    total_dividends = 0
    date_open_oldest_trade = get_date_open_oldest_trade(trades)
    # Logging & statistics
    missing_day_entrys = 0
    days_formated = 0
    number_of_trades = 0
    # This history is central to everything else, so any error aborts the
    # whole function (logged below) instead of producing partial data
    try:
        # ------------------ LOOP OVER ALL TRADES
        for trade_id in trades:
            # Statistics
            number_of_trades = number_of_trades +1
            # ------------------ PREPARE FOR THE (NEXT) LOOP OVER ALL DAYS
            # (Re)set the index-date to the oldest trade day so the day loop
            # for the next trade starts again at day 1
            index_date = date_open_oldest_trade
            # Start the previous-day course at 0, in case the very first
            # trade happened on a day with no yfinance data (e.g. a weekend)
            previous_course = 0.0
            # If the trade is still open (date_close == 0), push the closure
            # date one day into the future so today is inside the open window
            if trades[trade_id]["date_close"] == 0:
                date_close = datetime.date.today() + datetime.timedelta(days=1)
            else:
                date_close = trades[trade_id]["date_close"]
            date_open = trades[trade_id]["date_open"]
            # Keep ticker for connecting performance later
            ticker = trades[trade_id]['ticker']
            # ------------------ DETERMINE THE COURSE PER DAY
            while index_date != datetime.date.today() + datetime.timedelta(days=1):
                # Statistics
                days_formated = days_formated +1
                # Fetch the day's course & eventual dividends from yf_data
                try:
                    current_course = yf_data[ticker].at[index_date, 'Close']
                    current_dividends_per_ticker = yf_data[ticker].at[index_date, 'Dividends']
                # Missing yf-data (e.g. weekends): reuse the previous course
                except:
                    current_course = previous_course
                    current_dividends_per_ticker = 0.0 # there are never dividends on non-trading days
                    missing_day_entrys = missing_day_entrys +1 # Increase the warning count
                # On the day the trade was closed, the course is overwritten
                # with the actual sell value
                if date_close == index_date:
                    current_course = trades[trade_id]['course_close']
                # Remember the course for the next iteration; this also carries
                # a course across multiple course-less days (e.g. a weekend)
                previous_course = current_course
                # ------------------ CALCULATE PERFORMANCE IF REQUIRED
                if index_date >= date_open and index_date <= date_close:
                    # Calculate performance values
                    current_amount = trades[trade_id]['units']
                    current_invested = current_amount * trades[trade_id]['course_open']
                    total_dividends = total_dividends + current_amount * current_dividends_per_ticker
                    current_value = current_amount * current_course
                    current_value_with_dividends = current_value + total_dividends
                    current_irr = calculate_irr(index_date, date_open, current_value_with_dividends, current_invested)
                    total_performanance = current_value_with_dividends - current_invested
                    if current_value_with_dividends == 0:
                        print("0-value Error with ticker: {}".format(ticker))
                else:
                    # Trade not open on this day -> write zeroes
                    # NOTE(review): this also resets total_dividends, so
                    # dividends only accumulate within one open window —
                    # confirm that is intended
                    current_amount = 0
                    current_invested = 0.00
                    total_dividends = 0.00
                    current_value = 0.00
                    current_irr = 0.00
                    total_performanance = 0.0
                # ------------------ STORE RESULTS
                index_date_iso = index_date.isoformat()
                # Collect all values for this trade/day into one dict
                dict_a = {}
                dict_a['current_amount'] = current_amount
                dict_a['current_invested'] = current_invested
                dict_a['total_dividends'] = total_dividends
                dict_a['current_value'] = current_value
                dict_a['current_irr'] = current_irr
                dict_a['current_course'] = current_course
                dict_a['total_performanance'] = total_performanance
                # Reuse the day's dict if another trade already created it
                if index_date_iso in history_per_trade:
                    dict_b = history_per_trade[index_date_iso]
                else:
                    dict_b = {}
                # Add the values to the trade_id value-pair
                dict_b[trade_id] = dict_a
                # Update the history_per_trade
                history_per_trade.update({index_date_iso : dict_b})
                # ------------------ NEXT ITERATION
                index_date = index_date + datetime.timedelta(days=1)
        # ------------------ LOGGING & DEBUGGING
        # At debug level, dump the full history to disk
        if config.selected_logging_level == "debug":
            data = json.dumps(history_per_trade, indent=2) # Converts a python-dictionary into a json
            with open("history_per_trade.json", "w") as f:
                f.write(data)
        # Logging
        if missing_day_entrys == 0:
            logging(logging_level="success")
            logging(logging_level="info", message=f"created a history with {days_formated} across all {number_of_trades} tickers o_O")
        else:
            logging(logging_level="warning")
            logging(logging_level="warning", message=f"No yf-data available in {missing_day_entrys} cases accross all {number_of_trades} tickers")
            logging(logging_level="warning", message="Probably reason is non-trading-days eg. weekends")
            logging(logging_level="warning", message="Used values from previous trade-day instead")
        # Return date
        return history_per_trade
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
# CALC THE HISTORY PER TRADE & OVERALL
def calc_history_per_ticker(history_per_trade, tickers, trades=None):
    """Aggregate the per-trade history into a per-ticker (and 'total') history.

    Args:
        history_per_trade: dict iso-date -> {trade_id: per-trade values}
            (see calc_history_per_trade).
        tickers: list of ticker symbols to aggregate into.
        trades: the trades dict, needed to map a trade_id to its ticker.
            BUGFIX: the original read an undefined global ``trades``, which
            raised a NameError that the except below swallowed, so the
            function always returned False. Pass the trades dict explicitly;
            omitting it reproduces the original failure behavior (logged
            error, return False).

    Returns:
        dict iso-date -> {ticker or "total": aggregated values}, or False
        on error.
    """
    history_per_ticker = {}
    # Logging & statistics
    days_formated = 0
    # As this history is so important, any error fails the whole function
    try:
        # Loop over each date entry in the history
        for date_entry in history_per_trade:
            # Statistics
            days_formated = days_formated + 1
            # Zeroed result skeleton for this day: one entry per ticker plus
            # an overall "total" entry
            dict_daily = {}
            for ticker in tickers:
                dict_daily[ticker] = {
                    "current_invested": 0,
                    "total_dividends": 0,
                    "current_value": 0,
                    "current_irr": 0,
                    "total_performanance": 0,
                    "current_amount": 0,  # only for ticker entries, not for "total"
                    "current_course": 0,  # only for ticker entries, not for "total"
                }
            dict_daily["total"] = {
                "current_invested": 0,
                "total_dividends": 0,
                "current_value": 0,
                "current_irr": 0,
                "total_performanance": 0,
            }
            # Fold each trade of the day into its ticker's aggregate
            for trade_id in history_per_trade[date_entry]:
                trade_data = history_per_trade[date_entry][trade_id]
                trade_amount = trade_data['current_amount']
                trade_invested = trade_data['current_invested']
                trade_dividends = trade_data['total_dividends']
                trade_value = trade_data['current_value']
                trade_irr = trade_data['current_irr']
                trade_course = trade_data['current_course']
                trade_performanance = trade_data['total_performanance']
                # Map the trade to its ticker (raises when trades was not
                # passed, which the except below turns into the False return)
                ticker = trades[trade_id]["ticker"]
                ticker_entry = dict_daily[ticker]
                # Preserve the pre-update values for the IRR weighting below
                ticker_invested = ticker_entry['current_invested']
                ticker_irr = ticker_entry['current_irr']
                # Accumulate the trade into the ticker's entry
                ticker_entry['current_amount'] = ticker_entry['current_amount'] + trade_amount
                ticker_entry['current_invested'] = ticker_invested + trade_invested
                ticker_entry['total_dividends'] = ticker_entry['total_dividends'] + trade_dividends
                ticker_entry['current_value'] = ticker_entry['current_value'] + trade_value
                ticker_entry['total_performanance'] = ticker_entry['total_performanance'] + trade_performanance
                # Same course for all trades of a ticker -> plain overwrite
                ticker_entry['current_course'] = trade_course
                if ticker_invested == 0 and trade_invested == 0:
                    # Catch 0 values
                    ticker_entry['current_irr'] = 0
                else:
                    # Invested-volume-weighted IRR: a trade worth 25% of the
                    # volume contributes 25% of the ticker IRR
                    ticker_entry['current_irr'] = (ticker_irr * ticker_invested + trade_irr * trade_invested) / (ticker_invested + trade_invested)
            # Fold every ticker into the "total" entry
            for ticker in tickers:
                dict_daily["total"]['total_dividends'] = dict_daily["total"]['total_dividends'] + dict_daily[ticker]['total_dividends']
                dict_daily["total"]['current_value'] = dict_daily["total"]['current_value'] + dict_daily[ticker]['current_value']
                dict_daily["total"]['total_performanance'] = dict_daily["total"]['total_performanance'] + dict_daily[ticker]['total_performanance']
                # Preserve the pre-update values for the IRR weighting below
                total_invested = dict_daily["total"]['current_invested']
                ticker_invested = dict_daily[ticker]['current_invested']
                dict_daily["total"]['current_invested'] = total_invested + ticker_invested
                if ticker_invested == 0 and total_invested == 0:
                    dict_daily["total"]['current_irr'] = 0
                else:
                    total_irr = dict_daily["total"]['current_irr']
                    ticker_irr = dict_daily[ticker]['current_irr']
                    dict_daily["total"]['current_irr'] = (total_irr * total_invested + ticker_irr * ticker_invested) / (total_invested + ticker_invested)
            # Finally, write the results for this day to the history
            history_per_ticker[date_entry] = dict_daily
        # ------------------ LOGGING & DEBUGGING
        # At debug level, dump the result to disk
        if config.selected_logging_level == "debug":
            data = json.dumps(history_per_ticker, indent=2)  # dict -> json string
            with open("history_per_ticker.json", "w") as f:
                f.write(data)
        # Success Logging
        logging(logging_level="success")
        logging(logging_level="info", message=f"created a history with {days_formated} days formated o_O")
        return history_per_ticker
    # Error Logging
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error message: {error_message}")
        return False
# --------------------------- #
# HISTORY SELECTION FUNCTIONS #
# --------------------------- #
# FILTER ANY HISTORY OBJECT TO SELECTED DATES
def filter_history_by_list(history, dates_list):
    """Reduce *history* to only the date entries contained in *dates_list*.

    Returns the filtered dict, or False on error.
    """
    try:
        # Keep only the days that appear in the filter list
        filtered_history = {
            day: day_data
            for day, day_data in history.items()
            if day in dates_list
        }
        # Main Logging
        logging(logging_level="success")
        return filtered_history
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False
# SELECT CURRENT VALUES PER TRADE
def select_current_value_per_trade(trades, history_per_trade):
    """Copy each trade's latest course, IRR and dividends out of the history.

    Open trades take today's values, closed trades the values from their
    closing day. Updates *trades* in place and returns it; failures per
    entry are only counted and reported as a warning.
    """
    format_errors = 0
    for trade_id in trades:
        try:
            # Open trades (date_close == 0) are valued as of today,
            # closed trades as of their closing date
            date_closed = trades[trade_id]["date_close"]
            if date_closed == 0:
                lookup_date = datetime.date.today().isoformat()
            else:
                lookup_date = date_closed.isoformat()
            # Fetch the day's values from the history and store them
            day_values = history_per_trade[lookup_date][trade_id]
            trades[trade_id]["course_current"] = day_values['current_course']
            trades[trade_id]["irr"] = day_values['current_irr']
            trades[trade_id]["dividends"] = day_values['total_dividends']
        except:
            format_errors = format_errors + 1
    # Summary logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per trade in {format_errors} cases")
    return trades
# SELECT CURRENT VALUES PER TICKER
def select_current_value_per_ticker(investments, history_per_ticker):
    """Copy today's aggregated per-ticker values into the investments dict.

    Updates *investments* in place and returns it; failures per entry are
    only counted and reported as a warning.
    """
    format_errors = 0
    for investment_id in investments:
        try:
            # Today's ISO date is the history index for the latest values
            lookup_date = datetime.date.today().isoformat()
            # Resolve the investment's ticker, then copy its aggregates
            entry = investments[investment_id]
            ticker_values = history_per_ticker[lookup_date][entry["ticker"]]
            entry["total_dividends"] = ticker_values['total_dividends']
            entry["current_value"] = ticker_values['current_value']
            entry["current_irr"] = ticker_values['current_irr']
            entry["total_performanance"] = ticker_values['total_performanance']
        except:
            format_errors = format_errors + 1
    # Summary logging
    if format_errors == 0:
        logging(logging_level="success")
    else:
        logging(logging_level="warning")
        logging(logging_level="warning", message=f"Failed updating the current value per ticker in {format_errors} cases")
    return investments
# TRMNL CREATE IRR-UPDATE
def prep_trmnl_chart_udpate(history_to_show, series_to_show_1 = "total", data_to_show_1 = "current_value", series_to_show_2 = "bechnmark", data_to_show_2 = "current_value"): # default chart data = current_value
    """Prepare the TRMNL update payload (big numbers plus two chart series).

    Args:
        history_to_show: dict iso-date -> {series: {data_key: value}} as
            produced by calc_history_per_ticker (must contain a "total"
            series).
        series_to_show_1: series (ticker or "total") for chart 1.
        data_to_show_1: value key to plot for chart 1 (e.g. "current_value",
            "current_irr").
        series_to_show_2: series for chart 2.
            NOTE(review): the default "bechnmark" looks like a typo of
            "benchmark"; as written it never matches a series, so the
            fallback to "total" always triggers for chart 2 — confirm intent.
        data_to_show_2: value key to plot for chart 2.

    Returns:
        The TRMNL merge_variables payload dict, or False on error.
    """
    # Setup
    dict_big_numbers = {}
    charts_data = []
    chart_1 = {}
    chart_2 = {}
    try:
        # Fetch the latest date entry from the history
        index_date_iso = fetch_last_key_from_dict(history_to_show)
        # Select the latest "total" data from the history for the big numbers
        current_value = history_to_show[index_date_iso]["total"]["current_value"]
        total_performanance = history_to_show[index_date_iso]["total"]["total_performanance"]
        current_irr = history_to_show[index_date_iso]["total"]["current_irr"]
        current_irr = (current_irr -1) *100  # growth factor (1.25) -> percent (25.0)
        # Round the numbers for display
        dict_big_numbers["current_value"] = str(round(current_value, 0))
        dict_big_numbers["total_performanance"] = str(round(total_performanance, 0))
        dict_big_numbers["current_irr"] = str(round(current_irr, 2))
        # Fall back to "total" if a requested series is not in the history
        possible_series_to_show = list(history_to_show[index_date_iso].keys()) # all series values that could be shown
        if series_to_show_1 not in possible_series_to_show:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_1 = "total"
        if series_to_show_2 not in possible_series_to_show:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'total' as the series to show, as the input was not valid")
            series_to_show_2 = "total"
        # Fall back to "current_value" if a requested data key is not present
        possible_data_to_show = list(history_to_show[index_date_iso][series_to_show_1].keys())
        if data_to_show_1 not in possible_data_to_show:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current invested' as chart data, as the input was not valid")
            data_to_show_1 = "current_value"
        possible_data_to_show = list(history_to_show[index_date_iso][series_to_show_2].keys())
        if data_to_show_2 not in possible_data_to_show:
            logging(logging_level="warning")
            logging(logging_level="warning", message="Selecting 'current invested' as chart data, as the input was not valid")
            data_to_show_2 = "current_value"
        # Create space for storing values
        chart_1["data"] = []
        chart_2["data"] = []
        # Format the chart data into [timestamp, value] pairs
        for date in history_to_show:
            # Extract the value to be stored
            value_to_show_1 = history_to_show[date][series_to_show_1][data_to_show_1]
            value_to_show_2 = history_to_show[date][series_to_show_2][data_to_show_2]
            # IRR values are growth factors -> convert to percent
            if data_to_show_1 == "current_irr":
                value_to_show_1 = (value_to_show_1 -1) *100
            if data_to_show_2 == "current_irr":
                value_to_show_2 = (value_to_show_2 -1) *100
            # Round to 2 decimal values
            value_to_show_1 = round(value_to_show_1, 2)
            value_to_show_2 = round(value_to_show_2, 2)
            # Extend the date by a timestamp (midnight)
            json_date = datetime.date.fromisoformat(date) # ISO string -> date object
            json_date = datetime.datetime.combine(json_date, datetime.datetime.min.time()) # date + 00:00:00 -> datetime
            json_date = json_date.isoformat() # back to ISO string, now including a time
            # Store the values together with the corresponding date
            value_1 = [json_date, value_to_show_1]
            value_2 = [json_date, value_to_show_2]
            # Add the value pair to the list of values for this chart
            chart_1["data"].append(value_1)
            chart_2["data"].append(value_2)
        # Add the two series to the list of series in the TRMNL object
        charts_data.append(chart_1)
        charts_data.append(chart_2)
        # Generate nicer series titles
        if series_to_show_1 == "total":
            series_to_show_1 = "Portfolio"
        if series_to_show_2 == "total":
            series_to_show_2 = "Portfolio"
        # Generate nicer data titles
        data_to_show_1 = data_to_show_1.replace("_", " ").capitalize()
        data_to_show_2 = data_to_show_2.replace("_", " ").capitalize()
        # Uppercase "irr" for display; no-op when "irr" does not occur
        data_to_show_1 = data_to_show_1.replace("irr", "IRR")
        data_to_show_2 = data_to_show_2.replace("irr", "IRR")
        # Generate the chart names / descriptions
        chart_1["name"] = data_to_show_1 + " " + series_to_show_1
        chart_2["name"] = data_to_show_2 + " " + series_to_show_2
        # Construct the trmnl_object
        trmnl_update_object = {}
        trmnl_update_object["merge_variables"] = {}
        trmnl_update_object["merge_variables"]["big_numbers"] = dict_big_numbers
        trmnl_update_object["merge_variables"]["charts"] = charts_data
        # Debugging: dump the payload to disk at debug level
        if config.selected_logging_level == "debug":
            data = json.dumps(trmnl_update_object, indent=2) # Converts a python-dictionary into a json
            with open("trmnl_update_object.json", "w") as f:
                f.write(data)
        # Main Logging
        logging(logging_level="success")
        return trmnl_update_object
    except Exception as error_message:
        logging(logging_level="error")
        logging(logging_level="error", message=f"Failed with error: {error_message}")
        return False