# NOTE(review): "Spaces: / Sleeping / Sleeping" is a Hugging Face Spaces status
# banner captured together with the source — it is not valid Python, so it is
# kept here only as a comment.
# ------------------------ Libraries -------------------------- | |
import os | |
import pandas as pd | |
import streamlit as st | |
import plotly.graph_objs as go | |
import logging | |
import subprocess | |
import threading | |
from dotenv import load_dotenv | |
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects | |
import plotly.express as px | |
import json | |
import networkx as nx | |
import time | |
# ------------------------ Environment Variables --------------------------
load_dotenv()

# Logging setup.
# Fix: the original read LOG_FOLDER and immediately overwrote it with
# LOG_STREAMLIT (dead assignment), and crashed in os.makedirs(None) when the
# variable was unset. Fall back to a local "logs" folder so the app starts.
log_folder = os.getenv("LOG_STREAMLIT") or "logs"
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "front.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
logging.info("Streamlit app has started")

# Create the output folder if it doesn't exist (exist_ok avoids the race
# between the exists() check and makedirs() in the original).
os.makedirs("output", exist_ok=True)
#-------------------------------------back---------------------------------- | |
def safe_read_csv(file_path, sep=','):
    """Read a CSV file defensively.

    Returns an empty DataFrame (and logs a warning) when the file is missing
    or has zero size, instead of letting pandas raise.
    """
    readable = os.path.exists(file_path) and os.path.getsize(file_path) > 0
    if not readable:
        logging.warning(f"File {file_path} is empty or does not exist.")
        return pd.DataFrame()  # return an empty DataFrame
    return pd.read_csv(file_path, sep=sep)
# etherscan
## Aggregate every per-wallet "transactions_*.csv" in ./output into one frame.
frames = [
    safe_read_csv(os.path.join('output', filename), sep=',')
    for filename in os.listdir('output')
    if filename.endswith('.csv') and 'transactions_' in filename
]
# Seed with an empty frame so concat also works when no file matched.
df_etherscan = pd.concat([pd.DataFrame()] + frames, ignore_index=True)
# CMC
## Load CMC data and keep only the most recent snapshot.
df_cmc = safe_read_csv("output/top_100_update.csv", sep=',')
if not df_cmc.empty:
    # Fix: an empty frame (file missing/empty) has no "last_updated" column,
    # so the unguarded filter raised KeyError on a fresh install.
    df_cmc = df_cmc[df_cmc["last_updated"] == df_cmc["last_updated"].max()]
# Global metrics about the market
def load_global_metrics():
    """Load the global market metrics CSV.

    Returns an empty DataFrame when the scraper has not produced the file yet.
    """
    metrics_path = "output/global_metrics.csv"
    try:
        return pd.read_csv(metrics_path)
    except FileNotFoundError:
        logging.warning("Global metrics file not found.")
        return pd.DataFrame()
# Load influencers
def load_influencers():
    """Return the influencer name -> wallet address mapping.

    Shows a Streamlit error and returns {} when the JSON file cannot be read.
    """
    path = "ressources/dict_influencers_addr.json"
    try:
        with open(path, "r") as handle:
            return json.load(handle)
    except Exception as exc:
        st.error(f"Error loading influencers: {exc}")
        return {}
# Load tokens
def load_tokens():
    """Return the token symbol -> contract address mapping.

    Shows a Streamlit error and returns {} when the JSON file cannot be read.
    """
    try:
        with open("ressources/dict_tokens_addr.json", "r") as file:
            return json.load(file)
    except Exception as e:
        # Fix: the original message (and header comment) said "influencers" —
        # a copy-paste error from load_influencers.
        st.error(f"Error loading tokens: {e}")
        return {}
def create_dominance_pie_chart(df_global_metrics):
    """Build a pie chart of market-cap dominance (BTC / ETH / everything else).

    Expects a frame with 'btc_dominance' and 'eth_dominance' columns; only the
    first row is used.
    """
    btc = df_global_metrics['btc_dominance'].iloc[0]
    eth = df_global_metrics['eth_dominance'].iloc[0]
    # Whatever is not BTC or ETH is lumped into "Others".
    df_dominance = pd.DataFrame({
        'Cryptocurrency': ['BTC', 'ETH', 'Others'],
        'Dominance': [btc, eth, 100 - btc - eth],
    })
    return px.pie(df_dominance, values='Dominance', names='Cryptocurrency',
                  title='Market Cap Dominance')
