evijit HF Staff committed
Commit a0cc89c · verified · 1 Parent(s): f555918

Delete preprocess.py

Files changed (1)
  1. preprocess.py +0 -371
preprocess.py DELETED
@@ -1,371 +0,0 @@
- # --- START OF FILE preprocess.py ---
-
- import pandas as pd
- import numpy as np
- import json
- import ast
- from tqdm.auto import tqdm
- import time
- import os
- import duckdb
- import re # Import re for the manual regex check in debug
-
- # --- Constants ---
- PROCESSED_PARQUET_FILE_PATH = "models_processed.parquet"
- HF_PARQUET_URL = 'https://huggingface.co/datasets/cfahlgren1/hub-stats/resolve/main/models.parquet'
-
- MODEL_SIZE_RANGES = {
-     "Small (<1GB)": (0, 1),
-     "Medium (1-5GB)": (1, 5),
-     "Large (5-20GB)": (5, 20),
-     "X-Large (20-50GB)": (20, 50),
-     "XX-Large (>50GB)": (50, float('inf'))
- }
-
- # --- Debugging Constant ---
- # <<<<<<< SET THE MODEL ID YOU WANT TO DEBUG HERE >>>>>>>
- MODEL_ID_TO_DEBUG = "openvla/openvla-7b"
- # Example: MODEL_ID_TO_DEBUG = "openai-community/gpt2"
- # If you don't have a specific ID, the debug block will just report it's not found.
-
- # --- Utility Functions (extract_model_file_size_gb, extract_org_from_id, process_tags_for_series, get_file_size_category - unchanged from previous correct version) ---
- def extract_model_file_size_gb(safetensors_data):
-     try:
-         if pd.isna(safetensors_data): return 0.0
-         data_to_parse = safetensors_data
-         if isinstance(safetensors_data, str):
-             try:
-                 if (safetensors_data.startswith('{') and safetensors_data.endswith('}')) or \
-                    (safetensors_data.startswith('[') and safetensors_data.endswith(']')):
-                     data_to_parse = ast.literal_eval(safetensors_data)
-                 else: data_to_parse = json.loads(safetensors_data)
-             except Exception: return 0.0
-         if isinstance(data_to_parse, dict) and 'total' in data_to_parse:
-             total_bytes_val = data_to_parse['total']
-             try:
-                 size_bytes = float(total_bytes_val)
-                 return size_bytes / (1024 * 1024 * 1024)
-             except (ValueError, TypeError): return 0.0
-         return 0.0
-     except Exception: return 0.0
-
- def extract_org_from_id(model_id):
-     if pd.isna(model_id): return "unaffiliated"
-     model_id_str = str(model_id)
-     return model_id_str.split("/")[0] if "/" in model_id_str else "unaffiliated"
-
- def process_tags_for_series(series_of_tags_values):
-     processed_tags_accumulator = []
-
-     for i, tags_value_from_series in enumerate(tqdm(series_of_tags_values, desc="Standardizing Tags", leave=False, unit="row")):
-         temp_processed_list_for_row = []
-         current_value_for_error_msg = str(tags_value_from_series)[:200] # Truncate for long error messages
-
-         try:
-             # Order of checks is important!
-             # 1. Handle explicit Python lists first
-             if isinstance(tags_value_from_series, list):
-                 current_tags_in_list = []
-                 for idx_tag, tag_item in enumerate(tags_value_from_series):
-                     try:
-                         # Ensure item is not NaN before string conversion if it might be a float NaN in a list
-                         if pd.isna(tag_item): continue
-                         str_tag = str(tag_item)
-                         stripped_tag = str_tag.strip()
-                         if stripped_tag:
-                             current_tags_in_list.append(stripped_tag)
-                     except Exception as e_inner_list_proc:
-                         print(f"ERROR processing item '{tag_item}' (type: {type(tag_item)}) within a list for row {i}. Error: {e_inner_list_proc}. Original list: {current_value_for_error_msg}")
-                 temp_processed_list_for_row = current_tags_in_list
-
-             # 2. Handle NumPy arrays
-             elif isinstance(tags_value_from_series, np.ndarray):
-                 # Convert to list, then process elements, handling potential NaNs within the array
-                 current_tags_in_list = []
-                 for idx_tag, tag_item in enumerate(tags_value_from_series.tolist()): # .tolist() is crucial
-                     try:
-                         if pd.isna(tag_item): continue # Check for NaN after converting to Python type
-                         str_tag = str(tag_item)
-                         stripped_tag = str_tag.strip()
-                         if stripped_tag:
-                             current_tags_in_list.append(stripped_tag)
-                     except Exception as e_inner_array_proc:
-                         print(f"ERROR processing item '{tag_item}' (type: {type(tag_item)}) within a NumPy array for row {i}. Error: {e_inner_array_proc}. Original array: {current_value_for_error_msg}")
-                 temp_processed_list_for_row = current_tags_in_list
-
-             # 3. Handle simple None or pd.NA after lists and arrays (which might contain pd.NA elements handled above)
-             elif tags_value_from_series is None or pd.isna(tags_value_from_series): # Now pd.isna is safe for scalars
-                 temp_processed_list_for_row = []
-
-             # 4. Handle strings (could be JSON-like, list-like, or comma-separated)
-             elif isinstance(tags_value_from_series, str):
-                 processed_str_tags = []
-                 # Attempt ast.literal_eval for strings that look like lists/tuples
-                 if (tags_value_from_series.startswith('[') and tags_value_from_series.endswith(']')) or \
-                    (tags_value_from_series.startswith('(') and tags_value_from_series.endswith(')')):
-                     try:
-                         evaluated_tags = ast.literal_eval(tags_value_from_series)
-                         if isinstance(evaluated_tags, (list, tuple)): # Check if eval result is a list/tuple
-                             # Recursively process this evaluated list/tuple, as its elements could be complex
-                             # For simplicity here, assume elements are simple strings after eval
-                             current_eval_list = []
-                             for tag_item in evaluated_tags:
-                                 if pd.isna(tag_item): continue
-                                 str_tag = str(tag_item).strip()
-                                 if str_tag: current_eval_list.append(str_tag)
-                             processed_str_tags = current_eval_list
-                     except (ValueError, SyntaxError):
-                         pass # If ast.literal_eval fails, let it fall to JSON or comma split
-
-                 # If ast.literal_eval didn't populate, try JSON
-                 if not processed_str_tags:
-                     try:
-                         json_tags = json.loads(tags_value_from_series)
-                         if isinstance(json_tags, list):
-                             # Similar to above, assume elements are simple strings after JSON parsing
-                             current_json_list = []
-                             for tag_item in json_tags:
-                                 if pd.isna(tag_item): continue
-                                 str_tag = str(tag_item).strip()
-                                 if str_tag: current_json_list.append(str_tag)
-                             processed_str_tags = current_json_list
-                     except json.JSONDecodeError:
-                         # If not a valid JSON list, fall back to comma splitting as the final string strategy
-                         processed_str_tags = [tag.strip() for tag in tags_value_from_series.split(',') if tag.strip()]
-                     except Exception as e_json_other:
-                         print(f"ERROR during JSON processing for string '{current_value_for_error_msg}' for row {i}. Error: {e_json_other}")
-                         processed_str_tags = [tag.strip() for tag in tags_value_from_series.split(',') if tag.strip()] # Fallback
-
-                 temp_processed_list_for_row = processed_str_tags
-
-             # 5. Fallback for other scalar types (e.g., int, float that are not NaN)
-             else:
-                 # This path is for non-list, non-ndarray, non-None/NaN, non-string types.
-                 # Or for NaNs that slipped through if they are not None or pd.NA (e.g. float('nan'))
-                 if pd.isna(tags_value_from_series): # Catch any remaining NaNs like float('nan')
-                     temp_processed_list_for_row = []
-                 else:
-                     str_val = str(tags_value_from_series).strip()
-                     temp_processed_list_for_row = [str_val] if str_val else []
-
-             processed_tags_accumulator.append(temp_processed_list_for_row)
-
-         except Exception as e_outer_tag_proc:
-             print(f"CRITICAL UNHANDLED ERROR processing row {i}: value '{current_value_for_error_msg}' (type: {type(tags_value_from_series)}). Error: {e_outer_tag_proc}. Appending [].")
-             processed_tags_accumulator.append([])
-
-     return processed_tags_accumulator
-
- def get_file_size_category(file_size_gb_val):
-     try:
-         numeric_file_size_gb = float(file_size_gb_val)
-         if pd.isna(numeric_file_size_gb): numeric_file_size_gb = 0.0
-     except (ValueError, TypeError): numeric_file_size_gb = 0.0
-     if 0 <= numeric_file_size_gb < 1: return "Small (<1GB)"
-     elif 1 <= numeric_file_size_gb < 5: return "Medium (1-5GB)"
-     elif 5 <= numeric_file_size_gb < 20: return "Large (5-20GB)"
-     elif 20 <= numeric_file_size_gb < 50: return "X-Large (20-50GB)"
-     elif numeric_file_size_gb >= 50: return "XX-Large (>50GB)"
-     else: return "Small (<1GB)"
-
-
- def main_preprocessor():
-     print(f"Starting pre-processing script. Output: '{PROCESSED_PARQUET_FILE_PATH}'.")
-     overall_start_time = time.time()
-
-     print(f"Fetching fresh data from Hugging Face: {HF_PARQUET_URL}")
-     try:
-         fetch_start_time = time.time()
-         query = f"SELECT * FROM read_parquet('{HF_PARQUET_URL}')"
-         df_raw = duckdb.sql(query).df()
-         data_download_timestamp = pd.Timestamp.now(tz='UTC')
-
-         if df_raw is None or df_raw.empty: raise ValueError("Fetched data is empty or None.")
-         if 'id' not in df_raw.columns: raise ValueError("Fetched data must contain 'id' column.")
-
-         print(f"Fetched data in {time.time() - fetch_start_time:.2f}s. Rows: {len(df_raw)}. Downloaded at: {data_download_timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}")
-     except Exception as e_fetch:
-         print(f"ERROR: Could not fetch data from Hugging Face: {e_fetch}.")
-         return
-
-     df = pd.DataFrame()
-     print("Processing raw data...")
-     proc_start = time.time()
-
-     expected_cols_setup = {
-         'id': str, 'downloads': float, 'downloadsAllTime': float, 'likes': float,
-         'pipeline_tag': str, 'tags': object, 'safetensors': object
-     }
-     for col_name, target_dtype in expected_cols_setup.items():
-         if col_name in df_raw.columns:
-             df[col_name] = df_raw[col_name]
-             if target_dtype == float: df[col_name] = pd.to_numeric(df[col_name], errors='coerce').fillna(0.0)
-             elif target_dtype == str: df[col_name] = df[col_name].fillna('').astype(str) # Fill NaNs before the cast so they become '' rather than the string 'nan'
-         else:
-             if col_name in ['downloads', 'downloadsAllTime', 'likes']: df[col_name] = 0.0
-             elif col_name == 'pipeline_tag': df[col_name] = ''
-             elif col_name == 'tags': df[col_name] = pd.Series([[] for _ in range(len(df_raw))]) # Initialize with empty lists
-             elif col_name == 'safetensors': df[col_name] = None # Initialize with None
-             elif col_name == 'id': print("CRITICAL ERROR: 'id' column missing."); return
-
-     output_filesize_col_name = 'params'
-     if output_filesize_col_name in df_raw.columns and pd.api.types.is_numeric_dtype(df_raw[output_filesize_col_name]):
-         print(f"Using pre-existing '{output_filesize_col_name}' column as file size in GB.")
-         df[output_filesize_col_name] = pd.to_numeric(df_raw[output_filesize_col_name], errors='coerce').fillna(0.0)
-     elif 'safetensors' in df.columns:
-         print(f"Calculating '{output_filesize_col_name}' (file size in GB) from 'safetensors' data...")
-         df[output_filesize_col_name] = df['safetensors'].apply(extract_model_file_size_gb)
-         df[output_filesize_col_name] = pd.to_numeric(df[output_filesize_col_name], errors='coerce').fillna(0.0)
-     else:
-         print(f"Cannot determine file size. Setting '{output_filesize_col_name}' to 0.0.")
-         df[output_filesize_col_name] = 0.0
-
-     df['data_download_timestamp'] = data_download_timestamp
-     print(f"Added 'data_download_timestamp' column.")
-
-     print("Categorizing models by file size...")
-     df['size_category'] = df[output_filesize_col_name].apply(get_file_size_category)
-
-     print("Standardizing 'tags' column...")
-     df['tags'] = process_tags_for_series(df['tags']) # This now uses tqdm internally
-
-     # --- START DEBUGGING BLOCK ---
-     # This block runs after tag standardization and before the vectorized tag-column creation below
-     if MODEL_ID_TO_DEBUG and MODEL_ID_TO_DEBUG in df['id'].values: # Check if ID exists
-         print(f"\n--- Pre-Loop Debugging for Model ID: {MODEL_ID_TO_DEBUG} ---")
-
-         # 1. Check the 'tags' column content after process_tags_for_series
-         model_specific_tags_list = df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'tags'].iloc[0]
-         print(f"1. Tags from df['tags'] (after process_tags_for_series): {model_specific_tags_list}")
-         print(f" Type of tags: {type(model_specific_tags_list)}")
-         if isinstance(model_specific_tags_list, list):
-             for i, tag_item in enumerate(model_specific_tags_list):
-                 print(f" Tag item {i}: '{tag_item}' (type: {type(tag_item)}, len: {len(str(tag_item))})")
-                 # Detailed check for 'robotics' specifically
-                 if 'robotics' in str(tag_item).lower():
-                     print(f" DEBUG: Found 'robotics' substring in '{tag_item}'")
-                     print(f" - str(tag_item).lower().strip(): '{str(tag_item).lower().strip()}'")
-                     print(f" - Is it exactly 'robotics'?: {str(tag_item).lower().strip() == 'robotics'}")
-                     print(f" - Ordinals: {[ord(c) for c in str(tag_item)]}")
-
-         # 2. Simulate temp_tags_joined for this specific model
-         if isinstance(model_specific_tags_list, list):
-             simulated_temp_tags_joined = '~~~'.join(str(t).lower().strip() for t in model_specific_tags_list if pd.notna(t) and str(t).strip())
-         else:
-             simulated_temp_tags_joined = ''
-         print(f"2. Simulated 'temp_tags_joined' for this model: '{simulated_temp_tags_joined}'")
-
-         # 3. Simulate 'has_robot' check for this model
-         robot_keywords = ['robot', 'robotics']
-         robot_pattern = '|'.join(robot_keywords)
-         manual_robot_check = bool(re.search(robot_pattern, simulated_temp_tags_joined, flags=re.IGNORECASE))
-         print(f"3. Manual regex check for 'has_robot' ('{robot_pattern}' in '{simulated_temp_tags_joined}'): {manual_robot_check}")
-         print(f"--- End Pre-Loop Debugging for Model ID: {MODEL_ID_TO_DEBUG} ---\n")
-     elif MODEL_ID_TO_DEBUG:
-         print(f"DEBUG: Model ID '{MODEL_ID_TO_DEBUG}' not found in DataFrame for pre-loop debugging.")
-     # --- END DEBUGGING BLOCK ---
-
-
-     print("Vectorized creation of cached tag columns...")
-     tag_time = time.time()
-     # This is the original temp_tags_joined creation:
-     df['temp_tags_joined'] = df['tags'].apply(
-         lambda tl: '~~~'.join(str(t).lower().strip() for t in tl if pd.notna(t) and str(t).strip()) if isinstance(tl, list) else ''
-     )
-
-     tag_map = {
-         'has_audio': ['audio'], 'has_speech': ['speech'], 'has_music': ['music'],
-         'has_robot': ['robot', 'robotics', 'openvla', 'vla'],
-         'has_bio': ['bio'], 'has_med': ['medic', 'medical'],
-         'has_series': ['series', 'time-series', 'timeseries'],
-         'has_video': ['video'], 'has_image': ['image', 'vision'],
-         'has_text': ['text', 'nlp', 'llm']
-     }
-     for col, kws in tag_map.items():
-         pattern = '|'.join(kws)
-         df[col] = df['temp_tags_joined'].str.contains(pattern, na=False, case=False, regex=True)
-
-     df['has_science'] = (
-         df['temp_tags_joined'].str.contains('science', na=False, case=False, regex=True) &
-         ~df['temp_tags_joined'].str.contains('bigscience', na=False, case=False, regex=True)
-     )
-     del df['temp_tags_joined'] # Clean up temporary column
-     df['is_audio_speech'] = (df['has_audio'] | df['has_speech'] |
-                              df['pipeline_tag'].str.contains('audio|speech', case=False, na=False, regex=True))
-     df['is_biomed'] = df['has_bio'] | df['has_med']
-     print(f"Vectorized tag columns created in {time.time() - tag_time:.2f}s.")
-
-     # --- POST-LOOP DIAGNOSTIC for has_robot & a specific model ---
-     if 'has_robot' in df.columns:
-         print("\n--- 'has_robot' Diagnostics (Preprocessor - Post-Loop) ---")
-         print(df['has_robot'].value_counts(dropna=False))
-
-         if MODEL_ID_TO_DEBUG and MODEL_ID_TO_DEBUG in df['id'].values:
-             model_has_robot_val = df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'has_robot'].iloc[0]
-             print(f"Value of 'has_robot' for model '{MODEL_ID_TO_DEBUG}': {model_has_robot_val}")
-             if model_has_robot_val:
-                 print(f" Original tags for '{MODEL_ID_TO_DEBUG}': {df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'tags'].iloc[0]}")
-
-         if df['has_robot'].any():
-             print("Sample models flagged as 'has_robot':")
-             print(df[df['has_robot']][['id', 'tags', 'has_robot']].head(5))
-         else:
-             print("No models were flagged as 'has_robot' after processing.")
-         print("--------------------------------------------------------\n")
-     # --- END POST-LOOP DIAGNOSTIC ---
-
-
-     print("Adding organization column...")
-     df['organization'] = df['id'].apply(extract_org_from_id)
-
-     # Drop safetensors if params was calculated from it, and params didn't pre-exist as numeric
-     if 'safetensors' in df.columns and \
-        not (output_filesize_col_name in df_raw.columns and pd.api.types.is_numeric_dtype(df_raw[output_filesize_col_name])):
-         df = df.drop(columns=['safetensors'], errors='ignore')
-
-     final_expected_cols = [
-         'id', 'downloads', 'downloadsAllTime', 'likes', 'pipeline_tag', 'tags',
-         'params', 'size_category', 'organization',
-         'has_audio', 'has_speech', 'has_music', 'has_robot', 'has_bio', 'has_med',
-         'has_series', 'has_video', 'has_image', 'has_text', 'has_science',
-         'is_audio_speech', 'is_biomed',
-         'data_download_timestamp'
-     ]
-     # Ensure all final columns exist, adding defaults if necessary
-     for col in final_expected_cols:
-         if col not in df.columns:
-             print(f"Warning: Final expected column '{col}' is missing! Defaulting appropriately.")
-             if col == 'params': df[col] = 0.0
-             elif col == 'size_category': df[col] = "Small (<1GB)" # Default size category
-             elif 'has_' in col or 'is_' in col: df[col] = False # Default boolean flags to False
-             elif col == 'data_download_timestamp': df[col] = pd.NaT # Default timestamp to NaT
-
-     print(f"Data processing completed in {time.time() - proc_start:.2f}s.")
-     try:
-         print(f"Saving processed data to: {PROCESSED_PARQUET_FILE_PATH}")
-         df_to_save = df[final_expected_cols].copy() # Ensure only expected columns are saved
-         df_to_save.to_parquet(PROCESSED_PARQUET_FILE_PATH, index=False, engine='pyarrow')
-         print(f"Successfully saved processed data.")
-     except Exception as e_save:
-         print(f"ERROR: Could not save processed data: {e_save}")
-         return
-
-     total_elapsed_script = time.time() - overall_start_time
-     print(f"Pre-processing finished. Total time: {total_elapsed_script:.2f}s. Final Parquet shape: {df_to_save.shape}")
-
- if __name__ == "__main__":
-     if os.path.exists(PROCESSED_PARQUET_FILE_PATH):
-         print(f"Deleting existing '{PROCESSED_PARQUET_FILE_PATH}' to ensure fresh processing...")
-         try: os.remove(PROCESSED_PARQUET_FILE_PATH)
-         except OSError as e: print(f"Error deleting file: {e}. Please delete manually and rerun."); exit()
-
-     main_preprocessor()
-
-     if os.path.exists(PROCESSED_PARQUET_FILE_PATH):
-         print(f"\nTo verify, load parquet and check 'has_robot' and its 'tags':")
-         print(f"import pandas as pd; df_chk = pd.read_parquet('{PROCESSED_PARQUET_FILE_PATH}')")
-         print(f"print(df_chk['has_robot'].value_counts())")
-         if MODEL_ID_TO_DEBUG:
-             print(f"print(df_chk[df_chk['id'] == '{MODEL_ID_TO_DEBUG}'][['id', 'tags', 'has_robot']])")
-         else:
-             print(f"print(df_chk[df_chk['has_robot']][['id', 'tags', 'has_robot']].head())")