import requests
from datetime import date, timedelta, datetime
import time
import pandas as pd
from os.path import exists
import streamlit as st
def create_assets(total_coins=50):
    '''
    A function to retrieve info about the largest total_coins cryptocurrencies,
    ranked by market cap, via a call to the coincap assets api.
    '''
    url = "https://api.coincap.io/v2/assets"
    # N.B. adapt the params dict here to only request what you need
    payload = {'limit': total_coins}
    headers = {}
    assets_json = requests.request("GET", url, params=payload, headers=headers).json()
    return assets_json
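# Example usage (an illustrative sketch, not called by the app; the _example_*
# helpers in this file are added for documentation only): fetch the top 10 assets
# and print a few fields from the coincap v2 /assets response used above.
def _example_top_assets():
    assets_json = create_assets(total_coins=10)
    for asset in assets_json['data']:
        # each asset dict carries string fields such as 'id', 'symbol' and 'marketCapUsd'
        print(asset['id'], asset['symbol'], asset['marketCapUsd'])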
def gen_symbols(assets_json):
    '''
    Function to generate three lists: symbols, names and ids, from the result of
    a call to the coincap assets api, assets_json.
    '''
    symbols_list = []
    names_list = []
    ids_list = []
    for asset_dict in assets_json['data']:
        symbols_list.append(asset_dict['symbol'])
        names_list.append(asset_dict['name'])
        ids_list.append(asset_dict['id'])
    return symbols_list, names_list, ids_list
def create_market_cap_dict(assets_json):
    '''
    Function to map each coin id to its market cap in whole US dollars, from the
    result of a call to the coincap assets api, assets_json.
    '''
    market_cap_dict = {}
    for asset_dict in assets_json['data']:
        market_cap_dict[asset_dict['id']] = int(float(asset_dict['marketCapUsd']))
    return market_cap_dict
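# Example usage (illustrative sketch only): combine create_assets, gen_symbols and
# create_market_cap_dict to print each coin's name alongside its market cap in USD.
def _example_symbols_and_market_caps():
    assets_json = create_assets(total_coins=5)
    _, names, ids = gen_symbols(assets_json)
    market_caps = create_market_cap_dict(assets_json)
    for coin_id, name in zip(ids, names):
        print(f"{name} ({coin_id}): ${market_caps[coin_id]:,}")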
def load_histories(coin_ids, start, end):
    '''
    Function to load daily historic prices for all cryptocurrencies in the
    coin_ids list within the time period defined by the interval [start, end],
    where start and end are UNIX times in milliseconds.
    '''
    url = "https://api.coincap.io/v2/assets/{}/history"
    payload = {'interval': 'd1', 'start': start, 'end': end}
    headers = {}
    histories_dict = {}
    for coin_id in coin_ids:
        response_histories = requests.request("GET", url.format(coin_id), headers=headers, params=payload)
        histories_json = response_histories.json()
        histories_dict[coin_id] = histories_json['data']
    return histories_dict
def date_conv(date):
    '''
    Function to convert a 'YYYY-MM-DD' string to a datetime.date.
    '''
    return datetime.strptime(date, '%Y-%m-%d').date()
def create_unix_dates(today=None, lookback_years=5):
    '''
    A function to create start_unix and end_unix times in UNIX time in milliseconds,
    covering the lookback_years up to today.
    '''
    # resolve the default at call time; a default of date.today() in the signature
    # would be evaluated only once, at import
    if today is None:
        today = date.today()
    start_datetime = today - timedelta(365 * lookback_years)
    start_unix = int(time.mktime(start_datetime.timetuple()) * 1000)
    end_unix = int(time.mktime(today.timetuple()) * 1000)
    return start_unix, end_unix
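# Example usage (illustrative sketch only, assumes the coincap.io API is reachable):
# build a two-year window in UNIX milliseconds and download daily histories for two
# well-known ids, then inspect the first raw data point (the price is still a string
# here; conversion happens in create_histories_df).
def _example_load_histories():
    start_unix, end_unix = create_unix_dates(lookback_years=2)
    histories = load_histories(['bitcoin', 'ethereum'], start_unix, end_unix)
    first_point = histories['bitcoin'][0]
    print(first_point['date'], first_point['priceUsd'])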
def create_histories_df(coin_ids, start_unix, end_unix):
    '''
    A function to create a dataframe of historical prices for all of the
    cryptocurrencies in the coin_ids=ids list, over a period defined by the
    interval [start_unix, end_unix].
    N.B. This code uses the data for bitcoin as the first dataframe on which
    the other temp_df are outer joined from the right. This is because bitcoin
    has the longest history.
    '''
    print('Downloading data from coincap.io, may take several minutes...')
    # download histories from coincap.io
    with st.spinner("You're the first user today so asset histories are being updated. May take several minutes."):
        histories_dict = load_histories(coin_ids, start_unix, end_unix)
    # convert all dates in histories_dict to python datetime.date objects and remove the 'time' key
    for coin_id in coin_ids:
        for price_dict in histories_dict[coin_id]:
            price_dict.pop('time')
            price_dict['priceUsd'] = float(price_dict['priceUsd'])
            price_dict['date'] = date_conv(price_dict['date'][0:10])
    # convert histories_dict to a pd.DataFrame, one column per coin id
    histories_df = pd.json_normalize(histories_dict['bitcoin'])
    histories_df = histories_df.set_index('date', drop=True)
    for coin_id in [x for x in coin_ids if x != "bitcoin"]:
        temp_df = pd.json_normalize(histories_dict[coin_id])
        temp_df = temp_df.set_index('date', drop=True)
        histories_df = histories_df.merge(temp_df, how='outer', left_index=True, right_index=True)
    histories_df.columns = coin_ids
    return histories_df
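# Example usage (illustrative sketch only; assumes 'bitcoin' is among the requested
# ids, which create_histories_df relies on): build the full price history dataframe
# for the top 20 coins. Columns are coin ids, the index is datetime.date, and coins
# with shorter histories show NaN before their listing date.
def _example_histories_df():
    assets_json = create_assets(total_coins=20)
    _, _, ids = gen_symbols(assets_json)
    start_unix, end_unix = create_unix_dates(lookback_years=5)
    histories_df = create_histories_df(ids, start_unix, end_unix)
    print(histories_df.tail())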
# N.B. allow_output_mutation is set to True because in create_rebased_df I am
# deliberately changing the value of returns_df[start_date:start_date] to 0;
# however, I want the cached value to remain unchanged so that if I rebase to a
# different start_date we go back to the original returns_df.
def create_returns_df(histories_df):
    '''
    Function to convert a dataframe of daily prices into daily percentage returns.
    '''
    return histories_df.pct_change(1)
def create_rebased_df(returns_df, start_date, end_date):
    '''
    Function to rebase cumulative performance to 1 at start_date by zeroing the
    return on that day and compounding daily returns over [start_date, end_date].
    '''
    returns_df[start_date:start_date] = 0
    return (1 + returns_df[start_date:end_date]).cumprod()
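# Example usage (illustrative sketch only): rebase every coin to 1.0 at a chosen
# start date so relative performance over the window can be compared directly.
def _example_rebase(histories_df):
    returns_df = create_returns_df(histories_df)
    start_date = histories_df.index[-366]  # roughly one year before the last date
    end_date = histories_df.index[-1]
    rebased_df = create_rebased_df(returns_df, start_date, end_date)
    print(rebased_df.iloc[[0, -1]])  # first row is 1.0, last row is cumulative growth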
def date_range(end_date, lookback_years):
    '''
    Function to generate a list of daily dates ending at end_date and looking
    back lookback_years * 365 days, in ascending order.
    '''
    return [end_date - timedelta(x) for x in range(365 * lookback_years)][::-1]
def ids2names_dict(coin_ids, names):
    '''
    Function to map coin ids to coin names.
    '''
    ids2names_dict = {}
    for i, coin_id in enumerate(coin_ids):
        ids2names_dict[coin_id] = names[i]
    return ids2names_dict
def names2ids_dict(names, coin_ids):
    '''
    Function to map coin names to coin ids.
    '''
    names2ids_dict = {}
    for i, name in enumerate(names):
        names2ids_dict[name] = coin_ids[i]
    return names2ids_dict
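# Example usage (illustrative sketch only): the two small mapping helpers are
# inverses of one another for the same ids/names ordering.
def _example_name_mappings():
    assets_json = create_assets(total_coins=5)
    _, names, ids = gen_symbols(assets_json)
    id_to_name = ids2names_dict(ids, names)
    name_to_id = names2ids_dict(names, ids)
    first_id = ids[0]
    print(id_to_name[first_id], name_to_id[id_to_name[first_id]])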
def gen_rebased_df(histories_df, ids_with_histories, start_date, end_date):
    '''
    Function to generate a rebased performance dataframe for the coins in
    ids_with_histories, compounding daily returns from start_date to end_date.
    '''
    returns_df = histories_df[ids_with_histories].pct_change(1)
    returns_df[start_date:start_date] = 0
    return (1 + returns_df[start_date:end_date]).cumprod()
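# Example usage (illustrative sketch only, end to end): go from the coincap assets
# call to a rebased performance dataframe, keeping only coins that already have a
# price on the first day of the window.
def _example_end_to_end(lookback_years=2):
    assets_json = create_assets(total_coins=20)
    _, _, ids = gen_symbols(assets_json)
    start_unix, end_unix = create_unix_dates(lookback_years=lookback_years)
    histories_df = create_histories_df(ids, start_unix, end_unix)
    ids_with_histories = histories_df.columns[histories_df.iloc[0].notna()]
    start_date = histories_df.index[0]
    end_date = histories_df.index[-1]
    rebased_df = gen_rebased_df(histories_df, ids_with_histories, start_date, end_date)
    print(rebased_df.tail())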