def display_greed_fear_index():
    """Render the Fear & Greed index at five points in time as a line chart."""
    try:
        df = pd.read_csv('output/greed_fear_index.csv')
    except FileNotFoundError:
        st.error("Greed and Fear index data not available. Please wait for the next update cycle.")
        return
    # CSV column prefixes, oldest first, paired with their display labels.
    periods = [
        ('fgi_oneYearAgo', 'One Year Ago'),
        ('fgi_oneMonthAgo', 'One Month Ago'),
        ('fgi_oneWeekAgo', 'One Week Ago'),
        ('fgi_previousClose', 'Previous Close'),
        ('fgi_now', 'Now'),
    ]
    time_periods = [label for _, label in periods]
    values = [df[f'{prefix}_value'].iloc[0] for prefix, _ in periods]
    labels = [df[f'{prefix}_valueText'].iloc[0] for prefix, _ in periods]
    fig = go.Figure(data=[
        go.Scatter(x=time_periods, y=values, mode='lines+markers+text',
                   text=labels, textposition='top center')
    ])
    fig.update_layout(
        title='Fear and Greed Index Over Time',
        xaxis_title='Time Period',
        yaxis_title='Index Value',
        yaxis=dict(range=[0, 100]),  # the index is bounded to 0-100
    )
    st.plotly_chart(fig)
def load_token_balances():
    """Load per-influencer token balances from disk.

    Returns an empty DataFrame when the scraper has not produced the file yet.
    """
    balances_path = "output/influencers_token_balances.csv"
    try:
        return pd.read_csv(balances_path)
    except FileNotFoundError:
        logging.warning("Token balances file not found.")
        return pd.DataFrame()
def create_token_balance_bar_plot(df):
    """Return a grouped bar chart of token balances per influencer.

    An empty figure is returned when *df* has no rows, so callers can always
    pass the result to st.plotly_chart.
    """
    if df.empty:
        return go.Figure()
    figure = px.bar(df, x="Influencer", y="Balance", color="Token", barmode="group")
    figure.update_layout(
        title="Token Balances of Influencers",
        xaxis_title="Influencer",
        yaxis_title="Token Balance",
        legend_title="Token",
    )
    return figure
def get_top_buyers(df, token, top_n=5):
    """Return the *top_n* buyer addresses of *token* by total transferred value.

    'from' is treated as the buyer address and 'value' as the amount; the
    result is a DataFrame with 'from' and 'value' columns, largest total first.
    """
    token_transfers = df.loc[df['tokenSymbol'] == token]
    totals = token_transfers.groupby('from')['value'].sum()
    return totals.nlargest(top_n).reset_index()
def plot_top_buyers(df, token=None):
    """Horizontal bar chart of the top buyers in *df*.

    Parameters
    ----------
    df : DataFrame with 'from' (address) and 'value' (amount) columns,
        typically the output of get_top_buyers.
    token : label used in the chart title. Defaults to the module-level
        ``selected_token`` global for backward compatibility.
    """
    if token is None:
        # Fix: the original silently depended on the global `selected_token`,
        # which is only defined further down the script. Keep that fallback
        # so existing call sites still work, but allow explicit passing.
        token = selected_token
    fig = px.bar(df, x='from', y='value',
                 title=f'Top 5 Buyers of {token}', orientation="h")
    fig.update_layout(xaxis_title="Address", yaxis_title="Total Amount Bought")
    return fig
def load_influencer_interactions(influencer_name):
    """Load deduplicated wallet interactions for one influencer.

    Returns a (DataFrame, address) pair. When the influencer is unknown or a
    required file is missing, returns (empty DataFrame, None).
    """
    try:
        with open("ressources/dict_influencers_addr.json", "r") as handle:
            influencer_addresses = json.load(handle)
        address = influencer_addresses.get(influencer_name)
        if address is None:
            return pd.DataFrame(), None
        interactions = pd.read_csv(f"output/interactions_{influencer_name}.csv")
        # Only sender/receiver/amount matter for the network graph.
        interactions = interactions[['from', 'to', 'value']].drop_duplicates()
        return interactions, address
    except FileNotFoundError:
        return pd.DataFrame(), None
