cryptoTester / plot_creator.py
SamHastings1088's picture
first commit
642c876
import pandas as pd
import plotly.express as px
import streamlit as st
from PIL import Image
import glob
from risk_metrics import absolute_return, annual_vol, max_drawdown
from streamlit_custom_slider import st_custom_slider
@st.cache(persist=True, show_spinner=False)
def create_rebase_chart(rebased_df, num_coins):
melt_df = pd.melt(rebased_df.iloc[:,:num_coins], ignore_index=False)
melt_df.columns=['coin','price (USD)']
return melt_df
@st.cache(persist=True, show_spinner=False)
def create_chart_df(all_returns_df, portfolio, coin):
melt_df = pd.melt(all_returns_df, value_vars=[portfolio, coin], ignore_index=False)
melt_df.columns=['Asset','Value (USD)']
return melt_df
@st.cache(persist=True, show_spinner=False)
def create_comparison_df(all_returns_df, selected_assets):
selected_assets_present = [asset for asset in selected_assets if asset in list(all_returns_df.columns)]
melt_df = pd.melt(all_returns_df, value_vars=selected_assets_present, ignore_index=False)
melt_df.columns=['Asset','Value (USD)']
return melt_df
@st.cache(persist=True, show_spinner=False)
def ordered_dict(dictionary):
return {k: v for k, v in sorted(dictionary.items(), key=lambda item: item[1], reverse=True)}
# allow output mutation in this function because I'm not worried about mutation
# and i want to reduce the time it takes streamlit to check it hasn't mutated.
@st.cache(persist=True, show_spinner=False, allow_output_mutation=True)
def load_images():
image_dict = {}
for filename in glob.glob('logos/*.jpg'): #assuming all logos are png format
im=Image.open(filename)
image_dict[filename[6:][:-4]]=im
return image_dict
@st.cache(persist=True, show_spinner=False)
def gen_performance_df(all_returns_df, market_cap_dict, strategy_dict):
assets = all_returns_df.columns
performance_df = pd.DataFrame(index = assets)
performance_df['Type'] = ["Portfolio" if x in list(strategy_dict.keys()) else "Coin" for x in assets]
abs_return = all_returns_df.apply(absolute_return)
ann_vol = all_returns_df.apply(annual_vol)
drawdown_triples = all_returns_df.apply(max_drawdown)
sharpe = abs_return.divide(ann_vol)
market_caps=[]
for asset in assets:
try:
market_caps.append(int(market_cap_dict[asset]))
except:
market_caps.append(0)
performance_df['Total return %'] = abs_return * 100
performance_df['Risk / return'] = sharpe *100
performance_df['Annual vol'] = ann_vol *100
performance_df['Max loss %'] = drawdown_triples.iloc[0] *100
performance_df['Market cap $M'] = [cap/1000000 for cap in market_caps]
return performance_df
@st.cache(persist=True, show_spinner=False)
def gen_performance_ag_df(all_returns_df, market_cap_dict, strategy_dict):
assets = all_returns_df.columns
performance_df = pd.DataFrame(index=assets)
performance_df['Asset'] = assets
performance_df['Type'] = ["Portfolio" if x in list(strategy_dict.keys()) else "Coin" for x in assets]
abs_return = all_returns_df.apply(absolute_return)
ann_vol = all_returns_df.apply(annual_vol)
drawdown_triples = all_returns_df.apply(max_drawdown)
sharpe = abs_return.divide(ann_vol)
market_caps=[]
for asset in assets:
try:
market_caps.append(int(market_cap_dict[asset]))
except:
market_caps.append(0)
performance_df['Risk adjusted return %'] = sharpe *100
performance_df['Return over period %'] = abs_return * 100
performance_df['Annual volatility'] = ann_vol *100
performance_df['Max loss %'] = drawdown_triples.iloc[0] *100
performance_df['Market cap $M'] = [cap/1000000 for cap in market_caps]
return performance_df
@st.cache(persist=True, show_spinner=False)
def add_drawdown(fig, all_returns_df, selected_asset):
#calculate max drawdown
max_dd, start_idx, end_idx = max_drawdown(all_returns_df[selected_asset])
start_dd = all_returns_df.index[start_idx]
end_dd = all_returns_df.index[end_idx]
fig.add_vline(x=start_dd, line_width=1, line_color="red")
fig.add_vline(x=end_dd, line_width=1, line_color="red")
fig.add_vrect(x0=start_dd, x1=end_dd, line_width=0, fillcolor="red", opacity=0.05, annotation_text=selected_asset + " maxdd")
return fig, max_dd, start_dd, end_dd
def write_coins(non_zero_coins, weights_dict, ids2names_dict, n_cols=2):
n_coins = len(non_zero_coins)
n_rows = 1 + n_coins // int(n_cols)
rows = [st.container() for _ in range(n_rows)]
cols_per_row = [r.columns(n_cols) for r in rows]
cols = [column for row in cols_per_row for column in row]
#cols = st.columns(n_coins)
#checkboxes=[]
for i, coin_id in enumerate(non_zero_coins):
cols[i].slider(ids2names_dict[coin_id], min_value=0, max_value=100,
value=int(weights_dict[coin_id]*100), key=coin_id,
disabled=True)
def write_bespoke_coins(coin_names, n_cols=2):
n_coins = len(coin_names)
n_rows = 1 + n_coins // int(n_cols)
rows = [st.container() for _ in range(n_rows)]
cols_per_row = [r.columns(n_cols) for r in rows]
cols = [column for row in cols_per_row for column in row]
#cols = st.columns(n_coins)
#checkboxes=[]
weights_list = []
for i, coin_name in enumerate(coin_names):
weight = cols[i].slider(coin_name, min_value=0, max_value=100,
value=50, key=coin_name,
disabled=False)
weights_list.append(weight)
weights_list = [weight/sum(weights_list) for weight in weights_list]
return weights_list
def write_coins_custom(coin_names, n_cols=2):
n_coins = len(coin_names)
n_rows = 1 + n_coins // int(n_cols)
rows = [st.container() for _ in range(n_rows)]
cols_per_row = [r.columns(n_cols) for r in rows]
cols = [column for row in cols_per_row for column in row]
#cols = st.columns(n_coins)
#checkboxes=[]
weights_list = []
for i, coin_name in enumerate(coin_names):
with cols[i]:
weight = st_custom_slider(coin_name, min_value=0, max_value=100,
value=50, key=coin_name)
weights_list.append(weight)
weights_list = [weight/sum(weights_list) for weight in weights_list]
return weights_list
@st.cache(persist=True, show_spinner=False)
def get_pre_selected_idx(assets, pre_selected):
return [i for i in range(len(assets)) if assets[i] in pre_selected]