# ---------------------- Library Imports ----------------------
import functools
import json
import logging
import os
import time

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from requests import Session

# ---------------------- Environment Variables ----------------------
load_dotenv()

# Etherscan API
url_etherscan = os.getenv("URL_ETHERSCAN")
api_key_etherscan = os.getenv("API_KEY_ETHERSCAN")

# CoinMarketCap API
url_cmc = os.getenv("URL_CMC")
api_key_cmc = os.getenv("API_KEY_CMC")

# Logging (falls back to "logs" so an unset LOG_FOLDER no longer crashes
# os.makedirs with a TypeError at import time)
log_folder = os.getenv("LOG_FOLDER") or "logs"
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "scrapping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)

# Load the JSON file into a dictionary (token symbol -> contract address,
# as consumed by fetch_and_update_etherscan)
with open("ressources/dict_tokens_addr.json", "r") as file:
    dict_addresses = json.load(file)

# Paths of the CSV files written during this run
L_created = []
L_updated = []

# Define the number of blocks to retrieve transactions from
n_blocks = 20000
n_loop = n_blocks // 10_000

# Size, in blocks, of the window requested from Etherscan per call
BLOCKS_PER_CALL = 10_000
# Seconds to wait for any HTTP response before giving up
REQUEST_TIMEOUT = 30
# Columns removed from the Etherscan payload before it is stored
DROPPED_COLUMNS = ["confirmations", "input"]


# ---------------------- Processing ----------------------
# Helper function for logging execution time
def log_execution_time(func):
    """Decorate *func* so that its wall-clock duration is logged at INFO level.

    The wrapped function's return value is passed through unchanged.
    """

    @functools.wraps(func)  # keep __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logging.info(f"Function {func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result

    return wrapper


def _load_existing_transactions(csv_path):
    """Return the stored transactions, or None if there is nothing usable on disk."""
    if not os.path.exists(csv_path):
        return None
    try:
        stored = pd.read_csv(csv_path, sep=",")
    except pd.errors.EmptyDataError:
        # Zero-byte file: treat it exactly like a missing file.
        return None
    if stored.empty or "blockNumber" not in stored.columns:
        return None
    return stored


