Spaces:
Runtime error
Runtime error
File size: 6,186 Bytes
642c876 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import glob
from pathlib import Path

import pandas as pd
import plotly.express as px
import streamlit as st
from PIL import Image

from risk_metrics import absolute_return, annual_vol, max_drawdown
from streamlit_custom_slider import st_custom_slider
@st.cache(persist=True, show_spinner=False)
def create_rebase_chart(rebased_df, num_coins):
    """Reshape the first ``num_coins`` columns of ``rebased_df`` to long form.

    Args:
        rebased_df: Wide DataFrame with one column per coin (presumably
            rebased prices -- confirm against the caller).
        num_coins: Number of leading columns to keep.

    Returns:
        A DataFrame that keeps the original index and has two columns:
        ``'coin'`` (the source column label) and ``'price (USD)'`` (the
        cell value).
    """
    leading_coins = rebased_df.iloc[:, :num_coins]
    long_df = pd.melt(leading_coins, ignore_index=False)
    long_df.columns = ['coin', 'price (USD)']
    return long_df
@st.cache(persist=True, show_spinner=False)
def create_chart_df(all_returns_df, portfolio, coin):
    """Reshape the ``portfolio`` and ``coin`` columns into long form.

    Args:
        all_returns_df: Wide DataFrame with one column per asset.
        portfolio: Label of the first column to include.
        coin: Label of the second column to include.

    Returns:
        A DataFrame that keeps the original index and has two columns:
        ``'Asset'`` (the source column label) and ``'Value (USD)'``.
    """
    compared_assets = [portfolio, coin]
    long_df = pd.melt(
        all_returns_df,
        value_vars=compared_assets,
        ignore_index=False,
    )
    long_df.columns = ['Asset', 'Value (USD)']
    return long_df
@st.cache(persist=True, show_spinner=False)
def create_comparison_df(all_returns_df, selected_assets):
    """Reshape the selected asset columns into long form.

    Assets in ``selected_assets`` that are not columns of
    ``all_returns_df`` are silently ignored.

    Args:
        all_returns_df: Wide DataFrame with one column per asset.
        selected_assets: Iterable of column labels to compare.

    Returns:
        A DataFrame that keeps the original index and has two columns:
        ``'Asset'`` (the source column label) and ``'Value (USD)'``.
    """
    # Build the lookup once instead of re-listing the columns per asset.
    available_assets = set(all_returns_df.columns)
    selected_assets_present = [
        asset for asset in selected_assets if asset in available_assets
    ]
    melt_df = pd.melt(
        all_returns_df,
        value_vars=selected_assets_present,
        ignore_index=False,
    )
    melt_df.columns = ['Asset', 'Value (USD)']
    return melt_df
