Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, redirect, url_for, flash, send_file | |
import os | |
import pandas as pd | |
from werkzeug.utils import secure_filename | |
from joblib import load | |
import numpy as np | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.model_selection import train_test_split | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.decomposition import PCA | |
from sklearn.pipeline import Pipeline | |
from sklearn.tree import DecisionTreeRegressor | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.linear_model import LinearRegression | |
from xgboost import XGBRegressor | |
from sklearn.neighbors import KNeighborsRegressor | |
from sklearn.model_selection import cross_val_score | |
from sklearn.metrics import mean_squared_error | |
from sklearn import metrics | |
from sklearn.metrics.pairwise import cosine_similarity | |
from time import time | |
app = Flask(__name__)

# Secret key used to sign the session cookie that flash() relies on.
# A key supplied through the SECRET_KEY environment variable survives
# restarts; without it a random key is generated per process, as before.
app.secret_key = os.environ.get("SECRET_KEY") or os.urandom(24)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
UPLOAD_FOLDER = "uploads/"
DATA_FOLDER = "data/"

# Model / label-encoder directories.  Built with os.path.join so the same
# code resolves to ".\Model" on Windows and "./Model" elsewhere.
MODEL_DIR = os.path.join(".", "Model")
LABEL_ENCODER_DIR = os.path.join(".", "Label_encoders")
# Original (misspelled) name kept so existing references keep working.
LABEL_ENOCDER_DIR = LABEL_ENCODER_DIR

# Default output file paths; process_dataframe() rebinds these to dated files.
PRED_OUTPUT_FILE = "data/pred_output.csv"
CLASS_OUTPUT_FILE = "data/class_output.csv"

ALLOWED_EXTENSIONS = {'csv', 'xlsx'}

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# DATA_FOLDER must be stored in app.config before it is read below;
# previously it was never set and the lookup raised KeyError at import time.
app.config['DATA_FOLDER'] = DATA_FOLDER

# Ensure the upload and data folders exist
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)

# ---------------------------------------------------------------------------
# Models
# NOTE: joblib.load unpickles its input -- only load files you trust.
# ---------------------------------------------------------------------------
# Price regression models
gia_model = load(os.path.join(MODEL_DIR, 'linear_regression_model_gia_price.joblib'))
grade_model = load(os.path.join(MODEL_DIR, 'linear_regression_model_grade_price.joblib'))
bygrade_model = load(os.path.join(MODEL_DIR, 'linear_regression_model_bygrade_price.joblib'))
makable_model = load(os.path.join(MODEL_DIR, 'linear_regression_model_makable_price.joblib'))

# Classification models
col_model = load(os.path.join(MODEL_DIR, 'classification_LogisticRegression_col.joblib'))
cts_model = load(os.path.join(MODEL_DIR, 'classification_LogisticRegression_cts.joblib'))
cut_model = load(os.path.join(MODEL_DIR, 'classification_LogisticRegression_cut.joblib'))
qua_model = load(os.path.join(MODEL_DIR, 'classification_LogisticRegression_qua.joblib'))
shp_model = load(os.path.join(MODEL_DIR, 'classification_LogisticRegression_shp.joblib'))

# ---------------------------------------------------------------------------
# Label encoders, keyed by the column / target name they were saved under
# ---------------------------------------------------------------------------
encoder_list = ['Tag', 'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol', 'EngSym', 'EngFlo', 'EngNts', 'EngMikly', 'EngLab',
                'Change_cts_value', 'Change_shape_value', 'Change_quality_value', 'Change_color_value', 'Change_cut_value']
loaded_label_encoder = {
    val: load(os.path.join(LABEL_ENCODER_DIR, f"label_encoder_{val}.joblib"))
    for val in encoder_list
}
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
# NOTE(review): no route registration for this view is visible in the file,
# so it could never be reached.  '/' is assumed as the path -- confirm it
# against the links used in the templates.
@app.route('/')
def index():
    """Render the ``index.html`` template."""
    return render_template('index.html')
# NOTE(review): no route registration for this view is visible in the file.
# '/predict' is an assumed path -- confirm it against the form action used
# in index.html.
@app.route('/predict', methods=['GET', 'POST'])
def predict():
    """Handle an uploaded CSV/Excel file and render the model outputs.

    On POST the uploaded ``file`` field is validated, saved under the
    configured upload folder, parsed into a DataFrame and passed to
    ``process_dataframe``; the two returned frames are rendered as HTML
    tables in ``output.html``.

    Every validation failure flashes an error and redirects to
    ``request.url``.  That redirect arrives here as a GET, so GET requests
    render ``index.html`` instead of re-running the validation (which would
    otherwise redirect forever).
    """
    if request.method != 'POST':
        # Landing point of the error redirects below.
        return render_template('index.html')

    if 'file' not in request.files:
        flash('No file part', 'error')
        return redirect(request.url)

    file = request.files['file']
    if file.filename == '':
        flash('No selected file', 'error')
        return redirect(request.url)

    if not (file and allowed_file(file.filename)):
        flash('Invalid file type. Only CSV and Excel files are allowed.', 'error')
        print('Invalid file type. Only CSV and Excel files are allowed.')
        return redirect(request.url)

    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)

    # Convert to DataFrame.  The extension test is case-insensitive to stay
    # consistent with allowed_file(), which lower-cases the extension.
    try:
        if filename.lower().endswith('.csv'):
            df = pd.read_csv(filepath)
        else:
            df = pd.read_excel(filepath)
    except Exception as e:
        # A malformed upload should produce a message, not a 500 response.
        print(f'Error reading file: {e}')
        flash(f'Error reading file: {e}', 'error')
        return redirect(request.url)

    # Preprocess DataFrame
    print("===================================process_dataframe=0==================================")
    df, dx = process_dataframe(df)
    print("===================================process_dataframe=5==================================")
    return render_template('output.html', df=df.to_html(), dx=dx.to_html())
