Spaces:
Sleeping
Sleeping
File size: 5,675 Bytes
349c960 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# ---------------------- Library Imports ----------------------
import functools
import json
import logging
import os
import time

import pandas as pd
import requests
from dotenv import load_dotenv
# ---------------------- Environment Variables ----------------------
# Load variables from a .env file (if present) before the lookups below.
load_dotenv()
url_etherscan = os.getenv("URL_ETHERSCAN")
api_key_etherscan = os.getenv("API_KEY_ETHERSCAN")

# Fall back to a local "logs" directory: os.makedirs(None) raises TypeError
# when LOG_FOLDER is not set.
log_folder = os.getenv("LOG_FOLDER") or "logs"
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "etherscan_scrap.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)

# Load the JSON file into a dictionary (iterated later as
# token symbol -> contract address).
with open("ressources/dict_tokens_addr.json", "r", encoding="utf-8") as file:
    dict_addresses = json.load(file)

# Paths of the CSV files created / updated during the run; they are written
# to the log at the end of fetch_and_update_etherscan().
L_created = []
L_updated = []

# Depth of the initial download, expressed in blocks and converted into a
# number of 10,000-block request windows.
n_blocks = 20000
n_loop = n_blocks // 10_000
# ---------------------- Helper Functions ----------------------
def log_execution_time(func):
    """Decorator that logs how long each call to *func* takes.

    The return value of *func* is passed through unchanged. After every call
    that returns normally, an INFO record with the elapsed wall-clock time
    (seconds, two decimals) is written through the root logger. If *func*
    raises, the exception propagates and nothing is logged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper accepting the same arguments as *func*.
    """

    # functools.wraps keeps func's __name__/__doc__ on the wrapper so that
    # introspection and stacked decorators still see the original function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        logging.info(f"Function {func.__name__} executed in {end_time - start_time:.2f} seconds")
        return result

    return wrapper
def latest_block(start_block=None):
    """Return the latest block number reported by the Etherscan proxy API.

    Args:
        start_block: Optional reference block number. When given, the
            distance from this block to the latest block is returned too.

    Returns:
        ``latest_block_number`` when *start_block* is ``None``; otherwise the
        tuple ``(latest_block_number, latest_block_number - start_block)``.
        On failure (network error, non-200 status, unparsable payload) the
        result has the same shape as the success value: ``None`` without
        *start_block*, ``(None, None)`` with it. This lets callers test
        ``is None`` or unpack two values, respectively.
    """
    # Shape the failure value like the success value for this call style.
    failure = None if start_block is None else (None, None)

    params = {
        "module": "proxy",
        "action": "eth_blockNumber",
        "apikey": api_key_etherscan,
    }
    try:
        # A timeout keeps an unresponsive server from hanging the script.
        response = requests.get(url_etherscan, params=params, timeout=30)
    except requests.RequestException as exc:
        logging.error(f"API call failed: {exc}")
        return failure

    if response.status_code != 200:
        # Log the raw body: response.json() would raise on a non-JSON error page.
        logging.error(f"API call failed with status code {response.status_code}: {response.text}")
        return failure

    try:
        # The result field is parsed as a hexadecimal string.
        latest_block_number = int(response.json()["result"], 16)
    except (ValueError, KeyError, TypeError):
        logging.error(f"Invalid response format or missing data in response: {response.text}")
        return failure

    if start_block is None:
        return latest_block_number
    return latest_block_number, latest_block_number - start_block
def get_coin_data(contractAddr, n):
    """Fetch the token transfers of a contract over the most recent blocks.

    The block range ending at the latest block is split into *n* consecutive
    windows of 10,000 blocks. Each window is requested from the ``tokentx``
    action of the Etherscan API, oldest window first.

    Args:
        contractAddr: Token contract address, sent as ``contractaddress``.
        n: Number of 10,000-block windows to fetch.

    Returns:
        A DataFrame with the rows returned by the API, ``timeStamp``
        converted to datetime and ``value`` divided by 1e18. An empty
        DataFrame is returned when the latest block cannot be determined or
        when no window yields rows containing a ``timeStamp`` column.
    """
    latest_block_number = latest_block()
    # A failed lookup may come back as None or as a (None, None) tuple;
    # neither can be used in the block arithmetic below.
    if latest_block_number is None or isinstance(latest_block_number, tuple):
        logging.error(f"Could not retrieve latest block number for contract address {contractAddr}")
        return pd.DataFrame()  # Return an empty DataFrame

    blocks_per_call = 10_000
    frames = []
    for i in range(n):
        # Adjacent windows share their boundary block (the end of one is the
        # start of the next); callers de-duplicate the rows on "hash".
        start_block = latest_block_number - (n - i) * blocks_per_call
        end_block = latest_block_number - (n - 1 - i) * blocks_per_call
        params = {
            "module": "account",
            "action": "tokentx",
            "contractaddress": contractAddr,
            "startblock": start_block,
            "endblock": end_block,
            "sort": "asc",
            "apikey": api_key_etherscan,
        }
        try:
            response = requests.get(url_etherscan, params=params, timeout=30)
            payload = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Network failure or non-JSON body: skip this window, keep going.
            logging.error(f"Request for blocks {start_block}-{end_block} failed: {exc}")
            payload = {}

        # Pause after every request, including failed ones, so consecutive
        # calls are spaced out (presumably to respect the API rate limit).
        time.sleep(1)

        transactions = payload.get("result", []) if isinstance(payload, dict) else None
        if not isinstance(transactions, list) or not all(isinstance(item, dict) for item in transactions):
            logging.error(f"Invalid data format for transactions: {transactions}")
            continue  # Skip this iteration if transactions data is invalid

        df_temp = pd.DataFrame(transactions)
        if not df_temp.empty:
            frames.append(df_temp)

    # Concatenate once at the end instead of re-copying the frame per window.
    df_transactions = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    if "timeStamp" not in df_transactions:
        logging.error("'timeStamp' key not found in the response data.")
        return pd.DataFrame()  # Return an empty DataFrame if key is missing

    df_transactions["timeStamp"] = pd.to_datetime(df_transactions["timeStamp"].astype(int), unit="s")
    # NOTE(review): dividing by 1e18 assumes an 18-decimal token — confirm
    # this holds for every contract address in use.
    df_transactions["value"] = df_transactions["value"].astype(float) / 1e18
    return df_transactions
