Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import openai | |
| import joblib | |
| from PIL import Image | |
| import requests | |
| from io import BytesIO | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from sklearn.preprocessing import LabelEncoder | |
| from huggingface_hub import hf_hub_download | |
| from transformers import AutoFeatureExtractor, AutoModelForImageClassification | |
| import torch | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import re | |
| from datetime import datetime | |
# Dataset loading function; cached so Streamlit reruns do not re-parse the CSV
@st.cache_data(show_spinner=False)
def load_datasets():
    """Load the car listings CSV and normalise its column names.

    Reads ``CTP_Model1.csv`` from the working directory, then strips
    surrounding whitespace from every column name and applies
    ``str.capitalize`` to it (first character upper-cased, the rest
    lower-cased, e.g. ``title_status`` -> ``Title_status``).

    The result is cached with ``st.cache_data``, which hands every caller its
    own copy, so callers may modify the returned frame freely. The built-in
    cache spinner is disabled because the function shows its own.

    Returns:
        pandas.DataFrame: the dataset with normalised column names.

    Raises:
        Exception: any error raised while reading the file is reported with
            ``st.error`` and then re-raised unchanged.
    """
    try:
        with st.spinner('Loading dataset...'):
            original_data = pd.read_csv('CTP_Model1.csv', low_memory=False)
        original_data.columns = original_data.columns.str.strip().str.capitalize()
        return original_data
    except Exception as e:
        st.error(f"Error loading dataset: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise
def load_image(image_file):
    """Open *image_file* with Pillow and return the resulting image object."""
    opened_image = Image.open(image_file)
    return opened_image
@st.cache_resource(show_spinner=False)
def _load_car_classifier(model_name):
    """Load the feature extractor and classification model for *model_name* once per process."""
    feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
    model = AutoModelForImageClassification.from_pretrained(model_name)
    return feature_extractor, model


def classify_image(image):
    """Classify a car photo and return the single most likely label.

    Args:
        image: a PIL image (the caller passes the result of ``Image.open``).

    Returns:
        list[dict] | None: a one-element list ``[{'label': str, 'score': float}]``
        holding the top class and its softmax probability, or ``None`` when
        anything fails (the error is shown with ``st.error``).
    """
    try:
        # Load the model and feature extractor (cached, so Streamlit reruns
        # do not re-instantiate them on every interaction)
        model_name = "dima806/car_models_image_detection"
        feature_extractor, model = _load_car_classifier(model_name)

        # PNG uploads can be RGBA / palette / greyscale; normalise to RGB.
        # NOTE(review): assumes the checkpoint's preprocessor expects
        # 3-channel input -- confirm against the model card.
        if getattr(image, "mode", "RGB") != "RGB":
            image = image.convert("RGB")

        # Preprocess the image
        inputs = feature_extractor(images=image, return_tensors="pt")

        # Perform inference without tracking gradients
        with torch.no_grad():
            outputs = model(**inputs)

        # Get the predicted class
        logits = outputs.logits
        predicted_class_idx = logits.argmax(-1).item()

        # Get the class label and its probability
        predicted_class_label = model.config.id2label[predicted_class_idx]
        probabilities = torch.nn.functional.softmax(logits, dim=-1)
        score = probabilities[0, predicted_class_idx].item()

        # Return the top prediction
        return [{'label': predicted_class_label, 'score': score}]
    except Exception as e:
        # Deliberate best-effort: the caller treats None as "could not classify".
        st.error(f"Classification error: {e}")
        return None
def find_closest_match(df, brand, model):
    """Find the dataset row whose "Make Model" text is most similar to the query.

    Similarity is the cosine similarity between TF-IDF vectors of each row's
    ``Make + ' ' + Model`` string and the query ``"{brand} {model}"``.

    Args:
        df: DataFrame with ``Make`` and ``Model`` columns. It is not modified.
        brand: brand part of the query.
        model: model part of the query.

    Returns:
        pandas.Series | None: a copy of the best-matching row, with an extra
        ``full_name`` entry holding the combined name, or ``None`` when *df*
        is empty or no row shares any term with the query.
    """
    if df is None or df.empty:
        return None

    # Build the combined names in a separate Series so the caller's frame is
    # left untouched; missing values become empty strings so the vectorizer
    # only ever sees text.
    full_names = (
        df['Make'].fillna('').astype(str) + ' ' + df['Model'].fillna('').astype(str)
    )

    # The query is appended as the last document so it shares the vocabulary
    query_car = f"{brand} {model}"
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(full_names.tolist() + [query_car])

    # Compare the query (last row) against every dataset row
    cosine_similarities = cosine_similarity(tfidf_matrix[-1], tfidf_matrix[:-1]).flatten()
    most_similar_index = cosine_similarities.argmax()

    # With zero overlap everywhere argmax would just pick row 0, which is an
    # arbitrary car rather than a match.
    if cosine_similarities[most_similar_index] <= 0:
        return None

    match = df.iloc[most_similar_index].copy()
    match['full_name'] = full_names.iloc[most_similar_index]
    return match
def get_car_overview(car_data):
    """Ask the chat model for a prose overview of one car record.

    *car_data* is indexed by the keys ``Year``, ``Make``, ``Model``, ``Trim``,
    ``Price`` and ``Condition``; their values are interpolated into a single
    user message sent to ``gpt-3.5-turbo``. Returns the text content of the
    first choice in the response.
    """
    prompt = (
        f"Provide an overview of the following car:\n"
        f"Year: {car_data['Year']}\n"
        f"Make: {car_data['Make']}\n"
        f"Model: {car_data['Model']}\n"
        f"Trim: {car_data['Trim']}\n"
        f"Price: ${car_data['Price']}\n"
        f"Condition: {car_data['Condition']}\n"
    )
    user_message = {"role": "user", "content": prompt}
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[user_message],
    )
    first_choice = response.choices[0]
    return first_choice.message['content']
