import pandas as pd
import numpy as np
from pathlib import Path
import logging
import sys
from datetime import datetime
import warnings
import gc
import json

from loguru import logger
from src.create_dataset import process_datasets
from src.preprocessing import Preprocessor
from src.clean_data import DataCleaner
from src.feature_analyzer import FeatureAnalyzer
from src.model_trainer import ModelTrainer


def create_directories():
    """Create all necessary directories for the pipeline"""
    directories = {
        'combined_data': Path('output_files/combined_data'),
        'preprocessed': Path('output_files/cleaned_preprocessed_data'),
        'feature_analyzer': Path('output_files/feature_analysis'),
        'model_outputs': Path('output_files/model_outputs'),
    }
    
    for dir_path in directories.values():
        dir_path.mkdir(parents=True, exist_ok=True)
    
    return directories

def handle_memory():
    """Free unreferenced objects and suppress library warnings."""
    gc.collect()
    warnings.filterwarnings('ignore')

def save_pipeline_metrics(metrics: dict, filepath: Path):
    """Save pipeline metrics to JSON file"""
    with open(filepath, 'w') as f:
        json.dump(metrics, f, indent=4, default=str)
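
# Example usage of save_pipeline_metrics (a hypothetical sketch; the metrics dict
# and output path below are illustrative and not produced anywhere in this script):
#
#     metrics = {'run_started': datetime.now(), 'train_size': 0.25}
#     save_pipeline_metrics(metrics, Path('output_files/pipeline_metrics.json'))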

def start_pipelines(train_size=0.25):
    """Run the full Yelp data analysis pipeline: dataset creation, preprocessing,
    cleaning, feature analysis, and model training."""
    logger.info("STARTING YELP DATA ANALYSIS PIPELINES...")
    dirs = create_directories()
    logger.info("Created necessary directories")

    logger.info("Pipeline 1: Creating initial dataset...")
    try:
        filename = "combined_merged_full.csv"
        df = process_datasets(output_path=dirs['combined_data'], filename=filename)
        logger.info(f"Dataset created successfully with shape: {df.shape}")
    except Exception as e:
        logger.error(f"Error in dataset creation: {str(e)}")
        raise

    try:
        logger.info("Pipeline 2: Preprocessing and Feature Engineering...")
        output_before_preprocess = dirs['combined_data'] / "combined_merged_full.csv"
        df = pd.read_csv(output_before_preprocess)
        prep = Preprocessor(df)
        feature_engineered_df = prep.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Pipeline 2 (Preprocessing and Feature Engineering): {e}")
        raise

    try:
        logger.info("Pipeline 3: Cleaning data...")
        filename = "preprocessed_cleaned.csv"
        cleaner = DataCleaner(df=feature_engineered_df, output_path=str(dirs['preprocessed']), filename=filename)
        cleaner.run_pipeline()
        clean_output_file_path = dirs['preprocessed'] / filename
        logger.info(f"Preprocessed and cleaned data saved in {clean_output_file_path}")
    except Exception as e:
        logger.error(f"Error in Pipeline 3 (Cleaning Data): {str(e)}")
        raise

    try:
        logger.info("Pipeline 4: Analyzing features...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)
        analyzer = FeatureAnalyzer(df=preprocessed_clean_df, output_path=str(dirs['feature_analyzer']))
        analyzer.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Pipeline 4 (Feature Analysis): {str(e)}")
        raise
    
    try:
        logger.info("Pipeline 5: Training and Evaluating Models...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)

        # Shuffle, then keep only a `train_size` fraction of rows to cap training cost
        preprocessed_clean_df = preprocessed_clean_df.sample(frac=1, random_state=42).reset_index(drop=True)
        size = int(train_size * len(preprocessed_clean_df))
        preprocessed_clean_df = preprocessed_clean_df.iloc[:size, :]

        trainer = ModelTrainer(df=preprocessed_clean_df, output_path=str(dirs['model_outputs']), epochs=50, test_size=0.3)
        trainer.train_and_evaluate()
        logger.info("Model training completed")
    except Exception as e:
        logger.error(f"Error in Pipeline 5 (Model Training): {str(e)}")