def create_network_graph(df, influencer_name, influencer_address):
    """Build a Plotly network graph of an influencer's top wallet interactions.

    Parameters
    ----------
    df : DataFrame with 'from', 'to', 'value' columns (deduplicated transfers).
    influencer_name : display name used in the title and hover text.
    influencer_address : the influencer's wallet address; its node is enlarged.

    Returns
    -------
    (fig, top_interactions) : the Plotly figure and the DataFrame of the 20
    most frequent (from, to) pairs used to build it.
    """
    G = nx.Graph()
    # Consider bidirectional interactions: append a copy of the edge list with
    # 'from'/'to' swapped so A->B and B->A contribute to the same pair count.
    df_bi = pd.concat([df.rename(columns={'from': 'to', 'to': 'from'}), df])
    interaction_counts = df_bi.groupby(['from', 'to']).size().reset_index(name='count')
    # Keep only the 20 most frequent pairs so the graph stays readable.
    top_interactions = interaction_counts.sort_values('count', ascending=False).head(20)
    # Add edges and nodes to the graph (edge weight = interaction count).
    for _, row in top_interactions.iterrows():
        G.add_edge(row['from'], row['to'], weight=row['count'])
        G.add_node(row['from'], type='sender')
        G.add_node(row['to'], type='receiver')
    # Node positions via force-directed layout; heavier edges pull nodes closer.
    pos = nx.spring_layout(G, weight='weight')
    # Edge trace: each edge contributes (x0, x1, None) / (y0, y1, None) so all
    # segments render as one disconnected-line Scatter trace.
    edge_x = []
    edge_y = []
    edge_hover = []
    for edge in G.edges(data=True):
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])
        edge_hover.append(f'Interactions: {edge[2]["weight"]}')
    edge_trace = go.Scatter(
        x=edge_x, y=edge_y,
        line=dict(width=2, color='#888'),
        hoverinfo='text',
        text=edge_hover,
        mode='lines')
    # Node trace: hover shows address, degree and total interaction count.
    node_x = []
    node_y = []
    node_hover = []
    node_size = []
    for node in G.nodes():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)
        connections = len(G.edges(node))
        # Total interactions involving this node, in either direction.
        interaction_sum = interaction_counts[interaction_counts['from'].eq(node) | interaction_counts['to'].eq(node)]['count'].sum()
        node_hover_info = f'Address: {node}<br># of connections: {connections}<br># of interactions: {interaction_sum}'
        if node == influencer_address:
            # The influencer's own wallet gets a label prefix and a bigger marker.
            node_hover_info = f'Influencer: {influencer_name}<br>' + node_hover_info
            node_size.append(30)  # Central node size
        else:
            node_size.append(20)  # Other nodes size
        node_hover.append(node_hover_info)
    node_trace = go.Scatter(
        x=node_x, y=node_y,
        mode='markers',
        hoverinfo='text',
        text=node_hover,
        marker=dict(
            showscale=False,
            color='blue',
            size=node_size,
            line=dict(width=2, color='black')))
    # Create figure: axes hidden, exploration is hover-driven only.
    fig = go.Figure(data=[edge_trace, node_trace],
                    layout=go.Layout(
                        title=f'<br>Network graph of wallet interactions for {influencer_name}',
                        titlefont=dict(size=16),
                        showlegend=False,
                        hovermode='closest',
                        margin=dict(b=20, l=5, r=5, t=40),
                        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)))
    return fig, top_interactions
# Function to read the last update time from a file
def read_last_update_time():
    """Return the persisted last-update timestamp, or "" if never written."""
    timestamp_path = "ressources/last_update.txt"
    if not os.path.exists(timestamp_path):
        return ""
    with open(timestamp_path, "r") as handle:
        return handle.read()
# Initialize last_update_time using the function — rehydrates the timestamp
# persisted in ressources/last_update.txt so the UI shows the correct value
# across app restarts/reruns.
st.session_state.last_update_time = read_last_update_time()
# Callback for the "Scrap new data" button.
def update_data_with_timer():
    """Run every scraping script in sequence, then persist the refresh time."""
    scraper_scripts = [
        "utils/scrap_etherscan.py",
        "utils/scrap_cmc.py",
        "utils/scrap_influencers_balance.py",
        "utils/scrap_cmc_global_metrics.py",
        "utils/scrap_greed_fear_index.py",
        "utils/extract_tokens_balances.py",
    ]
    for script in scraper_scripts:
        subprocess.call(["python", script])
    # Record the refresh time in session state (for this run) and on disk
    # (so it survives app restarts — see read_last_update_time).
    last_update_time = time.strftime("%Y-%m-%d %H:%M:%S")
    st.session_state.last_update_time = last_update_time
    with open("ressources/last_update.txt", "w") as file:
        file.write(last_update_time)
