# Portfolio construction and backtesting helpers (Streamlit-cached).
import pandas as pd
import streamlit as st
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from datetime import date, timedelta
@st.cache(persist=True, show_spinner=False)
def uniform(returns_df, num_coins, start_date, end_date):
    '''
    Return a uniform weight allocation over (at most) the first num_coins
    assets that have a full returns history (no NaN values) between
    start_date and end_date.

    Args:
        returns_df: DataFrame of asset returns indexed by date.
        num_coins: maximum number of assets to include.
        start_date, end_date: slice bounds for the history check.

    Returns:
        weights: list of equal weights, one per selected asset.
        investment_cols: column names of the selected assets.
    '''
    # dropna on a fresh frame (no inplace=True) — mutating a slice of
    # returns_df in place triggers SettingWithCopyWarning and can either
    # silently fail or corrupt the caller's (cached) DataFrame.
    investment_df = returns_df[start_date:end_date].dropna(axis=1)
    investment_cols = investment_df.columns[:num_coins]
    # Size the weight vector to the assets actually selected: previously the
    # weights were always num_coins long, which broke downstream dot-products
    # whenever fewer than num_coins assets had full histories.
    n = len(investment_cols)
    weights = [1 / n] * n if n else []
    return weights, investment_cols
@st.cache(persist=True, show_spinner=False)
def markowitz(returns_df):
    # Placeholder: not yet implemented. See markowitz_weights /
    # markowitz_weights_dict below for the working max-Sharpe optimisation.
    pass
@st.cache(persist=True, show_spinner=False, allow_output_mutation=True)
def create_port_rtns(returns_df, weights, investment_cols, start_date, end_date):
    '''
    Build a cumulative portfolio price series from per-asset returns,
    rebased so the portfolio is worth 1 unit at start_date.

    Args:
        returns_df: DataFrame of asset returns indexed by date.
        weights: weight vector aligned with investment_cols.
        investment_cols: asset columns to include in the portfolio.
        start_date, end_date: backtest window.

    Returns:
        Series named 'price (USD)' indexed by 'date'.
    '''
    # Copy before writing: assigning into returns_df[investment_cols] would
    # be chained assignment on a view and can mutate the caller's cached
    # DataFrame (the likely reason allow_output_mutation was needed).
    investment_df = returns_df[investment_cols].copy()
    # Zero the start-date return so the cumulative product starts at exactly 1.
    investment_df[start_date:start_date] = 0
    rebased_df = (1 + investment_df[start_date:end_date]).cumprod()
    port_returns = rebased_df.dot(weights)
    port_returns.index.name = 'date'
    port_returns.name = 'price (USD)'
    return port_returns
@st.cache(persist=True, show_spinner=False)
def markowitz_weights(histories_df, start_port_date, investment_cols, analysis_days=365):
    '''
    Compute max-Sharpe (Markowitz) weights from the analysis_days of price
    history immediately preceding start_port_date.

    Args:
        histories_df: DataFrame of asset prices indexed by date.
        start_port_date: first day of the portfolio; analysis ends here.
        investment_cols: asset columns to optimise over.
        analysis_days: length of the lookback window in days.

    Returns:
        OrderedDict of asset -> weight, or None when the optimiser never
        converged (previously this path raised NameError on an unbound
        variable).
    '''
    start_analysis_date = start_port_date - timedelta(analysis_days)
    analysis_df = histories_df[start_analysis_date:start_port_date][investment_cols]
    # Expected returns and sample covariance over the analysis window.
    mu = expected_returns.mean_historical_return(analysis_df)
    S = risk_models.sample_cov(analysis_df)
    # The solver can fail sporadically, so retry a bounded number of times.
    # clean_weights() is only called after a successful max_sharpe(), not on
    # a failed/unsolved frontier as before.
    cleaned_weights = None
    for _ in range(50):
        try:
            ef = EfficientFrontier(mu, S, weight_bounds=(0, 1))
            ef.max_sharpe()
            cleaned_weights = ef.clean_weights()
            break
        except Exception:
            continue
    if cleaned_weights is None:
        print("Could not find optimal solution, try changing optimisation constraints or investment set")
    return cleaned_weights
@st.cache(persist=True, show_spinner=False)
def create_weights_df(weights_dict, strategy):
    '''
    Tabulate a weights dictionary as a long-format DataFrame with columns
    'strategy', 'assets' and 'weights'.
    '''
    assets = list(weights_dict.keys())
    weight_values = list(weights_dict.values())
    data = {'strategy': strategy, 'assets': assets, 'weights': weight_values}
    return pd.DataFrame(data)
