mohcineelharras's picture
working scheduler
349c960
raw
history blame
8.14 kB
# ---------------------- Library Imports ----------------------
import time
import os
import json
import pandas as pd
import numpy as np
import logging
import requests
from dotenv import load_dotenv
from requests import Session
# ---------------------- Environment Variables ----------------------
load_dotenv()
# Etherscan API
url_etherscan = os.getenv("URL_ETHERSCAN")
api_key_etherscan = os.getenv("API_KEY_ETHERSCAN")
# CoinMarketCap API
url_cmc = os.getenv("URL_CMC")
api_key_cmc = os.getenv("API_KEY_CMC")
# Logging
log_folder = os.getenv("LOG_FOLDER")
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "scrapping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
# Load the JSON file into a dictionary
with open("ressources/dict_tokens_addr.json", "r") as file:
dict_addresses = json.load(file)
L_created = []
L_updated = []
# Define the number of blocks to retrieve transactions from
n_blocks = 20000
n_loop = n_blocks // 10_000
# ---------------------- Processing ----------------------
# Helper function for logging execution time
def log_execution_time(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
logging.info(f"Function {func.__name__} executed in {end_time - start_time:.2f} seconds")
return result
return wrapper
@log_execution_time
# function 1: Fetch and Update Etherscan Data
def fetch_and_update_etherscan():
for tokenSymbol, contractAddr in dict_addresses.items():
file = f"output/transactions_{tokenSymbol}.csv"
if not os.path.exists(file):
L_created.append(file)
df_transactions = get_coin_data(contractAddr, n_loop)
df_transactions_no_dup = df_transactions.drop(["confirmations", "input"], axis=1).drop_duplicates(subset="hash")
df_transactions_no_dup.to_csv(file, sep=",", index=False)
else:
L_updated.append(file)
df_temp = pd.read_csv(file, sep=",")
df_temp = df_temp.sort_values("blockNumber", ascending=False)
start_block = df_temp["blockNumber"].iloc[0]
# Retrieve latest block number and calculate the difference
latest_block_number, diff = latest_block(start_block)
if latest_block_number is None:
logging.error(f"Failed to retrieve latest block number for token: {tokenSymbol}")
continue # Skip to the next token if the latest block number could not be retrieved
n_loop_to_concat = (diff // 10000) + 1
df_transactions = get_coin_data(contractAddr, n_loop_to_concat)
df_latest = pd.concat([df_transactions, df_temp]).drop(["confirmations", "input"], axis=1)
df_latest_no_dup = df_latest.drop_duplicates(subset="hash")
df_latest_no_dup.loc[:, "blockNumber"] = df_latest_no_dup["blockNumber"].astype(int)
df_latest_no_dup = df_latest_no_dup.sort_values(by="blockNumber")
df_latest_no_dup.to_csv(file, sep=",", index=False)
logging.info("Created files: " + ", ".join(L_created))
logging.info("Updated files: " + ", ".join(L_updated))
logging.info("Script execution completed.")
# Helper function to get latest block number
def latest_block(start_block=None):
params = {
"module": "proxy",
"action": "eth_blockNumber",
"apikey": api_key_etherscan
}
response = requests.get(url_etherscan, params=params)
if response.status_code == 200:
try:
latest_block_number = int(response.json()["result"], 16)
if start_block is not None:
return latest_block_number, latest_block_number - start_block
return latest_block_number
except (ValueError, KeyError):
logging.error(f"Invalid response format or missing data in response: {response.json()}")
return None, None
else:
logging.error(f"API call failed with status code {response.status_code}: {response.json()}")
return None, None
def get_coin_data(contractAddr, n):
latest_block_number = latest_block()
if latest_block_number is None:
logging.error(f"Could not retrieve latest block number for contract address {contractAddr}")
return pd.DataFrame() # Return an empty DataFrame
df_transactions = pd.DataFrame()
transactions_per_call = 10_000
for i in range(n):
start_block = latest_block_number - (n - i) * transactions_per_call
end_block = latest_block_number - (n - 1 - i) * transactions_per_call
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contractAddr,
"startblock": start_block,
"endblock": end_block,
"sort": "asc",
"apikey": api_key_etherscan
}
response = requests.get(url_etherscan, params=params)
transactions = response.json().get("result", [])
# Check if transactions is a list of dictionaries
if not isinstance(transactions, list) or not all(isinstance(item, dict) for item in transactions):
logging.error(f"Invalid data format for transactions: {transactions}")
continue # Skip this iteration if transactions data is invalid
df_temp = pd.DataFrame(transactions)
if not df_temp.empty:
df_transactions = pd.concat([df_transactions, df_temp])
time.sleep(1)
if 'timeStamp' in df_transactions:
df_transactions['timeStamp'] = pd.to_datetime(df_transactions['timeStamp'].astype(int), unit='s')
else:
logging.error("'timeStamp' key not found in the response data.")
return pd.DataFrame() # Return an empty DataFrame if key is missing
df_transactions['value'] = df_transactions['value'].astype(float) / 1e18
return df_transactions
# function 2: Fetch and Process CMC Data
@log_execution_time
def fetch_and_process_cmc_data():
session = Session()
session.headers.update({
'Accepts': 'application/json',
'X-CMC_PRO_API_KEY': api_key_cmc,
})
parameters = {
'start': '1',
'limit': '100',
'convert': 'USD'
}
for endpoint in ["v1/cryptocurrency/listings/latest"]:
target = f"{url_cmc}/{endpoint}"
try:
response = session.get(target, params=parameters)
data = json.loads(response.text)
with open(f'output/cmc_data_{endpoint.replace("/", "_")}_100.json', 'w') as f:
json.dump(data, f)
process_cmc_data(data, '100')
except (ConnectionError, Timeout, TooManyRedirects) as e:
logging.error(f"Error while fetching data from {target}: {e}")
def process_cmc_data(data, stop):
df = pd.DataFrame(data["data"])[["name", "symbol", "circulating_supply", "total_supply", "quote"]]
quote_df = pd.json_normalize(df['quote'].apply(lambda x: x['USD']))[["price", "percent_change_24h", "percent_change_7d", "percent_change_90d", "market_cap", "fully_diluted_market_cap", "last_updated"]]
df = df.drop("quote", axis=1)
df["percent_tokens_circulation"] = np.round((df["circulating_supply"] / df["total_supply"]) * 100, 1)
df = df.join(quote_df)
df["last_updated"] = pd.to_datetime(df["last_updated"])
save_cmc_data(df, stop)
def save_cmc_data(df, stop):
output_file = f"output/top_{stop}_update.csv"
if os.path.isfile(output_file):
existing_data = pd.read_csv(output_file)
updated_data = pd.concat([existing_data, df], axis=0, ignore_index=True)
updated_data.drop_duplicates(subset=["symbol", "last_updated"], inplace=True)
updated_data.to_csv(output_file, index=False)
else:
df.to_csv(output_file, index=False)
logging.info("CMC data script execution completed.")
# ---------------------- Execution ----------------------
if __name__ == "__main__":
fetch_and_update_etherscan()
fetch_and_process_cmc_data()