def load_model_and_encodings():
    """Fetch the price model and build one label encoder per text column.

    The model file is downloaded from the Hugging Face Hub and loaded with
    joblib. A ``LabelEncoder`` is then fitted for every object-dtype column
    of the dataset on that column's unique values, after missing entries are
    replaced by ``'unknown'`` and whitespace is stripped.

    Returns:
        tuple: ``(model, label_encoders, categorical_features)`` where
        ``label_encoders`` is keyed by the lower-cased column name and
        ``categorical_features`` lists the column names as they appear in the
        dataset.

    Raises:
        Exception: any failure is reported with ``st.error`` and re-raised.
    """
    try:
        with st.spinner('Loading model...'):
            model_path = hf_hub_download(
                repo_id="EdBoy2202/car_prediction_model",
                filename="car_price_modelv3.pkl",
            )
            # NOTE: joblib.load unpickles the file, so the repo must be trusted.
            price_model = joblib.load(model_path)

        dataset = load_datasets()
        categorical_features = dataset.select_dtypes(include=['object']).columns.tolist()
        label_encoders = {
            column.lower(): LabelEncoder().fit(
                dataset[column].fillna('unknown').str.strip().unique()
            )
            for column in categorical_features
        }
        return price_model, label_encoders, categorical_features
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        raise e
def calculate_age(year):
    """Return how many years lie between *year* and the current calendar year."""
    return datetime.now().year - year
def _encode_category(encoder, value):
    """Map *value* to its integer label code, or 0 when it cannot be encoded."""
    # The encoders are fitted on stripped strings with missing values replaced
    # by 'unknown', so apply the same normalisation before the lookup.
    label = 'unknown' if pd.isna(value) else str(value).strip()
    if label in encoder.classes_:
        return encoder.transform([label])[0]
    # Unseen label: fall back to 0 instead of letting transform() raise.
    return 0


def predict_price(model, encoders, categorical_features, user_input,
                  avg_mileage_per_year=12000):
    """Predict a price for one car description.

    Args:
        model: fitted estimator exposing ``predict``.
        encoders: mapping of lower-cased feature name to a fitted
            ``LabelEncoder``.
        categorical_features: names of the categorical columns (any casing).
        user_input: mapping of feature name to raw value; must contain a
            ``year`` entry because the age features are derived from it.
        avg_mileage_per_year: assumed yearly mileage used to synthesise the
            ``Mileage_per_year`` and ``Odometer`` features.

    Returns:
        The first element of ``model.predict`` for the assembled one-row frame.
    """
    categorical_lower = {str(name).lower() for name in categorical_features}
    encoded_features = {}
    for feature, value in user_input.items():
        key = feature.lower()
        column = feature.capitalize()
        if key in encoders:
            encoded_features[column] = _encode_category(encoders[key], value)
        elif key in categorical_lower:
            # If it's a categorical feature but not in encoders, set to 0 (unknown)
            encoded_features[column] = 0
        else:
            # For numerical features, use the value as is
            encoded_features[column] = value

    # Calculate additional features
    encoded_features['Age'] = calculate_age(encoded_features['Year'])
    encoded_features['Age_squared'] = encoded_features['Age'] ** 2
    # Mileage is not supplied by the caller, so it is synthesised from the age
    encoded_features['Mileage_per_year'] = avg_mileage_per_year
    encoded_features['Odometer'] = encoded_features['Age'] * avg_mileage_per_year

    input_data = pd.DataFrame([encoded_features])

    # Ensure all expected columns are present
    expected_columns = ['Make', 'Model', 'Year', 'Condition', 'Fuel', 'Odometer', 'Title_status', 'Transmission', 'Drive', 'Size', 'Type', 'Paint_color', 'Age', 'Age_squared', 'Mileage_per_year']
    for col in expected_columns:
        if col not in input_data.columns:
            input_data[col] = 0  # default for features the caller did not provide

    # When the estimator recorded its training columns, present exactly those
    # in the same order; otherwise the frame is passed through as assembled.
    trained_columns = getattr(model, 'feature_names_in_', None)
    if trained_columns is not None:
        input_data = input_data.reindex(columns=list(trained_columns), fill_value=0)

    predicted_price = model.predict(input_data)
    return predicted_price[0]