@st.cache(persist=True, show_spinner=False)
def ids_with_histories(histories_df, start_date, end_date):
    '''
    Return the asset ids (columns) whose histories are complete (no NaN
    values) between start_date and end_date.
    '''
    # dropna on a fresh frame (no inplace=True) — mutating a slice of
    # histories_df in place triggers SettingWithCopyWarning and can mutate
    # the caller's cached DataFrame.
    return histories_df[start_date:end_date].dropna(axis=1).columns
@st.cache(persist=True, show_spinner=False)
def uniform_weights_dict(ids_with_histories):
    '''
    Map every asset id to an equal weight so the weights sum to 1.

    Returns an empty dict for an empty input instead of raising
    ZeroDivisionError.
    '''
    n = len(ids_with_histories)
    if n == 0:
        return {}
    weight = 1 / n
    # Dict comprehension; also avoids shadowing the builtin `id` and
    # shadowing this function's own name with a local variable.
    return {asset_id: weight for asset_id in ids_with_histories}
@st.cache(persist=True, show_spinner=False)
def markowitz_weights_dict(histories_df, start_port_date, ids_with_histories, analysis_days=365):
    '''
    Compute max-Sharpe (Markowitz) weights from the analysis_days of price
    history preceding start_port_date, sorted by descending weight.

    Args:
        histories_df: DataFrame of asset prices indexed by date.
        start_port_date: first day of the portfolio; analysis ends here.
        ids_with_histories: asset columns to optimise over.
        analysis_days: length of the lookback window in days.

    Returns:
        dict of asset -> weight sorted by descending weight; empty dict when
        the optimiser never converged (previously this path raised NameError
        on an unbound variable).
    '''
    start_analysis_date = start_port_date - timedelta(analysis_days)
    analysis_df = histories_df[start_analysis_date:start_port_date][ids_with_histories]
    # Expected returns and sample covariance over the analysis window.
    mu = expected_returns.mean_historical_return(analysis_df)
    S = risk_models.sample_cov(analysis_df)
    # The solver can fail sporadically, so retry a bounded number of times.
    # clean_weights() is only called after a successful max_sharpe().
    cleaned_weights = None
    for _ in range(10):
        try:
            ef = EfficientFrontier(mu, S, weight_bounds=(0, 1))
            ef.max_sharpe()
            cleaned_weights = ef.clean_weights()
            break
        except Exception:
            continue
    if cleaned_weights is None:
        print("Could not find optimal solution, try changing optimisation constraints or investment set")
        return {}
    # Sort by descending weight for nicer display downstream.
    return dict(sorted(cleaned_weights.items(), key=lambda item: item[1], reverse=True))
@st.cache(persist=True, show_spinner=False)
def gen_port_rtns(rebased_df, weights_dict):
    '''
    Dot rebased prices with portfolio weights, restricted to the assets
    actually present in rebased_df; surviving weights are renormalised to
    sum to 1.

    Args:
        rebased_df: DataFrame of rebased asset prices.
        weights_dict: mapping of asset id -> weight.

    Returns:
        Series of portfolio values over rebased_df's index.
    '''
    available = {k: v for k, v in weights_dict.items() if k in rebased_df.columns}
    # Hoist the normalisation constant: previously sum(...) was re-evaluated
    # inside the comprehension for every key (accidental O(n^2)).
    total = sum(available.values())
    normalised = {k: v / total for k, v in available.items()}
    return rebased_df[list(normalised.keys())].dot(list(normalised.values()))
@st.cache(persist=True, show_spinner=False)
def gen_all_returns(rebased_df, ids_with_histories, strategy_dict):
    '''
    Generate returns for all strategy portfolios and all coins with full
    histories over the backtest period, rebased to the start of the
    backtest period.

    Args:
        rebased_df: asset prices rebased to the start of the backtest period.
        ids_with_histories: columns of rebased_df with full histories.
        strategy_dict: mapping of strategy name -> weights dict; must
            contain a 'Uniform' entry.

    Returns:
        DataFrame with one column per strategy followed by one column per
        coin in ids_with_histories.
    '''
    uniform_returns = gen_port_rtns(rebased_df, strategy_dict['Uniform'])
    port_returns = pd.DataFrame({'Uniform': uniform_returns})
    # Append every other strategy's portfolio series as its own column.
    for name, weights in strategy_dict.items():
        if name == 'Uniform':
            continue
        strat_returns = gen_port_rtns(rebased_df, weights)
        strat_returns.name = name
        port_returns = port_returns.join(strat_returns)
    return port_returns.join(rebased_df[ids_with_histories])