#-------------------------------------libraries ----------------------------------
from requests import Session
from requests.exceptions import ConnectionError, HTTPError, Timeout, TooManyRedirects
import json
import os
import pandas as pd
import numpy as np
import logging
from dotenv import load_dotenv

load_dotenv()
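# Expected .env contents (values shown are illustrative assumptions):
#   URL_CMC=https://pro-api.coinmarketcap.com
#   API_KEY_CMC=<your CoinMarketCap API key>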
#-------------------------------------env vars----------------------------------
url = os.getenv("URL_CMC")
endpoints = ["v1/cryptocurrency/listings/latest",
             #"v1/cryptocurrency/trending/latest",
             ]
start = "1"    # rank to start listing from
limit = "100"  # number of cryptocurrencies to fetch
parameters = {
    'start': start,
    'limit': limit,
    'convert': 'USD'
}
headers = {
    'Accepts': 'application/json',
    'X-CMC_PRO_API_KEY': os.getenv("API_KEY_CMC"),  # CoinMarketCap Pro API key header
}
# Configure the logging settings
log_folder = "./logs/scraping/"
os.makedirs(log_folder, exist_ok=True)  # Ensure the log folder exists
log_file = os.path.join(log_folder, "scraping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
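# Example log line produced by the format above:
#   2024-01-01 12:00:00,000 [INFO] - Successfully fetched data from <target>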
#-------------------------------------api call----------------------------------
os.makedirs("output", exist_ok=True)  # Ensure the output folder exists
session = Session()
session.headers.update(headers)
for endpoint in endpoints:
    target = f"{url}/{endpoint}"
    try:
        response = session.get(target, params=parameters)
        response.raise_for_status()  # Surface HTTP errors (4xx/5xx)
        data = response.json()
        with open(f'output/cmc_data_{endpoint.replace("/", "_")}_{limit}.json', 'w') as f:
            json.dump(data, f)
        logging.info(f"Successfully fetched data from {target}")
    except (ConnectionError, HTTPError, Timeout, TooManyRedirects) as e:
        logging.error(f"Error while fetching data from {target}: {e}")
#-------------------------------------process data----------------------------------
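# NOTE: `data` holds the payload of the last endpoint fetched above; with a
# single entry in `endpoints`, that is the listings/latest response.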
# create data frame with chosen columns
df = pd.DataFrame(data["data"])[["name", "symbol", "circulating_supply", "total_supply", "quote"]]
# flatten the nested quote column (shaped like {"USD": {"price": ..., ...}}) and keep chosen columns
quote_df = pd.json_normalize(df['quote'].apply(lambda x: x['USD']))[["price", "percent_change_24h", "percent_change_7d", "percent_change_90d", "market_cap", "fully_diluted_market_cap", "last_updated"]]
# drop the now-redundant quote column
df = df.drop("quote", axis=1)
# create features: share of the total supply currently in circulation
df["percent_tokens_circulation"] = np.round((df["circulating_supply"] / df["total_supply"]) * 100, 1)
# merge dataframes
df = df.join(quote_df)
df["last_updated"] = pd.to_datetime(df["last_updated"])
#-------------------------------------save data----------------------------------
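# Deduplication key: (symbol, last_updated) keeps one row per coin per API
# refresh timestamp, so repeated runs within the same update window do not
# add duplicate rows.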
# Check if the file exists
output_file = f"output/top_{limit}_update.csv"
if os.path.isfile(output_file):
    logging.info(f"Updating dataset top_{limit}_update.")
    # Read the existing data
    existing_data = pd.read_csv(output_file)
    # Parse timestamps so they compare correctly with the new data
    existing_data["last_updated"] = pd.to_datetime(existing_data["last_updated"])
    # Concatenate the existing data with the new data vertically
    updated_data = pd.concat([existing_data, df], axis=0, ignore_index=True)
    # Remove duplicates (if any) based on the unique identifier columns
    updated_data.drop_duplicates(subset=["symbol", "last_updated"], inplace=True)
    # Save the updated data back to the same file
    updated_data.to_csv(output_file, index=False)
else:
    # If the file doesn't exist, save the current data to it
    df.to_csv(output_file, index=False)
logging.info("Script execution completed.")
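# Resulting CSV columns: name, symbol, circulating_supply, total_supply,
# percent_tokens_circulation, price, percent_change_24h, percent_change_7d,
# percent_change_90d, market_cap, fully_diluted_market_cap, last_updated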
#-------------------------------------end----------------------------------