# Callback for the "Update interactions" button.
def update_interactions():
    """Re-run the wallet-interaction extraction script (blocking call)."""
    subprocess.call(["python", "utils/extract_wallet_interactions.py"])
#-------------------------------------scheduler ---------------------------------- | |
# # Function to execute the scraping functions | |
# def execute_etherscan_scraping(): | |
# subprocess.call(["python", "utils/scrap_etherscan.py"]) | |
# logging.info("Etherscan scraping completed") | |
# threading.Timer(3600, execute_etherscan_scraping).start() | |
# # Balancer scrapping | |
# def execute_influencers_scraping(): | |
# subprocess.call(["python", "utils/scrap_influencers_balance.py"]) | |
# logging.info("Influencers balance scraping completed") | |
# threading.Timer(3600, execute_influencers_scraping).start() | |
# # Function to execute the scraping functions | |
# def execute_cmc_scraping(): | |
# subprocess.call(["python", "utils/scrap_cmc.py"]) | |
# logging.info("CMC scraping completed") | |
# threading.Timer(2592000 / 9000, execute_cmc_scraping).start() | |
# # Function to execute the global metrics scraping | |
# def execute_global_metrics_scraping(): | |
# subprocess.call(["python", "utils/scrap_cmc_global_metrics.py"]) | |
# logging.info("Global metrics scraping completed") | |
# threading.Timer(2592000 / 9000, execute_global_metrics_scraping).start()  # NOTE(review): original re-armed execute_influencers_scraping here — copy-paste bug
# def execute_greed_fear_index_scraping(): | |
# subprocess.call(["python", "utils/scrap_greed_fear_index.py"]) | |
# logging.info("Greed and Fear index scraping completed") | |
# threading.Timer(3600, execute_greed_fear_index_scraping).start() | |
# def execute_token_balances_scraping(): | |
# subprocess.call(["python", "utils/extract_tokens_balances.py"]) | |
# logging.info("Token balances scraping completed") | |
# threading.Timer(3600, execute_token_balances_scraping).start() | |
# if "initialized" not in st.session_state: | |
# # Start the scraping threads | |
# threading.Thread(target=execute_etherscan_scraping).start() | |
# threading.Thread(target=execute_cmc_scraping).start() | |
# threading.Thread(target=execute_influencers_scraping).start() | |
# threading.Thread(target=execute_global_metrics_scraping).start() | |
# threading.Thread(target=execute_greed_fear_index_scraping).start() | |
# threading.Thread(target=execute_token_balances_scraping).start() | |
# st.session_state["initialized"] = True | |
#-------------------------------------streamlit ----------------------------------
# Page header and global market overview.
st.title('Crypto Analysis')
st.write("Welcome to the Crypto Analysis app. Please note that data is not updated automatically due to API plan limitations.")
# Display the last update time (rehydrated from ressources/last_update.txt)
st.write(f"Time of last update: {st.session_state.last_update_time}")
# The scraping runs in on_click; the button's truthy return on the rerun
# displays the confirmation message.
if st.button("Scrap new data", on_click=update_data_with_timer):
    st.success("Data updated.")
st.header("Global Cryptocurrency Market Metrics")
# Create two columns for the two plots
col1, col2 = st.columns(2)
global_metrics_df = load_global_metrics()
display_greed_fear_index()
st.write(global_metrics_df)
with col1:
    # Market-cap dominance pie chart (BTC / ETH / others), sized for the column.
    dominance_fig = create_dominance_pie_chart(global_metrics_df)
    dominance_fig.update_layout(autosize=False, width=300, height=300)
    st.plotly_chart(dominance_fig)
