|
|
|
|
|
import pandas as pd |
|
import assets.text_content as tc |
|
import calendar |
|
from typing import Union, List |
|
from datetime import datetime |
|
|
|
# Calendar year (local time) as a string; evaluated once, when this module is imported.
current_year = f"{datetime.now().year}"
|
|
|
def filter_cols(df):
    """Return ``df`` restricted to a fixed, ordered set of columns.

    The columns are looked up by the names stored in ``tc`` and appear in the
    result in the order listed below.
    """
    selected_columns = [
        tc.MODEL_NAME,
        tc.CLEMSCORE,
        tc.INPUT,
        tc.OUTPUT,
        tc.LATENCY,
        tc.CONTEXT,
        tc.PARAMS,
        tc.RELEASE_DATE,
        tc.LICENSE,
    ]
    return df[selected_columns]
|
|
|
|
|
def convert_date_components_to_timestamp(year: Union[int, str],
                                         month: Union[int, str]) -> int:
    """Return the Unix timestamp of midnight on the first day of a month.

    Args:
        year: Calendar year, as an int or a numeric string (2024 or "2024").
        month: Month number, as an int or a numeric string (3 or "3").

    Returns:
        Whole seconds since the epoch for ``<year>-<month>-01 00:00:00``.
        The date is timezone-naive, which pandas' ``Timestamp.timestamp``
        treats as UTC.

    Raises:
        ValueError: If a component is not numeric or the date is invalid.
        TypeError: If a component cannot be converted with ``int()``.
    """
    # Coerce before formatting: the ``02d`` spec only accepts integers, so the
    # string inputs this function advertises would otherwise raise.
    date_str = f"{int(year)}-{int(month):02d}-01"
    return int(pd.to_datetime(date_str).timestamp())
|
|
|
def filter_by_date(df: pd.DataFrame,
                   start_year, start_month,
                   end_year, end_month,
                   date_column: str = tc.RELEASE_DATE) -> pd.DataFrame:
    """Keep the rows whose date lies between a start month and an end month.

    Both months are included in full: the range runs from the first day of
    the start month up to, but not including, the first day of the month
    after the end month.

    Args:
        df: Frame to filter.
        start_year: First year of the range (int or numeric string). A falsy
            value falls back to ``tc.START_YEAR``.
        start_month: Key of ``tc.MONTH_MAP`` for the first month. A falsy
            value falls back to ``"January"``.
        end_year: Last year of the range (int or numeric string). A falsy
            value falls back to the module-level ``current_year``.
        end_month: Key of ``tc.MONTH_MAP`` for the last month. A falsy value
            falls back to ``"December"``.
        date_column: Column whose values ``pd.to_datetime`` can parse.

    Returns:
        The rows inside the range. If a bound or a column value cannot be
        converted (``ValueError`` / ``TypeError``), the problem is printed and
        ``df`` is returned unfiltered.

    Raises:
        KeyError: If a month is not a key of ``tc.MONTH_MAP`` or
            ``date_column`` is missing from a non-empty ``df``.
    """
    if df.empty:
        # Nothing to filter.
        return df

    start_year = start_year or tc.START_YEAR
    end_year = end_year or current_year
    start_month = start_month or "January"
    end_month = end_month or "December"

    try:
        range_start = convert_date_components_to_timestamp(
            int(start_year),
            int(tc.MONTH_MAP[start_month])
        )
        end_month_start = convert_date_components_to_timestamp(
            int(end_year),
            int(tc.MONTH_MAP[end_month])
        )
        # Exclusive upper bound at the start of the following month. Comparing
        # against the first day of the end month itself would drop every date
        # that falls later in that month.
        next_month_start = pd.Timestamp(end_month_start, unit="s") + pd.DateOffset(months=1)
        range_end = int(next_month_start.timestamp())

        # Element-wise conversion is kept on purpose: an unparseable or missing
        # date raises here and triggers the "return unfiltered" fallback below.
        row_timestamps = pd.to_datetime(df[date_column]).apply(
            lambda moment: int(moment.timestamp())
        )

        return df[(row_timestamps >= range_start) & (row_timestamps < range_end)]
    except (ValueError, TypeError) as e:
        print(f"Error processing dates: {e}")
        return df
