|
import os |
|
import logging |
|
from typing import Annotated |
|
import mlflow |
|
import joblib |
|
import pandas as pd |
|
|
|
from sklearn.pipeline import Pipeline |
|
|
|
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri |
|
from zenml import ArtifactConfig, step |
|
from zenml.client import Client |
|
from zenml import Model |
|
|
|
|
|
from src.model_building import ModelBuilding |
|
|
|
|
|
# Experiment tracker component of the currently active ZenML stack; its
# ``name`` is passed to the ``@step`` decorator below.
# NOTE(review): presumably this is None when the active stack has no
# experiment tracker, in which case ``experiment_tracker.name`` would fail
# at import time - confirm against the deployed stack.
experiment_tracker = Client().active_stack.experiment_tracker

# Model metadata attached to the step via ``model=``.
# NOTE(review): ``version=None`` presumably lets ZenML assign the version
# automatically - confirm against the ZenML Model documentation.
model_metadata = Model(
    name="customer_churn_prediction",
    version=None,
    license="Apache-2.0",
    description="Customer churn prediction model for Telecom company.",
)

# Root logging configuration (has no effect if the root logger already has
# handlers) followed by this module's own logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@step(enable_cache=False, experiment_tracker=experiment_tracker.name, model=model_metadata)
def model_builder_step(
    model_name: str, X_train: pd.DataFrame, y_train: pd.Series
) -> Annotated[
    Pipeline, ArtifactConfig(name="sklearn_pipeline", is_model_artifact=True)
]:
    """
    ZenML step that builds the requested model, wraps it in a pipeline and fits it.

    The model is obtained from ``ModelBuilding.get_model``, placed as the
    single ``"model"`` stage of a scikit-learn ``Pipeline`` and fitted on the
    training data with MLflow scikit-learn autologging enabled.

    Parameters
    ----------
    model_name : str
        Name of the model to create.
    X_train : pd.DataFrame
        Training data features.
    y_train : pd.Series
        Training data labels/target.

    Returns
    -------
    Pipeline
        The fitted scikit-learn pipeline containing the model.

    Raises
    ------
    ValueError
        Logged and re-raised when model creation or fitting raises it.
    """
    # Column groups are only reported in the logs; they do not affect training.
    categorical_cols = X_train.select_dtypes(include=["object", "category"]).columns
    numerical_cols = X_train.select_dtypes(exclude=["object", "category"]).columns

    logger.info(f"Categorical columns: {categorical_cols.tolist()}")
    logger.info(f"Numerical columns: {numerical_cols.tolist()}")
    logger.info("Starting model building step...")

    # Open a run only when none is active, and remember that we did so.
    # An already-active run (presumably opened by the stack's experiment
    # tracker - confirm) belongs to whoever started it and must not be closed
    # here; otherwise it would be marked finished before its owner can record
    # the real outcome.
    started_run = mlflow.active_run() is None
    if started_run:
        mlflow.start_run()

    try:
        # Built inside the try so that a run opened above is always closed,
        # even if constructing the builder fails.
        model_builder = ModelBuilding()

        mlflow.sklearn.autolog()
        model = model_builder.get_model(model_name, X_train, y_train)
        logger.info(f"Model '{model_name}' has been successfully created.")

        pipeline = Pipeline(steps=[("model", model)])

        pipeline.fit(X_train, y_train)
        logger.info("Model training completed")
    except ValueError as e:
        logger.error(f"An error occurred: {e}")
        raise
    finally:
        # Close only the run this step opened itself.
        if started_run:
            mlflow.end_run()

    return pipeline
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|