with col2:
    # cmc: top/worst movers for a selectable percent-change window.
    selected_var = st.selectbox('Select Var', ["percent_change_24h", "percent_change_7d", "percent_change_90d"], index=0)
    # Sort by the selected metric, best performers first.
    df_sorted = df_cmc.sort_values(by=selected_var, ascending=False)
    # Select the top 10 and worst 10 rows.
    top_10 = df_sorted.head(10)
    worst_10 = df_sorted.tail(10)
    # Green bars for the 10 best performers.
    # Fix: the hover text labelled the selected metric but displayed the
    # unrelated percent_tokens_circulation column; show the metric itself.
    # (An unused max_abs_val computation was also removed.)
    fig = go.Figure(data=[
        go.Bar(
            x=top_10["symbol"],
            y=top_10[selected_var],
            marker_color='rgb(0,100,0)',  # Green color for top 10
            hovertext="Name : " + top_10["name"].astype(str) + '<br>' +
                      selected_var + " : " + top_10[selected_var].astype(str) + '<br>' +
                      'Market Cap: ' + top_10["market_cap"].astype(str) + '<br>' +
                      'Fully Diluted Market Cap: ' + top_10["fully_diluted_market_cap"].astype(str) + '<br>' +
                      'Last Updated: ' + top_10["last_updated"].astype(str),
            name="top_10"
        )
    ])
    # Red bars for the 10 worst performers, in the same figure.
    fig.add_traces(go.Bar(
        x=worst_10["symbol"],
        y=worst_10[selected_var],
        marker_color='rgb(255,0,0)',  # Red color for worst 10
        hovertext="Name:" + worst_10["name"].astype(str) + '<br>' +
                  selected_var + " : " + worst_10[selected_var].astype(str) + '<br>' +
                  'Market Cap: ' + worst_10["market_cap"].astype(str) + '<br>' +
                  'Fully Diluted Market Cap: ' + worst_10["fully_diluted_market_cap"].astype(str) + '<br>' +
                  'Last Updated: ' + worst_10["last_updated"].astype(str),
        name="worst_10"
    ))
    # Customize aspect
    fig.update_traces(marker_line_color='rgb(8,48,107)', marker_line_width=1.5, opacity=0.8)
    fig.update_layout(title_text=f'Top 10 and Worst 10 by {selected_var.split("_")[-1]} Percentage Change')
    fig.update_xaxes(categoryorder='total ascending')
    fig.update_layout(autosize=False, width=300, height=300)
    st.plotly_chart(fig)
st.header("Deep Dive into Specific Coins")
col1, col2 = st.columns(2)
tokens = load_tokens()
# Guard: on a fresh install df_etherscan is empty and has no 'tokenSymbol'
# column, which previously raised KeyError before any data was scraped.
token_options = df_etherscan['tokenSymbol'].unique() if 'tokenSymbol' in df_etherscan.columns else []
selected_token = st.selectbox('Select Token', token_options, index=0)
token_input = st.text_input("Add new token", placeholder="e.g., APE:0x123...ABC")
if st.button("Add Token"):
    if ":" in token_input:
        try:
            new_token_name, new_token_addr = token_input.split(":")
            tokens[new_token_name.strip()] = new_token_addr.strip()
            with open("ressources/dict_tokens_addr.json", "w") as file:
                json.dump(tokens, file, indent=4)
            st.success(f"Token {new_token_name} added")
            # Re-scrape and reload transaction data so the new token shows up.
            subprocess.call(["python", "utils/scrap_etherscan.py"])
            df_etherscan = pd.DataFrame()
            for filename in os.listdir('output'):
                if filename.endswith('.csv') and 'transactions_' in filename:
                    df_temp = safe_read_csv(os.path.join('output', filename), sep=',')
                    df_etherscan = pd.concat([df_etherscan, df_temp], ignore_index=True)
        except ValueError:
            st.error("Invalid format. Please enter as 'name:address'")
    else:
        # Fix: message previously said "influencer details" in the token form.
        st.error("Please enter the token details as 'name:address'")
with col1:
    # Time series of transfer volume for the selected token.
    filtered_df = df_etherscan[df_etherscan['tokenSymbol'] == selected_token]
    volume_fig = go.Figure(
        data=[
            go.Scatter(
                x=filtered_df['timeStamp'],
                y=filtered_df['value'],
                mode='lines',
                name='Volume over time',
            )
        ],
        layout=go.Layout(
            title='Token Volume Over Time',
            yaxis=dict(title=f'Volume ({selected_token})'),
            showlegend=True,
            legend=go.layout.Legend(x=0, y=1.0),
            margin=go.layout.Margin(l=40, r=0, t=40, b=30),
            width=300,
            height=300,
        ),
    )
    st.plotly_chart(volume_fig)
with col2:
    # Bar chart of the biggest buyers of the selected token.
    top_buyers_df = get_top_buyers(df_etherscan, selected_token)
    if top_buyers_df.empty:
        st.write(f"No buying data available for {selected_token}")
    else:
        top_buyers_fig = plot_top_buyers(top_buyers_df)
        top_buyers_fig.update_layout(autosize=False, width=300, height=300)
        st.plotly_chart(top_buyers_fig)
