import gc
import json
import logging
import sys
import warnings
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from loguru import logger

from src.create_dataset import process_datasets
from src.preprocessing import Preprocessor
from src.clean_data import DataCleaner
from src.feature_analyzer import FeatureAnalyzer
from src.model_trainer import ModelTrainer


def create_directories():
    """Create all necessary directories for the pipeline."""
    directories = {
        'combined_data': Path('output_files/combined_data'),
        'preprocessed': Path('output_files/cleaned_preprocessed_data'),
        'feature_analyzer': Path('output_files/feature_analysis'),
        'model_outputs': Path('output_files/model_outputs'),
    }

    for dir_path in directories.values():
        dir_path.mkdir(parents=True, exist_ok=True)

    return directories


def handle_memory():
    """Free unreferenced objects and suppress library warnings."""
    gc.collect()
    warnings.filterwarnings('ignore')


def save_pipeline_metrics(metrics: dict, filepath: Path):
    """Save pipeline metrics to a JSON file."""
    with open(filepath, 'w') as f:
        json.dump(metrics, f, indent=4, default=str)


def start_pipelines(train_size=0.25):
    """Run the full Yelp data analysis workflow.

    train_size is the fraction of the cleaned dataset sampled for model training.
    """
    logger.info("STARTING YELP DATA ANALYSIS PIPELINES...")
    dirs = create_directories()
    logger.info("Created necessary directories")

    # Pipeline 1: merge the raw Yelp datasets into a single CSV.
    logger.info("Pipeline 1: Creating initial dataset...")
    try:
        filename = "combined_merged_full.csv"
        df = process_datasets(output_path=dirs['combined_data'], filename=filename)
        logger.info(f"Dataset created successfully with shape: {df.shape}")
    except Exception as e:
        logger.error(f"Error in dataset creation: {str(e)}")
        raise

    # Pipeline 2: preprocess the merged data and engineer features.
    try:
        logger.info("Pipeline 2: Preprocessing and Feature Engineering...")
        output_before_preprocess = dirs['combined_data'] / "combined_merged_full.csv"
        df = pd.read_csv(output_before_preprocess)
        prep = Preprocessor(df)
        feature_engineered_df = prep.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Pipeline 2 (Preprocessing and Feature Engineering): {e}")
        raise

    # Pipeline 3: clean the feature-engineered data and persist it to disk.
    try:
        logger.info("Pipeline 3: Cleaning data...")
        filename = "preprocessed_cleaned.csv"
        cleaner = DataCleaner(df=feature_engineered_df, output_path=str(dirs['preprocessed']), filename=filename)
        cleaner.run_pipeline()
        clean_output_file_path = dirs['preprocessed'] / filename
        logger.info(f"Preprocessed and cleaned data saved to {clean_output_file_path}")
    except Exception as e:
        logger.error(f"Error in Pipeline 3 (Cleaning Data): {str(e)}")
        raise

    # Pipeline 4: analyze features of the cleaned dataset.
    try:
        logger.info("Pipeline 4: Analyzing features...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)

        analyzer = FeatureAnalyzer(df=preprocessed_clean_df, output_path=str(dirs['feature_analyzer']))
        analyzer.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Feature analysis: {str(e)}")
        raise

    # Pipeline 5: train and evaluate models on a subsample of the cleaned data.
    try:
        logger.info("Pipeline 5: Training and Evaluating Models...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)

        # Shuffle the rows, then keep only the first train_size fraction for training.
        preprocessed_clean_df = preprocessed_clean_df.sample(frac=1, random_state=42).reset_index(drop=True)
        size = int(train_size * len(preprocessed_clean_df))
        preprocessed_clean_df = preprocessed_clean_df.iloc[:size, :]

        trainer = ModelTrainer(df=preprocessed_clean_df, output_path=str(dirs['model_outputs']), epochs=50, test_size=0.3)
        trainer.train_and_evaluate()

        logger.info("Model training completed")
    except Exception as e:
        logger.error(f"Error in Model Trainer: {str(e)}")
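

# Assumed entry point; this section does not show how start_pipelines() is
# invoked, so the guard below is a minimal sketch of a command-line launch.
if __name__ == "__main__":
    start_pipelines()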