def process_dataframe(df):
    """Run the price and change models on *df* and write both reports to CSV.

    Steps:
      1. Keep only the required columns and label-encode the categorical ones
         with the encoders in ``loaded_label_encoder``.
      2. Add ``*_Predicted`` columns from the four price models and
         ``*_Diff`` columns (``EngAmt`` minus the prediction).
      3. On a second copy of the encoded frame, add the five ``*_change``
         columns from the classifiers, decoded back to their labels.
      4. Save both frames under ``data/`` using today's date in the file
         name and rebind the module-level ``PRED_OUTPUT_FILE`` /
         ``CLASS_OUTPUT_FILE`` to those paths.

    Parameters:
        df: DataFrame that must contain every column in ``required_columns``.

    Returns:
        ``(prediction_head, classification_head)`` -- the first rows of the
        two report frames.  On any error the message is printed and flashed
        and two empty DataFrames are returned.
    """
    # NOTE(review): the output paths are process-wide globals, so two uploads
    # on the same day overwrite each other's files.
    global PRED_OUTPUT_FILE, CLASS_OUTPUT_FILE
    try:
        print("===================================process_dataframe=1==================================")
        # 'EngLab' is not in the required columns
        required_columns = ['Tag', 'EngCts', 'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol',
                            'EngSym', 'EngFlo', 'EngNts', 'EngMikly', 'EngAmt']
        # Columns that are label-encoded before being fed to the models.
        categorical_columns = ['Tag', 'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol',
                               'EngSym', 'EngFlo', 'EngNts', 'EngMikly']

        # Report missing columns by name instead of a bare KeyError.
        missing = [col for col in required_columns if col not in df.columns]
        if missing:
            raise ValueError(f"Missing required column(s): {', '.join(missing)}")

        df = df[required_columns].copy()

        # Transform categorical features using the loaded label encoders.
        # transform() raises ValueError for a label the encoder has not seen;
        # that is reported through the except branch below.
        for col in categorical_columns:
            df[col] = loaded_label_encoder[col].transform(df[col])

        df = df.astype(float)
        print(df.head())
        dx = df.copy()
        print(df.columns)
        # Feature matrix: taken before any output column is added.
        x = df.copy()
        print("===================================process_dataframe=2==================================")

        # ====================================================================
        # Prediction report
        # ====================================================================
        price_models = (
            ('GIA', gia_model),
            ('Grade', grade_model),
            ('ByGrade', bygrade_model),
            ('Makable', makable_model),
        )
        # All predictions first, then all differences, to keep the column order.
        for label, model in price_models:
            df[f'{label}_Predicted'] = model.predict(x)
        for label, _ in price_models:
            df[f'{label}_Diff'] = df['EngAmt'] - df[f'{label}_Predicted']
        print(df.head())
        print("===================================process_dataframe=3==================================")

        # ====================================================================
        # Classification report
        # ====================================================================
        # (output column, classifier, encoder used to decode its predictions)
        change_models = (
            ('col_change', col_model, 'Change_color_value'),
            ('cts_change', cts_model, 'Change_cts_value'),
            ('cut_change', cut_model, 'Change_cut_value'),
            ('qua_change', qua_model, 'Change_quality_value'),
            ('shp_change', shp_model, 'Change_shape_value'),
        )
        for column, model, encoder_key in change_models:
            encoded = model.predict(x)
            dx[column] = loaded_label_encoder[encoder_key].inverse_transform(encoded)
        print(dx.head())
        print("===================================process_dataframe=4==================================")

        # Save the output files, stamped with today's date.
        date_stamp = pd.Timestamp.now().strftime("%Y-%m-%d")

        PRED_OUTPUT_FILE = f'data/prediction_output_{date_stamp}.csv'
        df.to_csv(PRED_OUTPUT_FILE, index=False)

        CLASS_OUTPUT_FILE = f'data/classification_output_{date_stamp}.csv'
        dx.to_csv(CLASS_OUTPUT_FILE, index=False)
        print("===================================Output file saved as output.csv===================================")
        return df.head(), dx.head()
    except Exception as e:
        print(f'Error processing file: {e}')
        flash(f'Error processing file: {e}', 'error')
        return pd.DataFrame(), pd.DataFrame()
