import streamlit as st
import pandas as pd
import openai
import joblib
from PIL import Image
import requests
from io import BytesIO
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from huggingface_hub import hf_hub_download
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
import torch
from datetime import datetime

# Dataset loading function with caching
@st.cache_data
def load_datasets():
    try:
        with st.spinner('Loading dataset...'):
            original_data = pd.read_csv('CTP_Model1.csv', low_memory=False)
        return original_data
    except Exception as e:
        st.error(f"Error loading dataset: {str(e)}")
        raise e

def load_image(image_file):
    return Image.open(image_file)

def classify_image(image):
    try:
        # Load the model and feature extractor
        model_name = "dima806/car_models_image_detection"
        feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
        model = AutoModelForImageClassification.from_pretrained(model_name)

        # Preprocess the image
        inputs = feature_extractor(images=image, return_tensors="pt")

        # Perform inference
        with torch.no_grad():
            outputs = model(**inputs)

        # Get the predicted class
        logits = outputs.logits
        predicted_class_idx = logits.argmax(-1).item()

        # Get the class label and score
        predicted_class_label = model.config.id2label[predicted_class_idx]
        score = torch.nn.functional.softmax(logits, dim=-1)[0, predicted_class_idx].item()

        # Return the top prediction
        return [{'label': predicted_class_label, 'score': score}]
    except Exception as e:
        st.error(f"Classification error: {e}")
        return None

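# classify_image() above re-instantiates the Hugging Face classifier on every call, which is
# slow on repeated uploads. Below is a minimal cached-loader sketch, assuming this Streamlit
# version provides st.cache_resource; the name load_classifier is an illustration, not part
# of the original app, and nothing above is wired to use it.
@st.cache_resource
def load_classifier(model_name="dima806/car_models_image_detection"):
    # Download the feature extractor and model once and reuse them across reruns
    feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
    classifier = AutoModelForImageClassification.from_pretrained(model_name)
    return feature_extractor, classifier
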
def find_closest_match(df, brand, model):
    # Combine brand and model names from the dataset
    df['full_name'] = df['Make'] + ' ' + df['Model']

    # Create a list of all car names and add the query car name
    car_names = df['full_name'].tolist()
    query_car = f"{brand} {model}"
    car_names.append(query_car)

    # Create TF-IDF vectors and compute cosine similarity between the query and all cars
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(car_names)
    cosine_similarities = cosine_similarity(tfidf_matrix[-1], tfidf_matrix[:-1]).flatten()

    # Get the index of the most similar car and return its row
    most_similar_index = cosine_similarities.argmax()
    return df.iloc[most_similar_index]

def get_car_overview(car_data):
    prompt = (
        "Provide an overview of the following car:\n"
        f"Year: {car_data['Year']}\n"
        f"Make: {car_data['Make']}\n"
        f"Model: {car_data['Model']}\n"
        f"Trim: {car_data['Trim']}\n"
        f"Price: ${car_data['Price']}\n"
        f"Condition: {car_data['Condition']}\n"
    )
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message['content']

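# get_car_overview() uses the legacy openai<1.0 ChatCompletion interface. If the Space
# instead ships openai>=1.0, the equivalent call is sketched below, kept commented out so
# it does not conflict with the code above (the client construction is an assumption):
#
#     from openai import OpenAI
#     client = OpenAI(api_key=st.secrets["GPT_TOKEN"])
#     response = client.chat.completions.create(
#         model="gpt-3.5-turbo",
#         messages=[{"role": "user", "content": prompt}],
#     )
#     overview = response.choices[0].message.content
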
def load_model_and_encodings():
    try:
        with st.spinner('Loading model...'):
            model_content = hf_hub_download(repo_id="EdBoy2202/car_prediction_model", filename="car_price_modelv3.pkl")
            model = joblib.load(model_content)
        return model
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        raise e

def predict_price(model, match, year, df):
    # Start with the data from the closest match
    input_data = match.copy()

    # Update the year and derive the age features
    input_data['Year'] = year
    current_year = datetime.now().year
    input_data['Age'] = current_year - year
    input_data['Age_squared'] = input_data['Age'] ** 2

    # If the odometer reading is missing, estimate it from age and average yearly mileage
    if 'Odometer' not in input_data or pd.isna(input_data['Odometer']):
        avg_yearly_mileage = 12000  # Adjust this value as needed
        input_data['Odometer'] = input_data['Age'] * avg_yearly_mileage

    # Ensure all required columns are present; fill missing ones with the dataset's most common value
    required_columns = ['Make', 'Model', 'Year', 'Condition', 'Fuel', 'Odometer', 'Title_status',
                        'Transmission', 'Drive', 'Size', 'Type', 'Paint_color', 'Age', 'Age_squared']
    for col in required_columns:
        if col not in input_data or pd.isna(input_data[col]):
            input_data[col] = df[col].mode().iloc[0]

    # Prepare the model input, keeping only the columns the model expects
    input_df = pd.DataFrame([input_data])
    model_columns = model.feature_names_in_
    input_df = input_df[model_columns]

    # Predict the price
    predicted_price = model.predict(input_df)
    return predicted_price[0]

# Streamlit App
st.title("Auto Appraise")
st.write("Upload a car image or take a picture to get its brand, model, overview, and expected price!")

# Load model and encodings
model = load_model_and_encodings()

# Initialize OpenAI API key
openai.api_key = st.secrets["GPT_TOKEN"]

# File uploader for image
uploaded_file = st.file_uploader("Choose a car image", type=["jpg", "jpeg", "png"])

# Camera input as an alternative (optional)
camera_image = st.camera_input("Or take a picture of the car")

# Process the image (either uploaded or from camera)
image = None
if uploaded_file is not None:
    image = Image.open(uploaded_file)
    st.write("Image uploaded successfully.")
elif camera_image is not None:
    image = Image.open(camera_image)
    st.write("Image captured successfully.")

if image is not None:
    st.image(image, caption='Processed Image', use_container_width=True)

    # Classify the car image
    with st.spinner('Analyzing image...'):
        car_classifications = classify_image(image)

    if car_classifications:
        st.write("Image classification successful.")
        st.subheader("Car Classification Results:")
        for classification in car_classifications:
            st.write(f"Model: {classification['label']}")
            st.write(f"Confidence: {classification['score'] * 100:.2f}%")

        # Use the top prediction for further processing
        top_prediction = car_classifications[0]['label']
        brand, model_name = top_prediction.split(' ', 1)
        st.write(f"Identified Car: {brand} {model_name}")

        # Find the closest match in the CSV
        df = load_datasets()
        match = find_closest_match(df, brand, model_name)

        if match is not None:
            st.write("Closest Match Found:")
            st.write(f"Make: {match['Make']}")
            st.write(f"Model: {match['Model']}")
            st.write(f"Year: {match['Year']}")
            st.write(f"Price: ${match['Price']}")

            # Get additional information using GPT-3.5-turbo
            overview = get_car_overview(match)
            st.write("Car Overview:")
            st.write(overview)

            # Interactive Price Prediction
            st.subheader("Price Prediction Over Time")
            selected_years = st.slider("Select range of years for price prediction",
                                       min_value=2000, max_value=2023, value=(2010, 2023))

            years = np.arange(selected_years[0], selected_years[1] + 1)
            predicted_prices = []
            for year in years:
                price = predict_price(model, match, year, df)
                predicted_prices.append(price)

            # Plotting the results
            fig, ax = plt.subplots(figsize=(10, 5))
            ax.plot(years, predicted_prices, marker='o')
            ax.set_title(f"Predicted Price of {match['Make']} {match['Model']} Over Time")
            ax.set_xlabel("Year")
            ax.set_ylabel("Predicted Price ($)")
            ax.grid(True)
            st.pyplot(fig)
        else:
            st.write("No match found in the database.")
    else:
        st.error("Could not classify the image. Please try again with a different image.")
else:
    st.write("Please upload an image or take a picture to proceed.")