# ---------------------- Library Imports ----------------------
import time
import os
import json
import pandas as pd
import numpy as np
import logging
import requests
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
from dotenv import load_dotenv
# ---------------------- Environment Variables ----------------------
load_dotenv()
# Etherscan API
url_etherscan = os.getenv("URL_ETHERSCAN")
api_key_etherscan = os.getenv("API_KEY_ETHERSCAN")
# CoinMarketCap API
url_cmc = os.getenv("URL_CMC")
api_key_cmc = os.getenv("API_KEY_CMC")
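# A minimal .env for this script, assuming the standard public endpoints
# (all values below are placeholders, not real keys):
#   URL_ETHERSCAN=https://api.etherscan.io/api
#   API_KEY_ETHERSCAN=<your-etherscan-key>
#   URL_CMC=https://pro-api.coinmarketcap.com
#   API_KEY_CMC=<your-coinmarketcap-key>
#   LOG_FOLDER=logs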
# Logging
# Fall back to ./logs when LOG_FOLDER is unset (assumed default),
# so os.makedirs never receives None
log_folder = os.getenv("LOG_FOLDER", "logs")
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "scrapping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
# Load the JSON file into a dictionary
with open("ressources/dict_tokens_addr.json", "r") as file:
dict_addresses = json.load(file)
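# Illustrative shape of ressources/dict_tokens_addr.json, mapping token
# symbols to ERC-20 contract addresses (single example entry):
#   {"UNI": "0x1f9840a85d5af5bf1d1762f925bdaddc4201f984"}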
# Track which per-token CSV files are created vs. updated during this run
L_created = []
L_updated = []
# Number of recent blocks to scan; Etherscan queries cover 10,000-block
# windows, so 20_000 // 10_000 == 2 API calls per token
n_blocks = 20_000
n_loop = n_blocks // 10_000
# ---------------------- Processing ----------------------
# Helper function for logging execution time
def log_execution_time(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
logging.info(f"Function {func.__name__} executed in {end_time - start_time:.2f} seconds")
return result
return wrapper
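# Usage sketch for the decorator (hypothetical function, for illustration only):
#   @log_execution_time
#   def slow_task():
#       time.sleep(2)
#   slow_task()  # logs: "Function slow_task executed in 2.00 seconds"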
# Function 1: fetch and update Etherscan transaction data for each token
@log_execution_time
def fetch_and_update_etherscan():
for tokenSymbol, contractAddr in dict_addresses.items():
file = f"output/transactions_{tokenSymbol}.csv"
if not os.path.exists(file):
L_created.append(file)
            df_transactions = get_coin_data(contractAddr, n_loop)
            if df_transactions.empty:
                logging.error(f"No transactions retrieved for token: {tokenSymbol}")
                continue  # dropping columns on an empty frame would raise KeyError
            df_transactions_no_dup = df_transactions.drop(["confirmations", "input"], axis=1).drop_duplicates(subset="hash")
            df_transactions_no_dup.to_csv(file, sep=",", index=False)
else:
L_updated.append(file)
df_temp = pd.read_csv(file, sep=",")
df_temp = df_temp.sort_values("blockNumber", ascending=False)
start_block = df_temp["blockNumber"].iloc[0]
# Retrieve latest block number and calculate the difference
latest_block_number, diff = latest_block(start_block)
if latest_block_number is None:
logging.error(f"Failed to retrieve latest block number for token: {tokenSymbol}")
continue # Skip to the next token if the latest block number could not be retrieved
n_loop_to_concat = (diff // 10000) + 1
df_transactions = get_coin_data(contractAddr, n_loop_to_concat)
            # errors="ignore" keeps the drop safe when no new rows came back,
            # since df_temp was saved without these columns
            df_latest = pd.concat([df_transactions, df_temp]).drop(columns=["confirmations", "input"], errors="ignore")
            df_latest_no_dup = df_latest.drop_duplicates(subset="hash").copy()
            df_latest_no_dup["blockNumber"] = df_latest_no_dup["blockNumber"].astype(int)
            df_latest_no_dup = df_latest_no_dup.sort_values(by="blockNumber")
            df_latest_no_dup.to_csv(file, sep=",", index=False)
logging.info("Created files: " + ", ".join(L_created))
logging.info("Updated files: " + ", ".join(L_updated))
logging.info("Script execution completed.")
# Helper function to get latest block number
def latest_block(start_block=None):
params = {
"module": "proxy",
"action": "eth_blockNumber",
"apikey": api_key_etherscan
}
response = requests.get(url_etherscan, params=params)
if response.status_code == 200:
try:
latest_block_number = int(response.json()["result"], 16)
if start_block is not None:
return latest_block_number, latest_block_number - start_block
return latest_block_number
        except (ValueError, KeyError):
            logging.error(f"Invalid response format or missing data in response: {response.text}")
    else:
        logging.error(f"API call failed with status code {response.status_code}: {response.text}")
    # Mirror the success path's return shape: a tuple when start_block was
    # given, a single value otherwise, so callers can test `is None` safely
    return (None, None) if start_block is not None else None
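# The proxy endpoint answers with a JSON-RPC style payload, e.g. (illustrative):
#   {"jsonrpc": "2.0", "id": 83, "result": "0x112a880"}
# int("0x112a880", 16) == 18_000_000, hence the base-16 parse above.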
def get_coin_data(contractAddr, n):
latest_block_number = latest_block()
if latest_block_number is None:
logging.error(f"Could not retrieve latest block number for contract address {contractAddr}")
return pd.DataFrame() # Return an empty DataFrame
    df_transactions = pd.DataFrame()
    blocks_per_call = 10_000  # each Etherscan query spans a 10,000-block window
    for i in range(n):
        start_block = latest_block_number - (n - i) * blocks_per_call
        end_block = latest_block_number - (n - 1 - i) * blocks_per_call
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contractAddr,
"startblock": start_block,
"endblock": end_block,
"sort": "asc",
"apikey": api_key_etherscan
}
response = requests.get(url_etherscan, params=params)
transactions = response.json().get("result", [])
# Check if transactions is a list of dictionaries
if not isinstance(transactions, list) or not all(isinstance(item, dict) for item in transactions):
logging.error(f"Invalid data format for transactions: {transactions}")
continue # Skip this iteration if transactions data is invalid
df_temp = pd.DataFrame(transactions)
if not df_temp.empty:
df_transactions = pd.concat([df_transactions, df_temp])
        time.sleep(1)  # stay under Etherscan's free-tier rate limit
    if df_transactions.empty or "timeStamp" not in df_transactions:
        logging.error("No usable transaction data collected ('timeStamp' column missing or no rows)")
        return pd.DataFrame()  # Return an empty DataFrame
    df_transactions["timeStamp"] = pd.to_datetime(df_transactions["timeStamp"].astype(int), unit="s")
    # Raw `value` is denominated in the token's smallest unit; dividing by 1e18
    # assumes 18 decimals (the tokenDecimal field gives the true precision)
    df_transactions["value"] = df_transactions["value"].astype(float) / 1e18
return df_transactions
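# Worked example of the windowing above, assuming a latest block of 20_000_000
# and n == 2:
#   i = 0 -> blocks 19_980_000 .. 19_990_000
#   i = 1 -> blocks 19_990_000 .. 20_000_000
# i.e. the loop walks forward toward the chain tip in 10,000-block steps.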
# Function 2: fetch and process CoinMarketCap market data
@log_execution_time
def fetch_and_process_cmc_data():
session = Session()
session.headers.update({
'Accepts': 'application/json',
'X-CMC_PRO_API_KEY': api_key_cmc,
})
parameters = {
'start': '1',
'limit': '100',
'convert': 'USD'
}
for endpoint in ["v1/cryptocurrency/listings/latest"]:
target = f"{url_cmc}/{endpoint}"
try:
response = session.get(target, params=parameters)
            data = response.json()
with open(f'output/cmc_data_{endpoint.replace("/", "_")}_100.json', 'w') as f:
json.dump(data, f)
process_cmc_data(data, '100')
except (ConnectionError, Timeout, TooManyRedirects) as e:
logging.error(f"Error while fetching data from {target}: {e}")
def process_cmc_data(data, stop):
df = pd.DataFrame(data["data"])[["name", "symbol", "circulating_supply", "total_supply", "quote"]]
quote_df = pd.json_normalize(df['quote'].apply(lambda x: x['USD']))[["price", "percent_change_24h", "percent_change_7d", "percent_change_90d", "market_cap", "fully_diluted_market_cap", "last_updated"]]
df = df.drop("quote", axis=1)
df["percent_tokens_circulation"] = np.round((df["circulating_supply"] / df["total_supply"]) * 100, 1)
df = df.join(quote_df)
df["last_updated"] = pd.to_datetime(df["last_updated"])
save_cmc_data(df, stop)
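# Example of percent_tokens_circulation with invented numbers:
#   circulating_supply = 19_500_000, total_supply = 21_000_000
#   -> round(19_500_000 / 21_000_000 * 100, 1) == 92.9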
def save_cmc_data(df, stop):
output_file = f"output/top_{stop}_update.csv"
if os.path.isfile(output_file):
existing_data = pd.read_csv(output_file)
updated_data = pd.concat([existing_data, df], axis=0, ignore_index=True)
updated_data.drop_duplicates(subset=["symbol", "last_updated"], inplace=True)
updated_data.to_csv(output_file, index=False)
else:
df.to_csv(output_file, index=False)
logging.info("CMC data script execution completed.")
# ---------------------- Execution ----------------------
if __name__ == "__main__":
fetch_and_update_etherscan()
fetch_and_process_cmc_data()