@st.cache(persist=True, show_spinner=False)
def ordered_dict(dictionary):
    """Return a new dict with the items of ``dictionary`` sorted by value.

    Items are ordered from the largest value to the smallest.
    """
    pairs_by_value = sorted(
        dictionary.items(),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return dict(pairs_by_value)
# Output mutation is allowed for this function on purpose: mutation is not a
# concern here, and it reduces the time Streamlit spends checking that the
# cached value has not been mutated.
@st.cache(persist=True, show_spinner=False, allow_output_mutation=True)
def load_images():
    """Load every ``.jpg`` logo in ``logos/`` into a dict keyed by file stem.

    Returns:
        dict mapping the file name without directory and extension
        (``logos/btc.jpg`` -> ``'btc'``) to the loaded PIL image.
    """
    image_dict = {}
    for filename in glob.glob('logos/*.jpg'):  # logos are stored as .jpg files
        # Image.open is lazy and keeps the file handle open. Reading the
        # pixel data inside the context manager lets the handle be closed
        # immediately while the image stays usable.
        with Image.open(filename) as im:
            im.load()
        image_dict[Path(filename).stem] = im
    return image_dict
@st.cache(persist=True, show_spinner=False)
def gen_performance_df(all_returns_df, market_cap_dict, strategy_dict):
    """Build a per-asset table of return, risk and market-cap figures.

    Args:
        all_returns_df: DataFrame with one column per asset. Each column is
            passed to ``absolute_return``, ``annual_vol`` and
            ``max_drawdown``.
        market_cap_dict: Mapping of asset -> market cap. Assets that are
            missing, or whose value cannot be converted with ``int``, get a
            market cap of 0.
        strategy_dict: Mapping whose keys are the assets labelled
            ``"Portfolio"``; every other asset is labelled ``"Coin"``.

    Returns:
        DataFrame indexed by asset with the columns ``'Type'``,
        ``'Total return %'``, ``'Risk / return'``, ``'Annual vol'``,
        ``'Max loss %'`` and ``'Market cap $M'``.
    """
    assets = all_returns_df.columns
    performance_df = pd.DataFrame(index=assets)
    performance_df['Type'] = [
        "Portfolio" if asset in strategy_dict else "Coin" for asset in assets
    ]

    abs_return = all_returns_df.apply(absolute_return)
    ann_vol = all_returns_df.apply(annual_vol)
    # NOTE(review): presumably max_drawdown returns a sequence whose first
    # element is the drawdown itself, so row 0 of the applied result holds
    # that value for every asset -- confirm against risk_metrics.
    drawdown_triples = all_returns_df.apply(max_drawdown)
    sharpe = abs_return.divide(ann_vol)

    market_caps = []
    for asset in assets:
        try:
            market_caps.append(int(market_cap_dict[asset]))
        except (KeyError, TypeError, ValueError, OverflowError):
            # Best effort: unknown or unparsable market caps count as 0.
            market_caps.append(0)

    performance_df['Total return %'] = abs_return * 100
    performance_df['Risk / return'] = sharpe * 100
    performance_df['Annual vol'] = ann_vol * 100
    performance_df['Max loss %'] = drawdown_triples.iloc[0] * 100
    performance_df['Market cap $M'] = [cap / 1000000 for cap in market_caps]
    return performance_df
@st.cache(persist=True, show_spinner=False)
def gen_performance_ag_df(all_returns_df, market_cap_dict, strategy_dict):
    """Build a per-asset performance table that also has an ``'Asset'`` column.

    Args:
        all_returns_df: DataFrame with one column per asset. Each column is
            passed to ``absolute_return``, ``annual_vol`` and
            ``max_drawdown``.
        market_cap_dict: Mapping of asset -> market cap. Assets that are
            missing, or whose value cannot be converted with ``int``, get a
            market cap of 0.
        strategy_dict: Mapping whose keys are the assets labelled
            ``"Portfolio"``; every other asset is labelled ``"Coin"``.

    Returns:
        DataFrame indexed by asset with the columns ``'Asset'``, ``'Type'``,
        ``'Risk adjusted return %'``, ``'Return over period %'``,
        ``'Annual volatility'``, ``'Max loss %'`` and ``'Market cap $M'``.
    """
    assets = all_returns_df.columns
    performance_df = pd.DataFrame(index=assets)
    performance_df['Asset'] = assets
    performance_df['Type'] = [
        "Portfolio" if asset in strategy_dict else "Coin" for asset in assets
    ]

    abs_return = all_returns_df.apply(absolute_return)
    ann_vol = all_returns_df.apply(annual_vol)
    # NOTE(review): presumably max_drawdown returns a sequence whose first
    # element is the drawdown itself, so row 0 of the applied result holds
    # that value for every asset -- confirm against risk_metrics.
    drawdown_triples = all_returns_df.apply(max_drawdown)
    sharpe = abs_return.divide(ann_vol)

    market_caps = []
    for asset in assets:
        try:
            market_caps.append(int(market_cap_dict[asset]))
        except (KeyError, TypeError, ValueError, OverflowError):
            # Best effort: unknown or unparsable market caps count as 0.
            market_caps.append(0)

    performance_df['Risk adjusted return %'] = sharpe * 100
    performance_df['Return over period %'] = abs_return * 100
    performance_df['Annual volatility'] = ann_vol * 100
    performance_df['Max loss %'] = drawdown_triples.iloc[0] * 100
    performance_df['Market cap $M'] = [cap / 1000000 for cap in market_caps]
    return performance_df
@st.cache(persist=True, show_spinner=False)
def add_drawdown(fig, all_returns_df, selected_asset):
    """Mark the maximum-drawdown window of ``selected_asset`` on ``fig``.

    Args:
        fig: Figure object exposing ``add_vline`` and ``add_vrect``
            (presumably a Plotly figure -- confirm against the caller).
            It is modified in place.
        all_returns_df: DataFrame with one column per asset; its index
            supplies the x positions of the markers.
        selected_asset: Column label of the asset to analyse.

    Returns:
        Tuple ``(fig, max_dd, start_dd, end_dd)`` where ``start_dd`` and
        ``end_dd`` are the index values at the positions returned by
        ``max_drawdown``.
    """
    # max_drawdown yields the drawdown plus two positions into the index.
    max_dd, start_idx, end_idx = max_drawdown(all_returns_df[selected_asset])
    timeline = all_returns_df.index
    start_dd, end_dd = timeline[start_idx], timeline[end_idx]

    # Thin red lines at both ends of the window, then a faint shaded band.
    for boundary in (start_dd, end_dd):
        fig.add_vline(x=boundary, line_width=1, line_color="red")
    fig.add_vrect(
        x0=start_dd,
        x1=end_dd,
        line_width=0,
        fillcolor="red",
        opacity=0.05,
        annotation_text=selected_asset + " maxdd",
    )
    return fig, max_dd, start_dd, end_dd
def write_coins(non_zero_coins, weights_dict, ids2names_dict, n_cols=2):
    """Render one read-only weight slider per coin in a grid of columns.

    Args:
        non_zero_coins: Sequence of coin ids to display.
        weights_dict: Mapping of coin id -> weight; the weight is multiplied
            by 100 and rounded to the nearest integer for the slider
            (presumably a fraction in [0, 1] -- confirm against the caller).
        ids2names_dict: Mapping of coin id -> label shown on the slider.
        n_cols: Number of sliders per row.
    """
    n_coins = len(non_zero_coins)
    # Ceiling division: just enough rows for all coins. ``1 + n // cols``
    # produced an empty extra row whenever the count was an exact multiple.
    n_rows = -(-n_coins // int(n_cols))
    rows = [st.container() for _ in range(n_rows)]
    cols_per_row = [r.columns(n_cols) for r in rows]
    cols = [column for row in cols_per_row for column in row]
    for column, coin_id in zip(cols, non_zero_coins):
        # Round instead of truncating: 0.29 * 100 is 28.999..., which
        # int() alone would display as 28.
        percent = int(round(weights_dict[coin_id] * 100))
        column.slider(ids2names_dict[coin_id], min_value=0, max_value=100,
                      value=percent, key=coin_id,
                      disabled=True)
def write_bespoke_coins(coin_names, n_cols=2):
    """Render an editable weight slider per coin and return normalised weights.

    Each slider ranges from 0 to 100 and starts at 50.

    Args:
        coin_names: Sequence of coin names; each is used as both the slider
            label and its widget key.
        n_cols: Number of sliders per row.

    Returns:
        List of weights in the order of ``coin_names``, scaled so they sum
        to 1. If every slider is at 0 (or there are no coins) the weights
        cannot be normalised and a list of ``0.0`` values is returned.
    """
    n_coins = len(coin_names)
    # Ceiling division: just enough rows for all coins. ``1 + n // cols``
    # produced an empty extra row whenever the count was an exact multiple.
    n_rows = -(-n_coins // int(n_cols))
    rows = [st.container() for _ in range(n_rows)]
    cols_per_row = [r.columns(n_cols) for r in rows]
    cols = [column for row in cols_per_row for column in row]

    weights_list = []
    for column, coin_name in zip(cols, coin_names):
        weight = column.slider(coin_name, min_value=0, max_value=100,
                               value=50, key=coin_name,
                               disabled=False)
        weights_list.append(weight)

    total = sum(weights_list)
    if total == 0:
        # Avoid ZeroDivisionError when all sliders are dragged to 0.
        return [0.0] * len(weights_list)
    return [weight / total for weight in weights_list]
def write_coins_custom(coin_names, n_cols=2):
    """Render a custom weight slider per coin and return normalised weights.

    Same layout as a plain slider grid, but each widget is created with
    ``st_custom_slider`` inside its column. Each slider ranges from 0 to 100
    and starts at 50.

    Args:
        coin_names: Sequence of coin names; each is used as both the slider
            label and its widget key.
        n_cols: Number of sliders per row.

    Returns:
        List of weights in the order of ``coin_names``, scaled so they sum
        to 1. If every slider is at 0 (or there are no coins) the weights
        cannot be normalised and a list of ``0.0`` values is returned.
    """
    n_coins = len(coin_names)
    # Ceiling division: just enough rows for all coins. ``1 + n // cols``
    # produced an empty extra row whenever the count was an exact multiple.
    n_rows = -(-n_coins // int(n_cols))
    rows = [st.container() for _ in range(n_rows)]
    cols_per_row = [r.columns(n_cols) for r in rows]
    cols = [column for row in cols_per_row for column in row]

    weights_list = []
    for column, coin_name in zip(cols, coin_names):
        with column:
            weight = st_custom_slider(coin_name, min_value=0, max_value=100,
                                      value=50, key=coin_name)
        weights_list.append(weight)

    total = sum(weights_list)
    if total == 0:
        # Avoid ZeroDivisionError when all sliders are dragged to 0.
        return [0.0] * len(weights_list)
    return [weight / total for weight in weights_list]
@st.cache(persist=True, show_spinner=False)
def get_pre_selected_idx(assets, pre_selected):
    """Return the positions in ``assets`` of the items found in ``pre_selected``.

    Args:
        assets: Iterable of asset labels.
        pre_selected: Container of labels to look for (anything supporting
            ``in``).

    Returns:
        List of zero-based positions, in ascending order.
    """
    return [
        position
        for position, asset in enumerate(assets)
        if asset in pre_selected
    ]
|