# function 1: Fetch and Update Etherscan Data
@log_execution_time
def fetch_and_update_etherscan():
    """Create or refresh one ``output/transactions_<symbol>.csv`` per token.

    For a token without a usable CSV, the last ``n_loop`` block windows are
    downloaded. For a token with an existing CSV, only the windows needed to
    cover the gap between the highest stored block and the current chain head
    are downloaded and merged in. Rows are de-duplicated on ``hash``.

    Tokens whose download fails or yields nothing are skipped (and logged)
    instead of aborting the whole run. Paths actually written are recorded in
    ``L_created`` / ``L_updated``.
    """
    os.makedirs("output", exist_ok=True)

    for token_symbol, contract_addr in dict_addresses.items():
        csv_path = f"output/transactions_{token_symbol}.csv"
        stored = _load_existing_transactions(csv_path)

        if stored is None:
            fetched = get_coin_data(contract_addr, n_loop)
            if fetched.empty:
                logging.warning("No transactions retrieved for token: %s", token_symbol)
                continue
            # errors="ignore": the columns are optional in the payload.
            (
                fetched.drop(columns=DROPPED_COLUMNS, errors="ignore")
                .drop_duplicates(subset="hash")
                .to_csv(csv_path, sep=",", index=False)
            )
            L_created.append(csv_path)
            continue

        start_block = stored["blockNumber"].max()

        # Retrieve latest block number and calculate the difference
        latest_block_number, diff = latest_block(start_block)
        if latest_block_number is None:
            logging.error(f"Failed to retrieve latest block number for token: {token_symbol}")
            continue  # Skip to the next token

        n_loop_to_concat = (diff // 10000) + 1
        fetched = get_coin_data(contract_addr, n_loop_to_concat)
        if fetched.empty:
            # Nothing new (or the download failed): keep the file untouched.
            logging.info("No new transactions for token: %s", token_symbol)
            continue

        # Fresh rows go first so that drop_duplicates keeps them on a clash.
        merged = (
            pd.concat([fetched, stored])
            .drop(columns=DROPPED_COLUMNS, errors="ignore")
            .drop_duplicates(subset="hash")
        )
        # The API delivers block numbers as strings, the CSV as integers.
        merged = merged.assign(blockNumber=merged["blockNumber"].astype(int))
        merged.sort_values(by="blockNumber").to_csv(csv_path, sep=",", index=False)
        L_updated.append(csv_path)

    logging.info("Created files: " + ", ".join(L_created))
    logging.info("Updated files: " + ", ".join(L_updated))
    logging.info("Script execution completed.")


# Helper function to get latest block number
def latest_block(start_block=None):
    """Query Etherscan for the current block number.

    Args:
        start_block: optional reference block.

    Returns:
        Without ``start_block``: the latest block number, or ``None`` on failure.
        With ``start_block``: ``(latest_block_number, latest_block_number - start_block)``,
        or ``(None, None)`` on failure.
    """
    # The failure value mirrors the shape of the success value so that both
    # kinds of callers can test/unpack the result safely.
    failure = None if start_block is None else (None, None)

    params = {
        "module": "proxy",
        "action": "eth_blockNumber",
        "apikey": api_key_etherscan,
    }
    try:
        response = requests.get(url_etherscan, params=params, timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException as exc:
        logging.error("Request for the latest block number failed: %s", exc)
        return failure

    if response.status_code != 200:
        # response.text never raises, unlike response.json() on a non-JSON body.
        logging.error(f"API call failed with status code {response.status_code}: {response.text}")
        return failure

    try:
        # The result is parsed as a hexadecimal string.
        latest_block_number = int(response.json()["result"], 16)
    except (ValueError, KeyError, TypeError):
        logging.error(f"Invalid response format or missing data in response: {response.text}")
        return failure

    if start_block is None:
        return latest_block_number
    return latest_block_number, latest_block_number - start_block


def _fetch_token_transfers(contractAddr, start_block, end_block):
    """Return the list of transfer dicts for one block window, or None if unusable."""
    params = {
        "module": "account",
        "action": "tokentx",
        "contractaddress": contractAddr,
        "startblock": start_block,
        "endblock": end_block,
        "sort": "asc",
        "apikey": api_key_etherscan,
    }
    try:
        response = requests.get(url_etherscan, params=params, timeout=REQUEST_TIMEOUT)
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        logging.error("Token transfer request failed for %s: %s", contractAddr, exc)
        return None

    transactions = payload.get("result", []) if isinstance(payload, dict) else payload
    # Error payloads carry a message string instead of a list of records.
    if not isinstance(transactions, list) or not all(isinstance(item, dict) for item in transactions):
        logging.error(f"Invalid data format for transactions: {transactions}")
        return None
    return transactions


def get_coin_data(contractAddr, n):
    """Download token transfers for the ``n`` most recent block windows.

    Each window spans ``BLOCKS_PER_CALL`` blocks, walking from the oldest
    window up to the current chain head.

    Args:
        contractAddr: contract address of the token.
        n: number of windows to request.

    Returns:
        A DataFrame of transfers with ``timeStamp`` converted to datetime and
        ``value`` divided by 1e18, or an empty DataFrame when nothing could be
        retrieved.
    """
    latest_block_number = latest_block()
    if latest_block_number is None:
        logging.error(f"Could not retrieve latest block number for contract address {contractAddr}")
        return pd.DataFrame()  # Return an empty DataFrame

    frames = []
    for i in range(n):
        start_block = latest_block_number - (n - i) * BLOCKS_PER_CALL
        end_block = start_block + BLOCKS_PER_CALL

        transactions = _fetch_token_transfers(contractAddr, start_block, end_block)
        if transactions:
            frames.append(pd.DataFrame(transactions))

        # Pause after every call, successful or not, so that a rate-limit
        # error is not answered with an immediate burst of further requests.
        time.sleep(1)

    if not frames:
        logging.info(f"No transactions retrieved for contract address {contractAddr}")
        return pd.DataFrame()

    df_transactions = pd.concat(frames, ignore_index=True)
    if "timeStamp" not in df_transactions:
        logging.error("'timeStamp' key not found in the response data.")
        return pd.DataFrame()  # Return an empty DataFrame if key is missing

    df_transactions["timeStamp"] = pd.to_datetime(df_transactions["timeStamp"].astype(int), unit="s")
    # NOTE(review): dividing by 1e18 assumes an 18-decimal token — confirm
    # whether the per-token decimals from the response should be used instead.
    df_transactions["value"] = df_transactions["value"].astype(float) / 1e18
    return df_transactions


# function 2: Fetch and Process CMC Data
@log_execution_time
def fetch_and_process_cmc_data():
    """Download the CoinMarketCap listing, store the raw JSON and process it.

    The raw response is written to ``output/cmc_data_<endpoint>_100.json``
    and then handed to ``process_cmc_data``. Network errors, non-JSON bodies
    and payloads without a ``data`` entry are logged and skipped.
    """
    parameters = {
        'start': '1',
        'limit': '100',
        'convert': 'USD'
    }
    os.makedirs("output", exist_ok=True)

    # The context manager closes the session's connections when done.
    with Session() as session:
        # NOTE(review): the header name 'Accepts' is kept from the original;
        # the standard HTTP header is 'Accept' — confirm against the CMC docs.
        session.headers.update({
            'Accepts': 'application/json',
            'X-CMC_PRO_API_KEY': api_key_cmc,
        })

        for endpoint in ["v1/cryptocurrency/listings/latest"]:
            target = f"{url_cmc}/{endpoint}"
            try:
                response = session.get(target, params=parameters, timeout=REQUEST_TIMEOUT)
                data = json.loads(response.text)
            except (requests.exceptions.RequestException, ValueError) as e:
                # RequestException covers connection errors, timeouts and
                # redirect loops; ValueError covers a non-JSON body.
                logging.error(f"Error while fetching data from {target}: {e}")
                continue

            with open(f'output/cmc_data_{endpoint.replace("/", "_")}_100.json', 'w') as f:
                json.dump(data, f)

            if not isinstance(data, dict) or not data.get("data"):
                logging.error(f"No listing data in response from {target}: {data}")
                continue

            process_cmc_data(data, '100')


def process_cmc_data(data, stop):
    """Flatten a CMC listing payload into a table and persist it.

    Args:
        data: decoded JSON response; its ``"data"`` entry holds the listings.
        stop: label used in the output file name (passed to ``save_cmc_data``).
    """
    df = pd.DataFrame(data["data"])[["name", "symbol", "circulating_supply", "total_supply", "quote"]]

    # Expand the nested USD quote of every listing into its own columns.
    usd_quotes = [quote['USD'] for quote in df['quote']]
    quote_df = pd.json_normalize(usd_quotes)[
        ["price", "percent_change_24h", "percent_change_7d", "percent_change_90d",
         "market_cap", "fully_diluted_market_cap", "last_updated"]
    ]
    quote_df.index = df.index  # align row-for-row for the join below

    df = df.drop("quote", axis=1)
    df["percent_tokens_circulation"] = np.round((df["circulating_supply"] / df["total_supply"]) * 100, 1)
    df = df.join(quote_df)
    df["last_updated"] = pd.to_datetime(df["last_updated"], utc=True)
    save_cmc_data(df, stop)


def _as_utc(timestamps):
    """Return *timestamps* as a UTC datetime Series, parsing each value on its own."""
    # Element-wise parsing tolerates a column whose rows were written with
    # and without fractional seconds.
    return pd.to_datetime(timestamps.map(pd.Timestamp), utc=True)


def save_cmc_data(df, stop):
    """Append *df* to ``output/top_<stop>_update.csv``, dropping repeated rows.

    Rows are considered duplicates when ``symbol`` and ``last_updated`` match.
    ``last_updated`` is normalised to UTC datetimes on both the stored and the
    new rows first; otherwise the strings read back from the CSV would never
    compare equal to the new timestamps.
    """
    output_file = f"output/top_{stop}_update.csv"
    if os.path.isfile(output_file):
        existing_data = pd.read_csv(output_file)
        existing_data["last_updated"] = _as_utc(existing_data["last_updated"])
        new_data = df.assign(last_updated=_as_utc(df["last_updated"]))

        updated_data = pd.concat([existing_data, new_data], axis=0, ignore_index=True)
        updated_data = updated_data.drop_duplicates(subset=["symbol", "last_updated"])
        updated_data.to_csv(output_file, index=False)
    else:
        df.to_csv(output_file, index=False)
    logging.info("CMC data script execution completed.")


# ---------------------- Execution ----------------------
if __name__ == "__main__":
    fetch_and_update_etherscan()
    fetch_and_process_cmc_data()