|
|
|
|
|
def filter(df, language_list, parameters, input_price, output_price, multimodal,
           context, open_weight,
           start_year, start_month, end_year, end_month,
           license):
    """Apply the selected filters to ``df`` and return the result sorted by score.

    Each step is skipped once the frame is empty. The steps run in this order:
    languages, parameter count, input price, output price, modalities,
    open/commercial weights, license, date range.

    Args:
        df: Frame with the columns named by ``tc.LANGS``, ``tc.OPEN_WEIGHT``,
            ``tc.PARAMS``, ``tc.INPUT``, ``tc.OUTPUT``, the modality columns,
            ``tc.LICENSE_NAME``, ``tc.TEMP_DATE`` and those kept by
            ``filter_cols``.
        language_list: A row is kept only if every entry is ``in`` its
            ``tc.LANGS`` value.
        parameters: ``(low, high)`` inclusive bounds on ``tc.PARAMS``. Applied
            only to rows that are open-weight and have a parameter count; all
            other rows pass this step untouched.
        input_price: ``(low, high)`` inclusive bounds on ``tc.INPUT``.
        output_price: ``(low, high)`` inclusive bounds on ``tc.OUTPUT``.
        multimodal: Collection of modality names. For each of
            ``tc.SINGLE_IMG``, ``tc.MULT_IMG``, ``tc.AUDIO``, ``tc.VIDEO``
            present, the row's column of that name must be ``True``.
        context: Accepted for interface compatibility; not used here.
        open_weight: Collection that may contain ``tc.OPEN`` and/or
            ``tc.COMM``. Only one present keeps just that kind; both present
            keeps everything; neither present yields an empty frame.
        start_year, start_month, end_year, end_month: Forwarded to
            ``filter_by_date``, which is applied to ``tc.TEMP_DATE``.
        license: A row is kept if any entry is ``in`` its
            ``tc.LICENSE_NAME`` value.

    Returns:
        The surviving rows, reduced by ``filter_cols`` and sorted by
        ``tc.CLEMSCORE`` in descending order.
    """
    if not df.empty:
        supports_all = df[tc.LANGS].apply(
            lambda langs: all(lang in langs for lang in language_list)
        )
        df = df[supports_all]

    if not df.empty:
        # Split positionally with a mask and its complement, so the two parts
        # are an exact partition even when index labels repeat.
        # fillna(False) sends rows with a missing open-weight flag to the
        # "rest" part instead of dropping them.
        sized_open = (
            (df[tc.OPEN_WEIGHT] == True) & df[tc.PARAMS].notna()
        ).fillna(False)
        open_models = df[sized_open]
        # With no such rows there is nothing to range-filter; df stays as is.
        if not open_models.empty:
            within_range = open_models[tc.PARAMS].between(parameters[0], parameters[1])
            df = pd.concat([open_models[within_range], df[~sized_open]])

    if not df.empty:
        df = df[df[tc.INPUT].between(input_price[0], input_price[1])]

    if not df.empty:
        df = df[df[tc.OUTPUT].between(output_price[0], output_price[1])]

    if not df.empty:
        # Every requested modality must be supported, so the filters stack.
        for modality in (tc.SINGLE_IMG, tc.MULT_IMG, tc.AUDIO, tc.VIDEO):
            if modality in multimodal:
                df = df[df[modality] == True]

    if not df.empty:
        wants_open = tc.OPEN in open_weight
        wants_commercial = tc.COMM in open_weight
        if wants_open and not wants_commercial:
            df = df[df[tc.OPEN_WEIGHT] == True]
        elif wants_commercial and not wants_open:
            df = df[df[tc.OPEN_WEIGHT] == False]
        elif not wants_open and not wants_commercial:
            # Neither kind selected: nothing can match.
            df = pd.DataFrame(columns=df.columns)

    if not df.empty:
        df = df[df[tc.LICENSE_NAME].apply(
            lambda name: any(lic in name for lic in license)
        )]

    df = filter_by_date(df, start_year, start_month, end_year, end_month, tc.TEMP_DATE)

    df = filter_cols(df)
    df = df.sort_values(by=tc.CLEMSCORE, ascending=False)

    return df
|
|
|
|
|
|