def _cts_change_label(diff):
    """Describe a carat difference as negative change, positive change or no change."""
    if diff < 0:
        return str(diff) + ' negative change'
    if diff > 0:
        return str(diff) + ' positive change'
    # Zero (and NaN, which compares false both ways) falls through to here.
    return 'no change'


def _attribute_change_labels(eng, mkbl, noun):
    """Return '<eng> to <mkbl> <noun> change' where the two Series differ,
    and '<noun> not change' where they are equal."""
    described = eng.astype(str) + ' to ' + mkbl.astype(str) + f' {noun} change'
    return described.where(eng != mkbl, f'{noun} not change')


def classification_report(df):
    """Build the Eng-vs-Mkbl change labels for every row of *df*.

    Selects the ``Eng*`` / ``Mkbl*`` columns listed below and adds:

    * ``Cts_diff_eng_mkbl``   -- ``EngCts - MkblCts`` rounded to 2 decimals
    * ``Change_cts_value``    -- textual form of that difference
    * ``Change_shape_value``, ``Change_quality_value``,
      ``Change_color_value``, ``Change_cut_value`` -- ``"<eng> to <mkbl> ...
      change"`` when the pair differs, ``"... not change"`` otherwise

    Parameters:
        df: DataFrame containing all of the selected columns.

    Returns:
        A new DataFrame (the input is not modified), or ``None`` when an
        error occurs; the error is printed and flashed.
    """
    selected_columns = [
        "EngGraphCts", "EngCts", "EngShp", "EngQua", "EngCol", "EngCut", "EngPol",
        "EngSym", "EngFlo", "EngNts", "EngMikly", "EngLab", "EngAmt",
        "MkblCts", "MkblShp", "MkblQua", "MkblCol", "MkblCut", "MkblPol",
        "MkblSym", "MkblFlo", "MkblNts", "MkblMikly", "MkblLab", "MkblAmt",
    ]
    # (output column, Eng column, Mkbl column, noun used in the label)
    attribute_pairs = (
        ('Change_shape_value', 'EngShp', 'MkblShp', 'shape'),
        ('Change_quality_value', 'EngQua', 'MkblQua', 'quality'),
        ('Change_color_value', 'EngCol', 'MkblCol', 'color'),
        ('Change_cut_value', 'EngCut', 'MkblCut', 'cut'),
    )
    try:
        # Explicit copy: new columns are assigned below, and assigning onto a
        # bare column selection triggers pandas' chained-assignment warning.
        classifcation_data = df[selected_columns].copy()

        classifcation_data["Cts_diff_eng_mkbl"] = (
            classifcation_data["EngCts"] - classifcation_data["MkblCts"]
        ).round(2)
        classifcation_data['Change_cts_value'] = (
            classifcation_data['Cts_diff_eng_mkbl'].apply(_cts_change_label)
        )

        for target, eng_col, mkbl_col, noun in attribute_pairs:
            classifcation_data[target] = _attribute_change_labels(
                classifcation_data[eng_col], classifcation_data[mkbl_col], noun
            )

        return classifcation_data
    except Exception as e:
        # Print first so the error is logged even if flash() cannot run
        # (it needs an active request).
        print(f'Error generating classification report: {e}')
        flash(f'Error generating classification report: {e}', 'error')
        return None
# NOTE(review): no route registration for this view is visible in the file.
# '/download_pred' is an assumed path -- confirm it against the templates.
@app.route('/download_pred')
def download_pred():
    """Send the file currently named by ``PRED_OUTPUT_FILE`` as a download.

    Returns a 404 response with a short message when that file does not
    exist (for example before any upload has been processed).
    """
    # Resolve against the working directory, which is where to_csv() wrote
    # the file; Flask's send_file resolves relative paths against the
    # application root instead.
    path = os.path.abspath(PRED_OUTPUT_FILE)
    if not os.path.isfile(path):
        return 'No prediction output is available yet. Upload a file first.', 404
    return send_file(path, as_attachment=True)
# NOTE(review): no route registration for this view is visible in the file.
# '/download_class' is an assumed path -- confirm it against the templates.
@app.route('/download_class')
def download_class():
    """Send the file currently named by ``CLASS_OUTPUT_FILE`` as a download.

    Returns a 404 response with a short message when that file does not
    exist (for example before any upload has been processed).
    """
    # Resolve against the working directory, which is where to_csv() wrote
    # the file; Flask's send_file resolves relative paths against the
    # application root instead.
    path = os.path.abspath(CLASS_OUTPUT_FILE)
    if not os.path.isfile(path):
        return 'No classification output is available yet. Upload a file first.', 404
    return send_file(path, as_attachment=True)
if __name__ == "__main__":
    # Debug mode enables the interactive debugger, which lets anyone who can
    # reach the server execute code.  It is therefore opt-in: set
    # FLASK_DEBUG=1 (or "true") when developing locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true"}
    app.run(debug=debug_enabled)