import pandas as pd
from pymongo import MongoClient
from collections import defaultdict
from pathlib import Path


def read_data_mongo(collection_name):
    """Read one MongoDB collection from the Yelp database into a DataFrame."""
    print(f"Reading {collection_name}...")
    conn_str = "mongodb://Mtalha:[email protected]/"

    client = MongoClient(conn_str)
    db_client = client["Yelp"]

    df = pd.DataFrame()  # returned empty if the read fails
    try:
        collection = db_client[collection_name]
        # Exclude MongoDB's internal _id field from the results.
        documents = collection.find({}, {"_id": 0})
        data = list(documents)

        # Pivot the list of documents into a column -> values mapping.
        final_dict = defaultdict(list)
        for document in data:
            for k, v in document.items():
                final_dict[k].append(v)
        df = pd.DataFrame(final_dict)
    except Exception as e:
        print("ERROR WHILE READING FILES FROM MONGODB AS: ", e)

    print(f"Finished reading. DataFrame shape: {df.shape}")
    return df
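

# A hypothetical parallel variant of read_data_mongo (a sketch, not called
# anywhere in this script): fan the collection reads out across worker
# processes. Each worker runs read_data_mongo and so opens its own
# MongoClient, since client connections are not fork-safe across processes.
def read_collections_parallel(collection_names, num_workers=None):
    from concurrent.futures import ProcessPoolExecutor
    import multiprocessing as mp

    if num_workers is None:
        num_workers = max(1, mp.cpu_count() - 1)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        frames = list(executor.map(read_data_mongo, collection_names))
    return dict(zip(collection_names, frames))

# Hypothetical usage, replacing the serial loop in process_datasets below:
#   frames = read_collections_parallel(list(file_paths.values()))
#   dfs = {name: frames[path] for name, path in file_paths.items()}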


def process_datasets(output_path, filename):
    """Read every collection, harmonize column names, and merge into one DataFrame."""
    file_paths = {
        'business': "yelp_academic_dataset_business",
        'checkin': "yelp_academic_dataset_checkin",
        'review': "yelp_academic_dataset_review",
        'tip': "yelp_academic_dataset_tip",
        'user': "yelp_academic_dataset_user",
        'google': "google_review_dataset"
    }

    print("Reading datasets...")
    dfs = {}
    for name, path in file_paths.items():
        print(f"Processing {name} dataset...")
        dfs[name] = read_data_mongo(path)
        print(f"Finished reading {name} dataset. Shape: {dfs[name].shape}")

    print("All files read. Starting column renaming...")

    # Rename overlapping column names up front so the later merges do not
    # produce _x/_y suffix collisions (review, user, and business all carry
    # columns such as 'useful', 'stars', and 'review_count').
    dfs['review'] = dfs['review'].rename(columns={
        'date': 'review_date',
        'stars': 'review_stars',
        'text': 'review_text',
        'useful': 'review_useful',
        'funny': 'review_funny',
        'cool': 'review_cool'
    })

    dfs['tip'] = dfs['tip'].rename(columns={
        'date': 'tip_date',
        'text': 'tip_text',
        'compliment_count': 'tip_compliment_count'
    })

    dfs['checkin'] = dfs['checkin'].rename(columns={
        'date': 'checkin_date'
    })

    dfs['user'] = dfs['user'].rename(columns={
        'name': 'user_name',
        'review_count': 'user_review_count',
        'useful': 'user_useful',
        'funny': 'user_funny',
        'cool': 'user_cool'
    })

    dfs['business'] = dfs['business'].rename(columns={
        'name': 'business_name',
        'stars': 'business_stars',
        'review_count': 'business_review_count'
    })
    dfs['google'] = dfs['google'].rename(columns={
        'name': 'business_name',
        'stars': 'business_stars',
        'review_count': 'business_review_count'
    })

    df_business_final = dfs['business']
    df_google_final = dfs['google']
    df_review_final = dfs['review']
    df_tip_final = dfs['tip']
    df_checkin_final = dfs['checkin']
    df_user_final = dfs['user']

    # Stack the Yelp and Google business records into one frame.
    df_business_final = pd.concat([df_business_final, df_google_final], axis=0)
    df_business_final.reset_index(drop=True, inplace=True)
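    # If the same business_id can appear in both the Yelp and Google frames,
    # the concat above keeps both rows, and the merge on business_id below
    # would then duplicate review rows. A hypothetical dedupe step:
    #   df_business_final = df_business_final.drop_duplicates(subset='business_id')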

    print("Starting merge process...")

    print("Step 1: Starting with reviews...")
    merged_df = df_review_final

    print("Step 2: Merging with business data...")
    merged_df = merged_df.merge(
        df_business_final,
        on='business_id',
        how='left'
    )

    print("Step 3: Merging with user data...")
    merged_df = merged_df.merge(
        df_user_final,
        on='user_id',
        how='left'
    )

    print("Step 4: Merging with checkin data...")
    merged_df = merged_df.merge(
        df_checkin_final,
        on='business_id',
        how='left'
    )
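
    # All three joins are left joins keyed on the review rows: businesses,
    # users, or checkins missing from their tables surface as NaNs, which
    # the fill step below handles.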

    print("Step 5: Aggregating and merging tip data...")
    # Per-business tip totals: summed compliments plus a count of tips.
    tip_agg = df_tip_final.groupby('business_id').agg({
        'tip_compliment_count': 'sum',
        'tip_text': 'count'
    }).rename(columns={'tip_text': 'tip_count'})

    # tip_agg keeps business_id as its index; merge(on='business_id') still
    # matches it, because pandas joins on index level names as well.
    merged_df = merged_df.merge(
        tip_agg,
        on='business_id',
        how='left'
    )
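
    # Equivalent, more explicit form using named aggregation (pandas >= 0.25),
    # shown for reference only:
    #   tip_agg = df_tip_final.groupby('business_id').agg(
    #       tip_compliment_count=('tip_compliment_count', 'sum'),
    #       tip_count=('tip_text', 'count'),
    #   )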

    print("Filling NaN values...")
    merged_df['tip_count'] = merged_df['tip_count'].fillna(0)
    merged_df['tip_compliment_count'] = merged_df['tip_compliment_count'].fillna(0)
    merged_df['checkin_date'] = merged_df['checkin_date'].fillna('')
    # Assign rather than inplace-fill to avoid chained-assignment pitfalls.
    merged_df['friends'] = merged_df['friends'].fillna(0)

    # Report any columns that still contain nulls.
    for col in merged_df.columns:
        null_count = merged_df[col].isnull().sum()
        if null_count > 0:
            print(f"  {col} has {null_count} null values")

    print("Shape of Merged Dataset is: ", merged_df.shape)
    output_file = Path(output_path) / filename

    print("COLUMNS BEFORE PREPROCESSING")
    print()
    merged_df.info()

    # Show one sample value per column along with its Python type.
    for col in merged_df.columns:
        if not merged_df[col].empty:
            v = merged_df[col].iloc[0]
            print(f"Type of values in {col} is {type(v)} and values are like: {v}")

    merged_df.to_csv(output_file, index=False)
    return merged_df
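

# Minimal usage sketch; the output directory and CSV filename below are
# illustrative assumptions, not values from the original pipeline:
if __name__ == "__main__":
    Path("./output").mkdir(parents=True, exist_ok=True)
    final_df = process_datasets("./output", "merged_yelp_google.csv")
    print(final_df.head())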