Spaces:
Runtime error
Runtime error
File size: 5,478 Bytes
642c876 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import requests
from datetime import date, timedelta, datetime
import time
import pandas as pd
from os.path import exists
import streamlit as st
@st.cache(persist=True)
def create_assets(total_coins=50):
'''
A function to retrieve info about the largest total_coins number of
cryptocurrencies, ranked by market cap, generated by a call to coincap assets
api.
'''
url = "https://api.coincap.io/v2/assets"
# N.B. here adampt the params dict to only request what you need
payload={'limit': total_coins}
headers = {}
assets_json = requests.request("GET", url, params=payload, headers=headers).json()
return assets_json
@st.cache(persist=True)
def gen_symbols(assets_json):
'''
Function to generate three lists: symbols, names and ids, from the result of
a call to the coincap assets api, assets_json.
'''
symbols_list = []
names_list = []
ids_list =[]
for dict in assets_json['data']:
symbols_list.append(dict['symbol'])
names_list.append(dict['name'])
ids_list.append(dict['id'])
return symbols_list, names_list, ids_list
@st.cache(persist=True, show_spinner=False)
def create_market_cap_dict(assets_json):
market_cap_dict = {}
for asset_dict in assets_json['data']:
market_cap_dict[asset_dict['id']] = int(float(asset_dict['marketCapUsd']))
return market_cap_dict
def load_histories(coin_ids, start, end):
'''
Function to load daily historic prices for all crypto currencies in the
coin_ids list within the time period defined by the interval [start, end].
'''
url = "http://api.coincap.io/v2/assets/{}/history"
payload={'interval':'d1', 'start':start, 'end':end}
headers = {}
histories_dict = {}
for id in coin_ids:
response_histories = requests.request("GET", url.format(id), headers=headers, params=payload)
histories_json = response_histories.json()
histories_dict[id] = histories_json['data']
return histories_dict
@st.cache(persist=True, show_spinner=False)
def date_conv(date):
'''
Function to convert string to datetime.date.
'''
return datetime.strptime(date, '%Y-%m-%d').date()
@st.cache(persist=True)
def create_unix_dates(today=date.today(), lookback_years = 5):
'''
A function to create start_unix and end_unix times in UNIX time in milliseconds
'''
start_datetime = today-timedelta(365*lookback_years)
start_unix = int(time.mktime(start_datetime.timetuple()) * 1000)
end_unix = int(time.mktime(date.today().timetuple()) * 1000)
return start_unix, end_unix
@st.cache(persist=True, show_spinner=False)
def create_histories_df(coin_ids, start_unix, end_unix):
'''
A function to create a dataframe of historical prices for all of the
crypto currencies in the coin_ids=ids list, over a period defined by the
interval [start_unix, end_unix].
N.B. This code uses the data for bitcoin as the first dataframe on which
other temp_df are outer joined from the right. This is because bitcoin
has the longest history.
'''
print('Downloading data from coincap.io, may take several minutes...')
# download histories from coincap.io
with st.spinner("You're the first user today so asset histories are being updated. May take several minutes."):
histories_dict = load_histories(coin_ids, start_unix, end_unix)
# convert all dates in histories_dict to python datetime.date objects and remove 'time' key
for id in coin_ids:
for dict in histories_dict[id]:
dict.pop('time')
dict['priceUsd']=float(dict['priceUsd'])
dict['date'] = date_conv(dict['date'][0:10])
# convert histories_dict to pd.DataFrame
histories_df = pd.json_normalize(histories_dict['bitcoin'])
histories_df = histories_df.set_index('date', drop=True)
for id in [x for x in coin_ids if x != "bitcoin"]:
temp_df = pd.json_normalize(histories_dict[id])
temp_df = temp_df.set_index('date', drop=True)
histories_df = histories_df.merge(temp_df, how='outer', left_index=True, right_index=True)
histories_df.columns = coin_ids
return histories_df
# N.B. allow_output_mutation set to True because in create_rebased_df I am
# deliberately chaning the value returns_df[start_date:start_date] to 0
# however I want the cahced value to remain unchanged so that if I rebase to a
# different start_date we go back to the orrignal returns_df.
@st.cache(persist=True, show_spinner=False, allow_output_mutation=True)
def create_returns_df(histories_df):
return histories_df.pct_change(1)
@st.cache(persist=True, show_spinner=False)
def create_rebased_df(returns_df, start_date, end_date):
returns_df[start_date:start_date]=0
return (1 + returns_df[start_date:end_date]).cumprod()
@st.cache(persist=True, show_spinner=False)
def date_range(end_date, lookback_years):
return [end_date - timedelta(x) for x in range(365 * lookback_years)][::-1]
@st.cache(persist=True, show_spinner=False)
def ids2names_dict(coin_ids, names):
ids2names_dict={}
for i, id in enumerate(coin_ids):
ids2names_dict[id] = names[i]
return ids2names_dict
@st.cache(persist=True, show_spinner=False)
def names2ids_dict(names, coin_ids):
names2ids_dict={}
for i, name in enumerate(names):
names2ids_dict[name] = coin_ids[i]
return names2ids_dict
@st.cache(persist=True, show_spinner=False)
def gen_rebased_df(histories_df, ids_with_histories, start_date, end_date):
returns_df = histories_df[ids_with_histories].pct_change(1)
returns_df[start_date:start_date]=0
return (1 + returns_df[start_date:end_date]).cumprod()
|