st.header("Influencers' Token Balances")
token_balances_df = load_token_balances()
col1, col2 = st.columns(2)
influencers = load_influencers()
influencer_input = st.text_input("Add a new influencer", placeholder="e.g., alice:0x123...ABC")
if st.button("Add Influencer"):
    if ":" not in influencer_input:
        st.error("Please enter the influencer details as 'name:address'")
    else:
        try:
            new_influencer_name, new_influencer_addr = influencer_input.split(":")
            influencers[new_influencer_name.strip()] = new_influencer_addr.strip()
            with open("ressources/dict_influencers_addr.json", "w") as file:
                json.dump(influencers, file, indent=4)
            st.success(f"Influencer {new_influencer_name} added")
            # Re-scrape and reload balances so the new influencer appears.
            subprocess.call(["python", "utils/scrap_influencers_balance.py"])
            subprocess.call(["python", "utils/extract_tokens_balances.py"])
            token_balances_df = load_token_balances()
        except ValueError:
            st.error("Invalid format. Please enter as 'name:address'")
with col1:
    # Grouped token-balance bars, or a placeholder message when empty.
    if token_balances_df.empty:
        st.write("No token balance data available.")
    else:
        token_balance_fig = create_token_balance_bar_plot(token_balances_df)
        token_balance_fig.update_layout(autosize=False, width=300, height=400)
        st.plotly_chart(token_balance_fig)
with col2:
    # Native Ether balances per influencer.
    try:
        df_balances = pd.read_csv("output/influencers_balances.csv")
        logging.info(f"Balances uploaded, shape of dataframe is {df_balances.shape}")
    except FileNotFoundError:
        st.error("Balance data not found. Please wait for the next update cycle.")
        df_balances = pd.DataFrame()
    # Reverse mapping: lowercase wallet address -> influencer name.
    inverted_influencers = {v.lower(): k for k, v in influencers.items()}
    if df_balances.empty:
        logging.info("DataFrame is empty")
    else:
        df_balances["balance"] = df_balances["balance"].astype(float) / 1e18  # Wei -> Ether
        df_balances = df_balances.rename(columns={"account": "address"})
        # Lowercase so addresses match the inverted dictionary's keys.
        df_balances["address"] = df_balances["address"].str.lower()
        df_balances["influencer"] = df_balances["address"].map(inverted_influencers)
        fig = px.bar(df_balances, y="influencer", x="balance", orientation="h")
        fig.update_layout(
            title='Ether Balances of Influencers',
            xaxis=dict(
                title='Balance in eth',
                titlefont_size=16,
                tickfont_size=14,
            ))
        fig.update_layout(autosize=False, width=300, height=400)
        st.plotly_chart(fig)
# Wallet interactions network graph section.
st.header("Wallet Interactions Network Graph")
if st.button("Update interactions", on_click=update_interactions):
    st.success("Interactions data updated.")
selected_influencer = st.selectbox("Select an Influencer", list(influencers.keys()))
# Load interactions data for the selected influencer.
interactions_df, influencer_address = load_influencer_interactions(selected_influencer)
if interactions_df.empty:
    st.write(f"No wallet interaction data available for {selected_influencer}.")
else:
    network_fig, top_interactions = create_network_graph(interactions_df, selected_influencer, influencer_address)
    st.plotly_chart(network_fig)
    st.subheader(f"Top Interactions for {selected_influencer}")
    st.table(top_interactions)
# Footer: profile badge links (GitHub repo, LinkedIn, portfolio) rendered as
# raw HTML, hence unsafe_allow_html=True.
st.markdown("""
<div style="text-align: center; margin-top: 20px;">
<a href="https://github.com/mohcineelharras/llama-index-docs" target="_blank" style="margin: 10px; display: inline-block;">
<img src="https://img.shields.io/badge/Repository-333?logo=github&style=for-the-badge" alt="Repository" style="vertical-align: middle;">
</a>
<a href="https://www.linkedin.com/in/mohcine-el-harras" target="_blank" style="margin: 10px; display: inline-block;">
<img src="https://img.shields.io/badge/-LinkedIn-0077B5?style=for-the-badge&logo=linkedin" alt="LinkedIn" style="vertical-align: middle;">
</a>
<a href="https://mohcineelharras.github.io" target="_blank" style="margin: 10px; display: inline-block;">
<img src="https://img.shields.io/badge/Visit-Portfolio-9cf?style=for-the-badge" alt="GitHub" style="vertical-align: middle;">
</a>
</div>
<div style="text-align: center; margin-top: 20px; color: #666; font-size: 0.85em;">
© 2023 Mohcine EL HARRAS
</div>
""", unsafe_allow_html=True)
#-------------------------------------end ---------------------------------- | |