# ---------------------- Main Function ----------------------
@log_execution_time
def fetch_and_update_etherscan():
    """Create or refresh one CSV of token transfers per configured token.

    For every ``symbol -> contract address`` pair in ``dict_addresses``:

    * if ``output/transactions_<symbol>.csv`` does not exist, the last
      ``n_loop`` windows of blocks are downloaded and written to it;
    * otherwise the transfers since the highest block number already stored
      are downloaded, merged with the stored rows, de-duplicated on ``hash``,
      sorted by ``blockNumber`` and written back.

    The ``confirmations`` and ``input`` columns are not stored. A token whose
    download fails or returns nothing is logged and skipped, leaving any
    existing file untouched. The paths written are appended to ``L_created``
    or ``L_updated`` and logged once all tokens are processed.
    """
    dropped_columns = ["confirmations", "input"]
    # to_csv() does not create missing parent directories.
    os.makedirs("output", exist_ok=True)

    for tokenSymbol, contractAddr in dict_addresses.items():
        file = f"output/transactions_{tokenSymbol}.csv"

        if not os.path.exists(file):
            df_transactions = get_coin_data(contractAddr, n_loop)
            if df_transactions.empty:
                # Nothing to store; skip the file so the next run retries.
                logging.error(f"No transactions retrieved for token: {tokenSymbol}")
                continue
            # errors="ignore": do not fail if the API omits these columns.
            df_transactions_no_dup = df_transactions.drop(
                columns=dropped_columns, errors="ignore"
            ).drop_duplicates(subset="hash")
            df_transactions_no_dup.to_csv(file, sep=",", index=False)
            L_created.append(file)
            continue

        df_temp = pd.read_csv(file, sep=",")
        if df_temp.empty:
            logging.error(f"Existing file has no rows to resume from for token: {tokenSymbol}")
            continue

        # Resume from the highest block number already stored.
        start_block = int(df_temp["blockNumber"].max())
        latest_block_number, diff = latest_block(start_block)
        if latest_block_number is None:
            logging.error(f"Failed to retrieve latest block number for token: {tokenSymbol}")
            continue

        # Enough 10,000-block windows to cover the gap since start_block.
        n_loop_to_concat = int(diff // 10000) + 1
        df_transactions = get_coin_data(contractAddr, n_loop_to_concat)
        if df_transactions.empty:
            logging.info(f"No new transactions for token: {tokenSymbol}")
            continue

        # The stored CSV no longer has the dropped columns, hence
        # errors="ignore" rather than a KeyError.
        df_latest = pd.concat([df_transactions, df_temp]).drop(
            columns=dropped_columns, errors="ignore"
        )
        # copy() so the column assignment below acts on an independent frame.
        df_latest_no_dup = df_latest.drop_duplicates(subset="hash").copy()
        # Fresh rows carry block numbers as returned by the API, stored rows
        # as parsed by read_csv; normalise to int before sorting.
        df_latest_no_dup["blockNumber"] = df_latest_no_dup["blockNumber"].astype(int)
        df_latest_no_dup = df_latest_no_dup.sort_values(by="blockNumber")
        df_latest_no_dup.to_csv(file, sep=",", index=False)
        L_updated.append(file)

    logging.info("Created files: " + ", ".join(L_created))
    logging.info("Updated files: " + ", ".join(L_updated))
    logging.info("Etherscan scraping script execution completed.")
# ---------------------- Script Execution ----------------------
# Run the scraper only when this file is executed as a script, so that
# importing the module does not trigger any download.
if __name__ == "__main__":
    fetch_and_update_etherscan()
|