# Streamlit App
# Flow: load model -> accept an image -> classify it -> look the car up in
# the dataset -> show an LLM overview -> plot predicted price per model year.
st.title("Auto Appraise")
st.write("Upload a car image or take a picture to get its brand, model, overview, and expected price!")

# Load model and encoders
model, label_encoders, categorical_features = load_model_and_encodings()

# Initialize OpenAI API key
openai.api_key = st.secrets["GPT_TOKEN"]

# File uploader for image
uploaded_file = st.file_uploader("Choose a car image", type=["jpg", "jpeg", "png"])

# Camera input as an alternative (optional)
camera_image = st.camera_input("Or take a picture of the car")


def _category_value(row, column):
    """Return row[column] normalised the way the label encoders were fitted."""
    value = row.get(column, 'unknown')
    if pd.isna(value):
        # Encoders are fitted with missing entries replaced by 'unknown'.
        return 'unknown'
    return value.strip() if isinstance(value, str) else value


# Process the image (either uploaded or from camera); the upload wins if both exist
image = None
if uploaded_file is not None:
    image = load_image(uploaded_file)
    st.write("Image uploaded successfully.")
elif camera_image is not None:
    image = load_image(camera_image)
    st.write("Image captured successfully.")

if image is not None:
    st.image(image, caption='Processed Image', use_container_width=True)

    # Classify the car image
    with st.spinner('Analyzing image...'):
        car_classifications = classify_image(image)

    if car_classifications:
        st.write("Image classification successful.")
        st.subheader("Car Classification Results:")
        for classification in car_classifications:
            st.write(f"Model: {classification['label']}")
            st.write(f"Confidence: {classification['score']*100:.2f}%")

        # Use the top prediction for further processing.
        # partition() tolerates a single-word label (model_name becomes '').
        top_prediction = car_classifications[0]['label']
        brand, _, model_name = top_prediction.partition(' ')
        st.write(f"Identified Car: {brand} {model_name}")

        # Find the closest match in the CSV
        df = load_datasets()
        match = find_closest_match(df, brand, model_name)

        if match is not None:
            st.write("Closest Match Found:")
            st.write(f"Make: {match['Make']}")
            st.write(f"Model: {match['Model']}")
            st.write(f"Year: {match['Year']}")
            st.write(f"Price: ${match['Price']}")

            # Get additional information using GPT-3.5-turbo. A failure of the
            # external API should not prevent the price chart from rendering.
            try:
                overview = get_car_overview(match)
            except Exception as e:
                st.warning(f"Could not generate car overview: {e}")
            else:
                st.write("Car Overview:")
                st.write(overview)

            # Interactive Price Prediction
            st.subheader("Price Prediction Over Time")
            selected_years = st.slider("Select range of years for price prediction",
                                       min_value=2000, max_value=2023, value=(2010, 2023))

            years = np.arange(selected_years[0], selected_years[1] + 1)

            # Everything except the year is constant across the range
            base_input = {
                'make': _category_value(match, 'Make'),
                'model': _category_value(match, 'Model'),
                'condition': _category_value(match, 'Condition'),
                'fuel': _category_value(match, 'Fuel'),
                'title_status': _category_value(match, 'Title_status'),
                'transmission': _category_value(match, 'Transmission'),
                'drive': _category_value(match, 'Drive'),
                'size': _category_value(match, 'Size'),
                'type': _category_value(match, 'Type'),
                'paint_color': _category_value(match, 'Paint_color'),
            }
            predicted_prices = [
                predict_price(model, label_encoders, categorical_features,
                              {**base_input, 'year': year})
                for year in years
            ]

            # Plotting the results on an explicit figure, closed afterwards so
            # reruns do not accumulate open matplotlib figures
            fig, ax = plt.subplots(figsize=(10, 5))
            ax.plot(years, predicted_prices, marker='o')
            ax.set_title(f"Predicted Price of {match['Make']} {match['Model']} Over Time")
            ax.set_xlabel("Year")
            ax.set_ylabel("Predicted Price ($)")
            ax.grid()
            st.pyplot(fig)
            plt.close(fig)
        else:
            st.write("No match found in the database.")
    else:
        st.error("Could not classify the image. Please try again with a different image.")
else:
    st.write("Please upload an image or take a picture to proceed.")