# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------
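"""Profitability analysis of Omen xDAI prediction-market traders.

Fetches trades from the Omen subgraph and position data from the Gnosis
conditional-tokens subgraph, computes per-trade statistics (earnings, fees,
mech calls, ROI), and writes per-trade and per-trader summaries as parquet
files under the `data/` directory.
"""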
import datetime
import os
import time
from collections import defaultdict
from enum import Enum
from pathlib import Path
from string import Template
from typing import Any

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm

from get_mech_info import DATETIME_60_DAYS_AGO
from queries import conditional_tokens_gc_user_query, omen_xdai_trades_query
from utils import SUBGRAPH_API_KEY, _to_content, convert_hex_to_int, wei_to_unit
QUERY_BATCH_SIZE = 1000
DUST_THRESHOLD = 10000000000000  # 1e13 wei
INVALID_ANSWER = -1
FPMM_QS_CREATOR = "0x89c5cc945dd550bcffb72fe42bff002429f46fec"
FPMM_PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"
DEFAULT_FROM_DATE = "1970-01-01T00:00:00"
DEFAULT_TO_DATE = "2038-01-19T03:14:07"
DEFAULT_FROM_TIMESTAMP = 0
DEFAULT_60_DAYS_AGO_TIMESTAMP = DATETIME_60_DAYS_AGO.timestamp()
DEFAULT_TO_TIMESTAMP = 2147483647  # 2038-01-19T03:14:07Z, the 32-bit epoch limit
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
DEFAULT_MECH_FEE = 0.01
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"

class MarketState(Enum):
    """Market state."""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Prints the market status."""
        return self.name.capitalize()

class MarketAttribute(Enum):
    """Market attribute."""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Prints the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Prints the attribute representation."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Performs string conversion to MarketAttribute."""
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e
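
# Illustrative use of the converter above (with a hypothetical argparse parser):
#   parser.add_argument("attribute", type=MarketAttribute.argparse)
#   MarketAttribute.argparse("roi")  # -> MarketAttribute.ROI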

ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]

SUMMARY_STATS_DF_COLS = [
    "trader_address",
    "num_trades",
    "num_winning_trades",
    "num_redeemed",
    "total_investment",
    "total_trade_fees",
    "num_mech_calls",
    "total_mech_fees",
    "total_earnings",
    "total_redeemed_amount",
    "total_net_earnings",
    "total_net_earnings_wo_mech_fees",
    "total_roi",
    "total_roi_wo_mech_fees",
    "mean_mech_calls_per_trade",
    "mean_mech_fee_amount_per_trade",
]

headers = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}

def _query_omen_xdai_subgraph(
    trader_category: str,
    from_timestamp: float,
    to_timestamp: float,
    fpmm_from_timestamp: float,
    fpmm_to_timestamp: float,
) -> dict[str, Any]:
    """Query the Omen xDAI subgraph for trades of the given trader category."""
    OMEN_SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
    )
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")
    grouped_results = defaultdict(list)
    id_gt = ""
    if trader_category == "quickstart":
        creator_id = FPMM_QS_CREATOR.lower()
    else:  # pearl
        creator_id = FPMM_PEARL_CREATOR.lower()
    # Paginate by trade id: fetch batches of QUERY_BATCH_SIZE, advancing the
    # id_gt cursor past the last id seen, until a batch comes back empty.
    while True:
        query = omen_xdai_trades_query.substitute(
            fpmm_creator=creator_id,
            creationTimestamp_gte=int(from_timestamp),
            creationTimestamp_lte=int(to_timestamp),
            fpmm_creationTimestamp_gte=int(fpmm_from_timestamp),
            fpmm_creationTimestamp_lte=int(fpmm_to_timestamp),
            first=QUERY_BATCH_SIZE,
            id_gt=id_gt,
        )
        content_json = _to_content(query)
        res = requests.post(omen_subgraph, headers=headers, json=content_json)
        result_json = res.json()
        user_trades = result_json.get("data", {}).get("fpmmTrades", [])
        if not user_trades:
            break
        # Group trades by the market (fpmm) they belong to.
        for trade in user_trades:
            fpmm_id = trade.get("fpmm", {}).get("id")
            grouped_results[fpmm_id].append(trade)
        id_gt = user_trades[-1]["id"]
    all_results = {
        "data": {
            "fpmmTrades": [
                trade
                for trades_list in grouped_results.values()
                for trade in trades_list
            ]
        }
    }
    return all_results
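
# Illustrative call (assumes a valid SUBGRAPH_API_KEY in utils.py):
#   trades_json = _query_omen_xdai_subgraph(
#       trader_category="quickstart",
#       from_timestamp=DEFAULT_FROM_TIMESTAMP,
#       to_timestamp=DEFAULT_TO_TIMESTAMP,
#       fpmm_from_timestamp=DEFAULT_FROM_TIMESTAMP,
#       fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
#   )
#   trades = trades_json["data"]["fpmmTrades"]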

def _query_conditional_tokens_gc_subgraph(creator: str) -> dict[str, Any]:
    """Query the conditional tokens subgraph for a user's positions."""
    SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"""
    )
    subgraph = SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    all_results: dict[str, Any] = {"data": {"user": {"userPositions": []}}}
    userPositions_id_gt = ""
    # Paginate over userPositions with an id cursor, as in the trades query.
    while True:
        query = conditional_tokens_gc_user_query.substitute(
            id=creator.lower(),
            first=QUERY_BATCH_SIZE,
            userPositions_id_gt=userPositions_id_gt,
        )
        content_json = {"query": query}
        print("sending query to subgraph")
        res = requests.post(subgraph, headers=headers, json=content_json)
        result_json = res.json()
        user_data = result_json.get("data", {}).get("user", {})
        if not user_data:
            break
        user_positions = user_data.get("userPositions", [])
        if not user_positions:
            break
        all_results["data"]["user"]["userPositions"].extend(user_positions)
        userPositions_id_gt = user_positions[-1]["id"]
    if len(all_results["data"]["user"]["userPositions"]) == 0:
        return {"data": {"user": None}}
    return all_results

def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
    """Returns whether the user has redeemed the position."""
    user_positions = user_json["data"]["user"]["userPositions"]
    condition_id = fpmmTrade["fpmm.condition.id"]
    for position in user_positions:
        position_condition_ids = position["position"]["conditionIds"]
        balance = int(position["balance"])
        if condition_id in position_condition_ids:
            # A zero balance on a matching condition means the position was
            # redeemed; a non-zero balance means it was not, so return early.
            return balance == 0
    return False
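
# Expected shape of user_json (a sketch of what
# _query_conditional_tokens_gc_subgraph returns):
#   {"data": {"user": {"userPositions": [
#       {"id": "...", "balance": "0",
#        "position": {"conditionIds": ["0x..."]}},
#       ...
#   ]}}}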

def transform_fpmmTrades(df: pd.DataFrame) -> pd.DataFrame:
    """Flatten the nested fpmm fields and rename columns."""
    print("Transforming trades dataframe")
    # extract the creator address from the nested creator object
    df["creator"] = df["creator"].apply(lambda x: x["id"])
    # flatten the nested fpmm column into fpmm.* columns
    fpmm = pd.json_normalize(df["fpmm"])
    fpmm.columns = [f"fpmm.{col}" for col in fpmm.columns]
    df = pd.concat([df, fpmm], axis=1)
    # drop the original fpmm column
    df.drop(["fpmm"], axis=1, inplace=True)
    # rename creator to trader_address
    df.rename(columns={"creator": "trader_address"}, inplace=True)
    print(df.head())
    print(df.info())
    return df

def create_fpmmTrades(rpc: str, from_timestamp: float = DEFAULT_FROM_TIMESTAMP):
    """Create fpmmTrades for all trades (quickstart and pearl markets)."""
    # NOTE: rpc is accepted for interface consistency but not used here.
    # Quickstart trades
    qs_trades_json = _query_omen_xdai_subgraph(
        trader_category="quickstart",
        from_timestamp=from_timestamp,
        to_timestamp=DEFAULT_TO_TIMESTAMP,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
    )
    qs_trades = qs_trades_json["data"]["fpmmTrades"]
    print(f"length of the qs_trades dataset {len(qs_trades)}")
    # convert to dataframe
    qs_df = pd.DataFrame(qs_trades)
    qs_df["market_creator"] = "quickstart"
    qs_df = transform_fpmmTrades(qs_df)
    # Pearl trades
    pearl_trades_json = _query_omen_xdai_subgraph(
        trader_category="pearl",
        from_timestamp=from_timestamp,
        to_timestamp=DEFAULT_TO_TIMESTAMP,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
    )
    pearl_trades = pearl_trades_json["data"]["fpmmTrades"]
    print(f"length of the pearl_trades dataset {len(pearl_trades)}")
    # convert to dataframe
    pearl_df = pd.DataFrame(pearl_trades)
    pearl_df["market_creator"] = "pearl"
    pearl_df = transform_fpmmTrades(pearl_df)
    return pd.concat([qs_df, pearl_df], ignore_index=True)

def prepare_profitalibity_data(
    rpc: str,
    tools_filename: str = "tools.parquet",
    trades_filename: str = "fpmmTrades.parquet",
    from_timestamp: float = DEFAULT_60_DAYS_AGO_TIMESTAMP,
):
    """Prepare data for profitability analysis."""
    # Load the tools file, if present
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)
        # make sure trader_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"
        # lowercase and strip trader_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
        # drop duplicates
        tools.drop_duplicates(inplace=True)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print(f"{tools_filename} not found. Please run tools.py first.")
        return None
    # Load the trades file, creating it from the subgraph if missing
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
        print(f"{trades_filename} loaded")
    except FileNotFoundError:
        print(f"{trades_filename} not found. Creating it...")
        fpmmTrades = create_fpmmTrades(rpc, from_timestamp=from_timestamp)
        fpmmTrades.to_parquet(DATA_DIR / trades_filename, index=False)
    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    return fpmmTrades, tools

def determine_market_status(trade, current_answer):
    """Determine the market status of a trade."""
    # NaN never compares equal to itself, so use pd.isna rather than
    # `== np.nan` (which is always False).
    if pd.isna(current_answer) and time.time() >= int(trade["fpmm.openingTimestamp"]):
        return MarketState.PENDING
    elif pd.isna(current_answer):
        return MarketState.OPEN
    elif trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    elif time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED

def analyse_trader(
    trader_address: str, fpmmTrades: pd.DataFrame, tools: pd.DataFrame
) -> pd.DataFrame:
    """Analyse a trader's trades."""
    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]
    tools_usage = tools[tools["trader_address"] == trader_address]
    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df
    # Fetch the user's positions from the conditional tokens subgraph
    try:
        user_json = _query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df
    # Iterate over the trades
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            if not trade["fpmm.currentAnswer"]:
                print(f"Skipping trade {i} because currentAnswer is missing")
                continue
            # Parsing and computing shared values
            creation_timestamp_utc = datetime.datetime.fromtimestamp(
                int(trade["creationTimestamp"]), tz=datetime.timezone.utc
            )
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = 0, False
            redemption = _is_redeemed(user_json, trade)
            current_answer = trade["fpmm.currentAnswer"]
            # Determine market status
            market_status = determine_market_status(trade, current_answer)
            # Skip markets that are not yet closed
            if market_status != MarketState.CLOSED:
                print(
                    f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                )
                continue
            current_answer = convert_hex_to_int(current_answer)
            # An invalid market refunds the collateral but is not a win
            is_invalid = current_answer == INVALID_ANSWER
            # Compute earnings and winner trade status
            if is_invalid:
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True
            # Count mech calls whose prompt mentions this market's title
            num_mech_calls = (
                tools_usage["prompt_request"].apply(lambda x: trade["title"] in x).sum()
            )
            net_earnings = (
                earnings
                - fee_amount
                - (num_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )
            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": creation_timestamp_utc,
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": num_mech_calls,
                "mech_fee_amount": num_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                # ROI = net earnings over the total cost basis
                # (collateral + trade fee + mech fees)
                "roi": net_earnings
                / (collateral_amount + fee_amount + num_mech_calls * DEFAULT_MECH_FEE),
            }
        except Exception as e:
            print(f"Error processing trade {i}: {e}")
            continue
    return trades_df

def analyse_all_traders(trades: pd.DataFrame, tools: pd.DataFrame) -> pd.DataFrame:
    """Analyse all traders."""
    all_traders = []
    trader_addresses = trades["trader_address"].unique()
    for trader in tqdm(
        trader_addresses,
        total=len(trader_addresses),
        desc="Analysing traders",
    ):
        all_traders.append(analyse_trader(trader, trades, tools))
    # concat the per-trader results
    all_traders_df = pd.concat(all_traders)
    return all_traders_df

def summary_analyse(df):
    """Summarise the per-trade DataFrame into per-trader statistics."""
    # Ensure DataFrame is not empty
    if df.empty:
        return pd.DataFrame(columns=SUMMARY_STATS_DF_COLS)
    # Group by trader_address
    grouped = df.groupby("trader_address")
    # Create summary DataFrame
    summary_df = grouped.agg(
        num_trades=("trader_address", "size"),
        num_winning_trades=("winning_trade", lambda x: float(x.sum())),
        num_redeemed=("redeemed", lambda x: float(x.sum())),
        total_investment=("collateral_amount", "sum"),
        total_trade_fees=("trade_fee_amount", "sum"),
        num_mech_calls=("num_mech_calls", "sum"),
        total_mech_fees=("mech_fee_amount", "sum"),
        total_earnings=("earnings", "sum"),
        total_redeemed_amount=("redeemed_amount", "sum"),
        total_net_earnings=("net_earnings", "sum"),
    )
    # Derived columns
    summary_df["total_roi"] = (
        summary_df["total_net_earnings"] / summary_df["total_investment"]
    )
    summary_df["mean_mech_calls_per_trade"] = (
        summary_df["num_mech_calls"] / summary_df["num_trades"]
    )
    summary_df["mean_mech_fee_amount_per_trade"] = (
        summary_df["total_mech_fees"] / summary_df["num_trades"]
    )
    # The "wo_mech_fees" figures add the mech fees back to the net earnings
    summary_df["total_net_earnings_wo_mech_fees"] = (
        summary_df["total_net_earnings"] + summary_df["total_mech_fees"]
    )
    summary_df["total_roi_wo_mech_fees"] = (
        summary_df["total_net_earnings_wo_mech_fees"] / summary_df["total_investment"]
    )
    # Reset index to expose trader_address as a column
    summary_df.reset_index(inplace=True)
    return summary_df
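
# Illustrative re-run on a previously saved per-trade file:
#   df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
#   summary = summary_analyse(df)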

def run_profitability_analysis(
    rpc: str,
    tools_filename: str = "tools.parquet",
    trades_filename: str = "fpmmTrades.parquet",
    from_timestamp: float = DEFAULT_60_DAYS_AGO_TIMESTAMP,
):
    """Create all trades analysis."""
    # load dfs from the data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    prepared = prepare_profitalibity_data(
        rpc, tools_filename, trades_filename, from_timestamp
    )
    if prepared is None:
        return None
    fpmmTrades, tools = prepared
    tools["trader_address"] = tools["trader_address"].str.lower()
    print(f"List of traders = {fpmmTrades['trader_address'].unique()}")
    # all trades profitability df
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, tools)
    # split off invalid markets (is_invalid == True)
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]
    # summarize profitability df
    print("Summarising trades...")
    summary_df = summary_analyse(all_trades_df)
    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
    summary_df.to_parquet(DATA_DIR / "summary_profitability.parquet", index=False)
    print("Done!")
    return all_trades_df, summary_df

if __name__ == "__main__":
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    # Remove any stale fpmmTrades.parquet so the trades are re-fetched
    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
        os.remove(DATA_DIR / "fpmmTrades.parquet")
    run_profitability_analysis(rpc)