# Third-party and standard-library imports used across the module.
import requests
from PIL import Image
import io
import os
import torch
import torch.nn.functional as F
from transformers import CLIPProcessor, CLIPModel, AutoImageProcessor, AutoModelForImageClassification
import numpy as np
import chromadb
from flask import Flask, request, jsonify, render_template, send_file
from werkzeug.utils import secure_filename
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
from functools import partial
import tempfile
import shutil
import warnings
from pathlib import Path
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Any, Optional
import logging
import schedule

# Suppress noisy FutureWarning / UserWarning output.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Point the Hugging Face / XDG caches at a writable /tmp location.
# NOTE(review): these variables are set after `transformers` has already been
# imported; presumably the library reads them at import time -- confirm. The
# model loaders in this file pass `cache_dir` explicitly as well.
os.environ['HF_HOME'] = '/tmp/huggingface_cache'
os.environ['XDG_CACHE_HOME'] = '/tmp/huggingface_cache'

# Create the cache directory if it doesn't exist.
os.makedirs('/tmp/huggingface_cache', exist_ok=True)

# Configure logging; `logger` is the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the Flask app; uploads are stored under /tmp/uploads.
app = Flask(__name__, template_folder='templates')
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Module-level state shared between the initializer and the request handlers.
clip_model = None
clip_processor = None
property_classifier = None
property_processor = None
collection = None
client = None
downloaded_images = []  # records describing the images downloaded so far
initialization_status = "Initializing..."
# Performance configuration
MAX_DOWNLOAD_WORKERS = 32  # Increased from 16
MAX_EMBEDDING_WORKERS = 24  # Increased from 12
MAX_PROCESS_WORKERS = min(16, multiprocessing.cpu_count())
BATCH_SIZE = 100  # Process embeddings in batches
CHUNK_SIZE = 50  # Download images in chunks

# Cache for embeddings to avoid reprocessing (guarded by cache_lock)
embedding_cache = {}
cache_lock = threading.Lock()

# Progress tracking
initialization_progress = 0
initialization_start_time = time.time()


def fetch_image_data(api_url, max_retries=5, timeout=None, backoff_seconds=5):
    """Fetch the image listing from the API, retrying with linear backoff.

    Args:
        api_url: Endpoint that returns a JSON array of image records.
        max_retries: Number of attempts before giving up.
        timeout: Passed straight to ``requests.get``. The default ``None``
            keeps the original "wait forever" behaviour; pass a number or a
            ``(connect, read)`` tuple to bound each attempt.
        backoff_seconds: Base delay; attempt ``k`` (1-based) is followed by a
            sleep of ``backoff_seconds * k`` unless it was the last attempt.

    Returns:
        The decoded list of records, or ``[]`` when every attempt failed or
        the payload was not a JSON array.
    """
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(api_url, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout fetching data (attempt {attempt})")
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request error (attempt {attempt}): {e}")
        except Exception as e:
            logger.error(f"Unexpected error fetching data (attempt {attempt}): {e}")
        else:
            # Callers iterate the result and call ``.get`` on each item, so a
            # dict/str payload (e.g. an error envelope) must not leak through.
            if not isinstance(data, list):
                logger.error(f"Unexpected payload type from API: {type(data).__name__}")
                return []
            logger.info(f"Successfully fetched {len(data)} images from API")
            return data

        if attempt < max_retries:
            time.sleep(backoff_seconds * attempt)  # Progressive backoff

    logger.error(f"Failed to fetch image data after {max_retries} attempts")
    return []
async def download_single_image_async(session, item):
    """Download one image with an aiohttp session and store it under /tmp.

    Args:
        session: An open ``aiohttp.ClientSession``; its timeout settings apply.
        item: API record. The URL is read from ``cloudinaryUrl``, ``imageUrl``,
            ``image_url`` or ``url`` (first non-empty wins); ``id`` names the
            file and ``propertyId`` is carried through.

    Returns:
        ``{'id', 'propertyId', 'filepath', 'imageUrl'}`` on success (or when
        the file is already on disk), otherwise ``None``.
    """
    if not isinstance(item, dict):
        return None

    # Resolved before the try block so every handler below can log it safely.
    image_url = (item.get('cloudinaryUrl') or item.get('imageUrl')
                 or item.get('image_url') or item.get('url'))
    image_id = item.get('id')
    property_id = item.get('propertyId')
    if not image_url or not image_id:
        return None

    try:
        temp_dir = Path('/tmp/property_images')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Extension comes from the URL; anything unexpected is stored as .jpg
        file_extension = image_url.split('.')[-1].split('?')[0]
        if file_extension not in ('jpg', 'jpeg', 'png', 'webp'):
            file_extension = 'jpg'
        filepath = temp_dir / f"{image_id}.{file_extension}"

        record = {
            'id': image_id,
            'propertyId': property_id,
            'filepath': str(filepath),
            'imageUrl': image_url
        }
        if filepath.exists():
            return record

        async with session.get(image_url) as response:
            if response.status != 200:
                logger.warning(f"Failed to download {image_url}: HTTP {response.status}")
                return None
            content = await response.read()

        if not content:
            # An empty file would otherwise be mistaken for a cached download.
            logger.warning(f"Empty response body for {image_url}")
            return None

        # Write to a sibling ".part" file and rename, so an interrupted write
        # never leaves a truncated image that looks like a finished download.
        partial_path = filepath.with_name(filepath.name + '.part')
        try:
            async with aiofiles.open(partial_path, 'wb') as f:
                await f.write(content)
            os.replace(partial_path, filepath)
        finally:
            if partial_path.exists():
                partial_path.unlink()

        logger.debug(f"Successfully downloaded {image_url} -> {filepath}")
        return record
    except asyncio.TimeoutError:
        logger.warning(f"Timeout downloading {image_url}")
        return None
    except aiohttp.ClientError as e:
        logger.warning(f"Client error downloading {image_url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error downloading {image_url}: {e}")
        return None
def download_single_image(item):
    """Download one image with ``requests`` (thread-pool fallback path).

    Args:
        item: API record. The URL is read from ``cloudinaryUrl``, ``imageUrl``,
            ``image_url`` or ``url`` (first non-empty wins); ``id`` names the
            file and ``propertyId`` is carried through.

    Returns:
        ``{'id', 'propertyId', 'filepath', 'imageUrl'}`` on success (or when
        the file is already on disk), otherwise ``None``. Up to three attempts
        are made, two seconds apart.
    """
    if not isinstance(item, dict):
        return None

    # Resolved before the try block so the handlers below can always log it.
    image_url = (item.get('cloudinaryUrl') or item.get('imageUrl')
                 or item.get('image_url') or item.get('url'))
    image_id = item.get('id')
    property_id = item.get('propertyId')
    if not image_url or not image_id:
        return None

    max_attempts = 3
    try:
        temp_dir = Path('/tmp/property_images')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Extension comes from the URL; anything unexpected is stored as .jpg
        file_extension = image_url.split('.')[-1].split('?')[0]
        if file_extension not in ('jpg', 'jpeg', 'png', 'webp'):
            file_extension = 'jpg'
        filepath = temp_dir / f"{image_id}.{file_extension}"

        record = {
            'id': image_id,
            'propertyId': property_id,
            'filepath': str(filepath),
            'imageUrl': image_url
        }
        if filepath.exists():
            return record

        # Stream into a ".part" file and rename on success; a failed attempt
        # must not leave a truncated file that later passes the exists() check.
        partial_path = filepath.with_name(filepath.name + '.part')
        try:
            for attempt in range(1, max_attempts + 1):
                try:
                    # No timeout, matching the original reliability trade-off.
                    with requests.get(image_url, stream=True, timeout=None) as response:
                        response.raise_for_status()
                        with open(partial_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=8192):
                                f.write(chunk)
                    os.replace(partial_path, filepath)
                    return record
                except requests.exceptions.Timeout:
                    logger.warning(f"Timeout downloading {image_url} (attempt {attempt})")
                except requests.exceptions.RequestException as e:
                    logger.warning(f"Request error downloading {image_url} (attempt {attempt}): {e}")
                except Exception as e:
                    logger.error(f"Unexpected error downloading {image_url} (attempt {attempt}): {e}")

                if attempt < max_attempts:
                    time.sleep(2)
            return None
        finally:
            if partial_path.exists():
                partial_path.unlink()
    except Exception as e:
        logger.error(f"Error processing image from {image_url}: {e}")
        return None
# Function to download and process images with a thread pool
def download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS):
    """Download the images of the first ``num_properties`` properties in threads.

    Args:
        image_data: Iterable of API records; records without ``propertyId``
            are ignored.
        num_properties: Maximum number of distinct properties to take, in
            first-seen order.
        max_workers: Thread-pool size.

    Returns:
        List of the records returned by ``download_single_image`` for every
        image that downloaded successfully (completion order).
    """
    temp_dir = Path('/tmp/property_images')
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Group images by property, preserving first-seen property order.
    images_by_property = {}
    for item in image_data:
        property_id = item.get('propertyId')
        if property_id is not None:
            images_by_property.setdefault(property_id, []).append(item)

    selected_properties = list(images_by_property.items())[:num_properties]
    all_images_to_process = [
        image for _, images in selected_properties for image in images
    ]
    total = len(all_images_to_process)

    logger.info(f"Starting optimized parallel download of {total} images using {max_workers} workers...")

    downloaded_images = []
    if total == 0:
        logger.info(f"✅ Finished downloading. Successfully processed images from {len(selected_properties)} properties. Total images downloaded: 0")
        return downloaded_images

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_single_image, item) for item in all_images_to_process]

        for completed_count, future in enumerate(as_completed(futures), start=1):
            try:
                result = future.result()
            except Exception as e:
                # One misbehaving record must not abort the whole batch.
                logger.warning(f"Download task failed: {e}")
                result = None
            if result is not None:
                downloaded_images.append(result)

            if completed_count % 50 == 0:  # Show progress every 50 images
                logger.info(f"Downloaded {completed_count}/{total} images ({completed_count/total*100:.1f}%)")

    logger.info(f"✅ Finished downloading. Successfully processed images from {len(selected_properties)} properties. Total images downloaded: {len(downloaded_images)}")
    return downloaded_images
async def download_images_async(image_data, num_properties=600):
    """Download the images of the first ``num_properties`` properties with aiohttp.

    Args:
        image_data: Iterable of API records; records without ``propertyId``
            are ignored.
        num_properties: Maximum number of distinct properties to take, in
            first-seen order.

    Returns:
        List of the records returned by ``download_single_image_async`` for
        every image that downloaded successfully. Each request is bounded by
        a 30 s total / 10 s connect / 20 s read timeout.
    """
    # Group images by property, preserving first-seen property order.
    images_by_property = {}
    for item in image_data:
        property_id = item.get('propertyId')
        if property_id is not None:
            images_by_property.setdefault(property_id, []).append(item)

    selected_properties = list(images_by_property.items())[:num_properties]
    all_images_to_process = [
        image for _, images in selected_properties for image in images
    ]
    total = len(all_images_to_process)

    downloaded_images = []
    if total == 0:
        # Nothing to do. Continuing would build Semaphore(0) / limit=0 and
        # divide by zero in the progress message.
        logger.info("No images to download")
        return downloaded_images

    Path('/tmp/property_images').mkdir(parents=True, exist_ok=True)

    logger.info(f"🚀 Starting ultra-fast async download of {total} images...")

    max_concurrent = min(64, total)  # never more slots than images
    semaphore = asyncio.Semaphore(max_concurrent)
    logger.info(f"⚡ Using {max_concurrent} concurrent downloads")

    # TLS certificate verification is left at aiohttp's default (enabled);
    # the previous ssl=False accepted any certificate.
    connector = aiohttp.TCPConnector(
        limit=max_concurrent,
        limit_per_host=50,
        ttl_dns_cache=600,  # Cache DNS for 10 minutes
        use_dns_cache=True,
        keepalive_timeout=60,
        enable_cleanup_closed=True,
        force_close=False  # Keep connections alive
    )

    timeout = aiohttp.ClientTimeout(
        total=30,  # 30 second total timeout
        connect=10,  # 10 second connect timeout
        sock_read=20  # 20 second read timeout
    )

    # Accept-Encoding is intentionally not set: aiohttp advertises only the
    # encodings it can actually decode in this environment.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
        'Connection': 'keep-alive'
    }

    async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=headers) as session:

        async def download_with_semaphore(item):
            async with semaphore:
                return await download_single_image_async(session, item)

        try:
            tasks = [download_with_semaphore(item) for item in all_images_to_process]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for result in results:
                if isinstance(result, dict):
                    downloaded_images.append(result)
                elif isinstance(result, Exception):
                    logger.debug(f"Download failed: {result}")

            completed = len(downloaded_images)
            logger.info(f"✅ Downloaded {completed}/{total} images ({completed/total*100:.1f}%)")
        except Exception as e:
            logger.error(f"Error in async download: {e}")

    logger.info(f"✅ Ultra-fast async download complete: {len(downloaded_images)} images from {len(selected_properties)} properties")
    return downloaded_images
# Function to generate embeddings for a single image using CLIP
def get_image_embedding_clip(image_path, clip_model, clip_processor):
    """Return the CLIP image embedding for ``image_path`` (cached by path).

    Args:
        image_path: Path of the image file; also the key in ``embedding_cache``.
        clip_model: Loaded CLIP model, on any device.
        clip_processor: Matching CLIP processor.

    Returns:
        A flat ``numpy`` array, or ``None`` when the model/processor is missing
        or the image cannot be processed.
    """
    # Check cache first
    with cache_lock:
        if image_path in embedding_cache:
            return embedding_cache[image_path]

    if clip_model is None or clip_processor is None:
        return None

    try:
        # Close the file handle as soon as the pixels are decoded.
        with Image.open(image_path) as source:
            image = source.convert('RGB')

        # Downscale very large images before handing them to the processor.
        max_size = 512
        if max(image.size) > max_size:
            ratio = max_size / max(image.size)
            # max(1, ...) keeps extreme aspect ratios from collapsing to 0 px
            new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
            image = image.resize(new_size, Image.Resampling.LANCZOS)

        inputs = clip_processor(images=image, return_tensors="pt", padding=True)

        # The model may have been moved to the GPU; the inputs must follow it,
        # and the result has to come back to the CPU before ``.numpy()``.
        device = next(clip_model.parameters()).device
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        with torch.no_grad():
            image_features = clip_model.get_image_features(**inputs)
        embedding = image_features.cpu().numpy().flatten()

        with cache_lock:
            embedding_cache[image_path] = embedding

        return embedding
    except Exception as e:
        logger.error(f"Error processing image {image_path}: {e}")
        return None
def process_single_embedding(image_info, clip_model, clip_processor):
    """Embed one downloaded image.

    Returns ``(image_id, embedding)``, or ``None`` when no embedding could be
    produced for the file.
    """
    path, ident = image_info['filepath'], image_info['id']
    vector = get_image_embedding_clip(path, clip_model, clip_processor)
    return None if vector is None else (ident, vector)


def process_embeddings_batch(batch, clip_model, clip_processor):
    """Embed every image in ``batch`` one after another.

    Returns the ``(image_id, embedding)`` pairs of the images that succeeded,
    in input order.
    """
    outcomes = (
        process_single_embedding(info, clip_model, clip_processor)
        for info in batch
    )
    return [outcome for outcome in outcomes if outcome is not None]
def generate_embeddings_parallel(downloaded_images, clip_model, clip_processor):
    """Compute CLIP embeddings for all downloaded images with a thread pool.

    Args:
        downloaded_images: Records with at least ``'id'`` and ``'filepath'``.
        clip_model: Loaded CLIP model (moved to CUDA here when available).
        clip_processor: Matching CLIP processor.

    Returns:
        ``{image_id: embedding}`` for every image that could be embedded;
        ``{}`` when there is nothing to do or the model is missing.
    """
    if not downloaded_images or clip_model is None or clip_processor is None:
        logger.warning("No images or CLIP model available for embedding generation")
        return {}

    logger.info("🚀 Starting ultra-fast parallel embedding generation...")

    max_workers = min(32, multiprocessing.cpu_count() * 4)
    batch_size = 50  # only reported in the log line below
    logger.info(f"⚡ Using {max_workers} workers with batch size {batch_size}")

    image_embeddings = {}
    completed_count = 0
    total_images = len(downloaded_images)

    # Pre-load model to GPU if available for faster inference
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    if device == 'cuda':
        clip_model = clip_model.to(device)
        clip_model.eval()
        logger.info("🔥 Model loaded on GPU for maximum speed")
    # Inputs always follow whatever device the model actually lives on.
    model_device = next(clip_model.parameters()).device

    # Same pre-resize bound as the single-image query path. Both paths write
    # to the shared embedding_cache, so they must preprocess identically.
    max_size = 512

    def process_single_image_fast(image_info):
        """Embed one image; returns (image_id, embedding) or None."""
        filepath = image_info.get('filepath')
        try:
            image_id = image_info['id']

            with cache_lock:
                if filepath in embedding_cache:
                    return image_id, embedding_cache[filepath]

            with Image.open(filepath) as source:
                image = source.convert('RGB')

            if max(image.size) > max_size:
                ratio = max_size / max(image.size)
                new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
                image = image.resize(new_size, Image.Resampling.LANCZOS)

            inputs = clip_processor(images=image, return_tensors="pt", padding=True)
            inputs = {k: v.to(model_device) for k, v in inputs.items()}

            with torch.no_grad():
                image_features = clip_model.get_image_features(**inputs)
            embedding = image_features.cpu().numpy().flatten()

            with cache_lock:
                embedding_cache[filepath] = embedding

            return image_id, embedding
        except Exception as e:
            logger.debug(f"Error processing {filepath}: {e}")
            return None

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_image_fast, img) for img in downloaded_images]

        for future in as_completed(futures):
            try:
                # Futures from as_completed are already finished; no timeout.
                result = future.result()
            except Exception as e:
                logger.warning(f"Error processing image: {e}")
                continue
            if result is None:
                continue

            image_id, embedding = result
            image_embeddings[image_id] = embedding
            completed_count += 1

            # Show progress every 25 images
            if completed_count % 25 == 0:
                progress = (completed_count / total_images) * 100
                logger.info(f"⚡ Generated embeddings: {completed_count}/{total_images} ({progress:.1f}%)")

    logger.info(f"✅ Ultra-fast embedding generation complete: {len(image_embeddings)} images processed")
    return image_embeddings
# Function to search for similar images
def search_similar_images(query_image_path, collection, clip_model, clip_processor, n_results=30):
    """Query the vector collection for images similar to ``query_image_path``.

    Args:
        query_image_path: Path of the query image on disk.
        collection: ChromaDB collection holding the image embeddings.
        clip_model: Loaded CLIP model.
        clip_processor: Matching CLIP processor.
        n_results: Maximum number of neighbours to return.

    Returns:
        The raw ``collection.query`` result (ids, metadatas, distances), or
        ``None`` when the model/collection is missing, the query image cannot
        be embedded, or the query itself fails.
    """
    if clip_model is None or clip_processor is None:
        logger.warning("CLIP model is not loaded. Cannot perform search.")
        return None

    if collection is None:
        logger.warning("Collection is not available. Cannot perform search.")
        return None

    query_embedding = get_image_embedding_clip(query_image_path, clip_model, clip_processor)
    if query_embedding is None:
        logger.warning(f"Could not generate embedding for query image: {query_image_path}")
        return None

    try:
        return collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=n_results,
            include=['metadatas', 'distances']
        )
    except Exception as e:
        logger.error(f"Error during ChromaDB search: {e}")
        return None
# Function to check if image is property/real estate related
def is_property_related_image(image_path, threshold=0.4):
    """Classify an uploaded image as real-estate related or not.

    Uses the module-level ``property_classifier`` / ``property_processor``.

    Args:
        image_path: Path of the image to classify.
        threshold: Minimum top-class probability to accept the image. A top
            class from the real-estate label set is additionally accepted
            from a probability of 0.3.

    Returns:
        ``(is_property, confidence, label)``. When the classifier is not
        loaded or classification raises, the image is allowed through as
        ``(True, 0.5, <reason>)`` so that search keeps working.
    """
    # Labels documented for andupets/real-estate-image-classification.
    # NOTE(review): the loader may fall back to general-purpose models whose
    # labels are not in this set; for those only `threshold` applies.
    real_estate_labels = {
        'bathroom', 'bedroom', 'dining room', 'house facade',
        'kitchen', 'living room', 'sao paulo apartment facade'
    }

    try:
        if property_classifier is None or property_processor is None:
            logger.warning("Property classifier not loaded, proceeding with search...")
            return True, 0.5, "Classifier unavailable"

        with Image.open(image_path) as source:
            image = source.convert('RGB')

        # Resize for faster processing
        max_size = 224
        if max(image.size) > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        inputs = property_processor(images=image, return_tensors="pt")

        # The classifier may live on the GPU; inputs must be on the same
        # device and the probabilities brought back before ``.numpy()``.
        device = next(property_classifier.parameters()).device
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        with torch.no_grad():
            logits = property_classifier(**inputs).logits
        probs = torch.softmax(logits, dim=1)[0].cpu().numpy()

        max_prob_idx = int(probs.argmax())
        max_prob = float(probs[max_prob_idx])

        id2label = getattr(property_classifier.config, 'id2label', None) or {}
        predicted_label = id2label.get(max_prob_idx, f"class_{max_prob_idx}")

        is_property = max_prob > threshold
        # More lenient for the known real-estate classes.
        if max_prob > 0.3 and str(predicted_label).lower() in real_estate_labels:
            is_property = True

        logger.info(f"Property classification: {predicted_label} (confidence: {max_prob:.3f}, is_property: {is_property})")
        return is_property, max_prob, predicted_label
    except Exception as e:
        logger.error(f"Error in property classification: {e}")
        # Fallback: allow the image to proceed if there's an error
        return True, 0.5, f"Error: {str(e)}"
# Function to load property classification model with caching
def load_property_classifier():
    """Load the first available image classifier into the module globals.

    Candidates are tried in order; ``property_processor`` and
    ``property_classifier`` are only assigned once BOTH parts of a candidate
    have loaded, so a half-loaded candidate never leaves them mismatched.

    Returns:
        ``True`` when a model was loaded, ``False`` otherwise.
    """
    global property_classifier, property_processor

    model_options = [
        "andupets/real-estate-image-classification",  # specialized real estate model
        "microsoft/resnet-50",  # Fallback general purpose
        "google/vit-base-patch16-224"  # Alternative fallback
    ]

    try:
        logger.info("Loading property classification model...")
        cache_dir = '/tmp/huggingface_cache'
        os.makedirs(cache_dir, exist_ok=True)
        on_gpu = torch.cuda.is_available()

        for model_name in model_options:
            try:
                logger.info(f"Trying to load: {model_name}")
                processor = AutoImageProcessor.from_pretrained(
                    model_name,
                    cache_dir=cache_dir,
                    local_files_only=False  # Allow downloading if not cached
                )
                classifier = AutoModelForImageClassification.from_pretrained(
                    model_name,
                    cache_dir=cache_dir,
                    local_files_only=False
                )
                if on_gpu:
                    classifier = classifier.to('cuda')
                classifier.eval()  # inference mode
            except Exception as e:
                logger.warning(f"Failed to load {model_name}: {e}")
                continue

            # Publish both halves together.
            property_processor, property_classifier = processor, classifier
            logger.info(f"✅ Property classifier loaded on {'GPU' if on_gpu else 'CPU'}: {model_name}")
            return True

        logger.warning("⚠️ Warning: Could not load any property classification model")
        return False
    except Exception as e:
        logger.error(f"Error loading property classifier: {e}")
        return False
def _download_initial_images(image_data, num_properties=300):
    """Download property images, preferring the async path over the threaded one."""
    try:
        # asyncio.run creates and always closes its own event loop.
        images = asyncio.run(download_images_async(image_data, num_properties=num_properties))
    except Exception as e:
        logger.warning(f"Async download failed, falling back to threaded download: {e}")
    else:
        if images:
            logger.info(f"✅ Async download successful: {len(images)} images downloaded")
            return images
        logger.warning("Async download returned no images, falling back to threaded download")
    return download_and_process_images(image_data, num_properties=num_properties, max_workers=MAX_DOWNLOAD_WORKERS)


def _populate_collection(target, images, image_embeddings, batch_size=1000):
    """Insert one record per unique embedded image into ``target``; return the count."""
    records = []
    seen_ids = set()
    for image_info in images:
        image_id = image_info['id']
        record_id = str(image_id)
        if image_id not in image_embeddings or record_id in seen_ids:
            continue
        seen_ids.add(record_id)
        records.append((
            record_id,
            image_embeddings[image_id].tolist(),
            {"property_id": image_info['propertyId']},
        ))

    # Batches are cut from the filtered records, not from the raw image list,
    # so images that follow a failed one are never skipped.
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        target.add(
            ids=[record_id for record_id, _, _ in batch],
            embeddings=[embedding for _, embedding, _ in batch],
            metadatas=[metadata for _, _, metadata in batch]
        )
        logger.info(f"✅ Added batch {start//batch_size + 1}: {len(batch)} embeddings")
    return len(records)


def initialize_visual_search():
    """Build the visual search index: fetch, download, embed, store.

    Updates the module globals (models, ``collection``, ``client``,
    ``downloaded_images``) and keeps ``initialization_status`` /
    ``initialization_progress`` current so the status endpoint can report
    progress. Returns ``None``; on any failure the status string says why and
    progress is set to 100.
    """
    global clip_model, clip_processor, property_classifier, property_processor, collection, client, downloaded_images, initialization_status, initialization_progress

    initialization_status = "Fetching image data..."
    initialization_progress = 10
    logger.info("🚀 Initializing visual search system with ultra-fast parallel processing...")

    api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list"
    collection_name = "property_image_embeddings"

    logger.info("📡 Fetching image data from API...")
    image_data = fetch_image_data(api_url)
    initialization_progress = 20

    if not image_data:
        logger.warning("No image data fetched. Visual search will be unavailable.")
        initialization_status = "No image data available"
        initialization_progress = 100
        return

    # Download images
    initialization_status = "Downloading property images..."
    initialization_progress = 30
    logger.info("⬇️ Downloading and processing images with ultra-fast parallel processing...")
    downloaded_images = _download_initial_images(image_data, num_properties=300)
    initialization_progress = 50

    # Load property classification model first (lightweight)
    initialization_status = "Loading property classifier..."
    initialization_progress = 60
    try:
        logger.info("🔍 Loading property classification model...")
        if load_property_classifier():
            logger.info("✅ Property classification model loaded successfully.")
        else:
            logger.warning("⚠️ Property classification model could not be loaded, will proceed without it.")
    except Exception as e:
        logger.error(f"Error loading property classifier: {e}")

    # Load CLIP model and processor
    initialization_status = "Loading AI model..."
    initialization_progress = 70
    try:
        logger.info("🧠 Loading CLIP model and processor...")
        cache_dir = '/tmp/huggingface_cache'
        clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32", cache_dir=cache_dir)
        clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32", cache_dir=cache_dir)

        if torch.cuda.is_available():
            clip_model = clip_model.to('cuda')
            logger.info("🔥 CLIP model loaded on GPU")
        else:
            logger.info("💻 CLIP model loaded on CPU")

        logger.info("✅ CLIP model and processor loaded successfully.")
        initialization_progress = 80
    except Exception as e:
        logger.error(f"Error loading CLIP model: {e}")
        clip_model = None
        clip_processor = None
        initialization_status = "Failed to load AI model"
        initialization_progress = 100
        return

    # Generate embeddings
    if not downloaded_images:
        logger.warning("Skipping embedding generation as the CLIP model could not be loaded or no images downloaded.")
        initialization_status = "No embeddings generated"
        initialization_progress = 100
        return

    initialization_status = "Generating image embeddings..."
    initialization_progress = 85
    logger.info("🧠 Generating embeddings with ultra-fast parallel processing...")
    image_embeddings = generate_embeddings_parallel(downloaded_images, clip_model, clip_processor)

    if not image_embeddings:
        logger.warning("No embeddings generated. Skipping database setup.")
        initialization_status = "No embeddings generated"
        initialization_progress = 100
        return
    initialization_progress = 90

    # Initialize ChromaDB client and collection
    initialization_status = "Setting up database..."
    initialization_progress = 95
    try:
        client = chromadb.Client(settings=chromadb.config.Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))

        try:
            existing = client.get_collection(name=collection_name)
        except Exception:
            existing = None  # not created yet

        if existing is not None:
            collection = existing
            logger.info(f"✅ Using existing collection '{collection_name}' with {collection.count()} items")
            initialization_status = "Ready!"
            initialization_progress = 100
            return  # Skip re-processing if collection exists

        # Cosine space: the search endpoint reports similarity as
        # 1 - distance, which is only meaningful for cosine distance.
        new_collection = client.create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        logger.info(f"✅ Created new collection '{collection_name}'")

        logger.info("💾 Preparing data for ChromaDB insertion...")
        added = _populate_collection(new_collection, downloaded_images, image_embeddings)
        logger.info(f"✅ Successfully added {added} embeddings to ChromaDB")
        logger.info(f"📊 Total items in ChromaDB collection: {new_collection.count()}")

        # Publish only once fully populated, so request handlers never see a
        # half-filled index.
        collection = new_collection
        initialization_status = "Ready!"
        initialization_progress = 100
    except Exception as e:
        logger.error(f"Error initializing ChromaDB: {e}")
        collection = None
        initialization_status = "Database error"
        initialization_progress = 100
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/search', methods=['POST'])
def search():
    """Find indexed property images similar to an uploaded image.

    Expects a multipart upload under the ``file`` field. Responds with:
      * 400 when no file was sent or the image is judged non-property,
      * 503 while the index/model is not ready,
      * 500 when the search itself fails,
      * 200 with ``results`` (or a "no similar images" message) otherwise.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    # Check if system is ready
    if collection is None or clip_model is None or clip_processor is None:
        return jsonify({
            "error": "System initializing",
            "message": "The visual search system is still initializing. Please try again in a few moments.",
            "status": initialization_status,
            "can_retry": True
        }), 503  # Service Unavailable

    filepath = None
    try:
        filename = secure_filename(file.filename)
        # Each upload gets its own temp file: reusing the client's file name
        # lets two different uploads share a path, and embeddings are cached
        # per path.
        suffix = os.path.splitext(filename)[1]
        fd, filepath = tempfile.mkstemp(suffix=suffix, dir=app.config['UPLOAD_FOLDER'])
        os.close(fd)
        file.save(filepath)

        # Check if the uploaded image is property-related
        try:
            is_property, confidence, predicted_label = is_property_related_image(filepath, threshold=0.4)
            logger.info(f"Uploaded image '{filename}' is property-related: {is_property} (Confidence: {confidence:.2f}, Predicted Label: {predicted_label})")

            if not is_property:
                return jsonify({
                    "error": "Non-property image detected",
                    "message": f"The uploaded image appears to be '{predicted_label}' with low confidence ({confidence:.2f}). This doesn't seem to be a real estate property image.",
                    "details": {
                        "predicted_label": predicted_label,
                        "confidence": f"{confidence:.2f}",
                        "threshold": "0.4",
                        "suggestion": "Please upload an image of a property (bathroom, bedroom, kitchen, living room, house facade, etc.)"
                    }
                }), 400
        except Exception as e:
            logger.error(f"Error during property classification: {e}")
            # If property classification fails, proceed with search anyway
            is_property, confidence, predicted_label = True, 0.5, "Classification failed"

        search_results = search_similar_images(filepath, collection, clip_model, clip_processor, n_results=30)

        if not (search_results and search_results['ids'] and search_results['ids'][0]):
            return jsonify({"message": "No similar images found."})

        # One pass over the image list instead of one scan per result.
        ids_with_file = {
            str(img_info['id']) for img_info in list(downloaded_images)
            if img_info.get('filepath')
        }

        results = []
        matches = zip(
            search_results['ids'][0],
            search_results['distances'][0],
            search_results['metadatas'][0],
        )
        for image_id, distance, metadata in matches:
            # NOTE(review): similarity = 1 - distance holds only when the
            # collection uses cosine distance -- confirm its configured space.
            similarity_score = max(0, (1 - distance) * 100)
            results.append({
                'image_id': image_id,
                'property_id': metadata['property_id'],
                'distance': f"{distance:.4f}",
                'similarity_score': f"{similarity_score:.1f}%",
                'image_path': f"/property_image/{image_id}" if str(image_id) in ids_with_file else None
            })

        return jsonify({
            "results": results,
            "property_check": {
                "is_property": True,
                "confidence": f"{confidence:.2f}",
                "predicted_label": predicted_label
            }
        })
    except Exception as e:
        logger.error(f"Search failed: {e}")
        return jsonify({
            "error": "Search failed",
            "message": str(e),
            "can_retry": True
        }), 500
    finally:
        if filepath is not None:
            # The upload is single-use: drop its cached embedding and the
            # file so neither memory nor /tmp grows with every request.
            with cache_lock:
                embedding_cache.pop(filepath, None)
            try:
                os.remove(filepath)
            except OSError:
                pass
time.time(), "app_running": True, "uptime_seconds": round(time.time() - initialization_start_time, 1) }) @app.route('/test') def test(): """Test endpoint to verify app is responding""" return jsonify({ "message": "App is working! 🚀", "timestamp": time.time(), "status": "operational" }) @app.route('/property_image/') def serve_property_image(image_id): """Serve property images from the property_images directory""" try: # Find the image file for this image_id image_filepath = None for img_info in downloaded_images: if str(img_info['id']) == str(image_id): image_filepath = img_info['filepath'] break if image_filepath and os.path.exists(image_filepath): return send_file(image_filepath, mimetype='image/jpeg') else: return jsonify({"error": "Image not found"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/status') def status(): """Check if the visual search system is ready - app always responds""" try: total_images = collection.count() if collection else 0 # Calculate progress and timing elapsed_time = time.time() - initialization_start_time progress_percentage = min(100, initialization_progress) # Determine if background loading is happening background_status = "idle" if total_images > 0 and total_images < 1000: # If we have some images but not full dataset background_status = "loading" return jsonify({ "app_status": "running", # App is always running "model_loaded": clip_model is not None and clip_processor is not None, "property_classifier_loaded": property_classifier is not None and property_processor is not None, "collection_ready": collection is not None, "total_images": total_images, "background_loading": background_status, "initialization_status": initialization_status, "initialization_progress": progress_percentage, "elapsed_time_seconds": round(elapsed_time, 1), "can_search": collection is not None and clip_model is not None and clip_processor is not None, "estimated_time_remaining": "calculating..." 
if progress_percentage < 100 else "complete" }) except Exception as e: # Even if there's an error, return a response return jsonify({ "app_status": "running", "error": str(e), "can_search": False, "initialization_status": initialization_status }) def load_additional_properties_background(): """Load additional properties in the background with parallel processing""" global downloaded_images, collection, client try: logger.info("🔄 Loading additional properties in background with parallel processing...") # Fetch more image data with retry api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list" image_data = fetch_image_data(api_url) if image_data: # Download additional properties with aggressive parallel processing try: # Try async download first loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) additional_images = loop.run_until_complete(download_images_async(image_data, num_properties=600)) loop.close() # Check if async download was successful if not additional_images: logger.warning("Async download returned no images, falling back to threaded download") additional_images = download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS) else: logger.info(f"Async download successful: {len(additional_images)} images downloaded") except Exception as e: logger.warning(f"Async download failed, falling back to threaded download: {e}") additional_images = download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS) # Filter out duplicates existing_ids = {img['id'] for img in downloaded_images} new_images = [img for img in additional_images if img['id'] not in existing_ids] if new_images: downloaded_images.extend(new_images) logger.info(f"✅ Added {len(new_images)} additional images in background") # Generate embeddings for new images with parallel processing if clip_model is not None and clip_processor is not None: logger.info(f"Generating embeddings for {len(new_images)} new 
images...") new_embeddings = generate_embeddings_parallel(new_images, clip_model, clip_processor) # Add new embeddings to ChromaDB if new_embeddings and collection: batch_data = [] for img in new_images: if img['id'] in new_embeddings: batch_data.append({ 'id': str(img['id']), 'embedding': new_embeddings[img['id']].tolist(), 'metadata': {"property_id": img['propertyId']} }) if batch_data: collection.add( embeddings=[item['embedding'] for item in batch_data], ids=[item['id'] for item in batch_data], metadatas=[item['metadata'] for item in batch_data] ) logger.info(f"✅ Added {len(batch_data)} new embeddings to ChromaDB") logger.info(f"Total items in collection: {collection.count()}") except Exception as e: logger.error(f"⚠️ Background property loading failed: {e}") def automated_daily_refresh(): """Automated 24-hour refresh that runs in background without affecting app performance""" global downloaded_images, collection, client # Initialize background_refresh_running if not defined if 'background_refresh_running' not in globals(): global background_refresh_running background_refresh_running = False if background_refresh_running: logger.info("🔄 Background refresh already running, skipping...") return background_refresh_running = True logger.info("🔄 Starting automated 24-hour background refresh...") try: # Step 1: Fetch fresh data from API logger.info("📡 Fetching fresh property data from API...") api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list" image_data = fetch_image_data(api_url) if not image_data: logger.warning("❌ No fresh data available for refresh") background_refresh_running = False return # Step 2: Download new images with parallel processing logger.info("⬇️ Downloading new images with parallel processing...") try: # Try async download first for maximum performance loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) fresh_images = loop.run_until_complete(download_images_async(image_data, num_properties=600)) loop.close() if not 
fresh_images: logger.warning("Async download failed, falling back to threaded download") fresh_images = download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS) except Exception as e: logger.warning(f"Async download failed, falling back to threaded download: {e}") fresh_images = download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS) if not fresh_images: logger.error("❌ Failed to download any images during refresh") background_refresh_running = False return # Step 3: Identify new images (not already in database) existing_ids = {img['id'] for img in downloaded_images} new_images = [img for img in fresh_images if img['id'] not in existing_ids] if not new_images: logger.info("✅ No new images found during refresh") background_refresh_running = False return logger.info(f"🆕 Found {len(new_images)} new images to process") # Step 4: Generate embeddings for new images if clip_model is not None and clip_processor is not None: logger.info(f"🧠 Generating embeddings for {len(new_images)} new images...") new_embeddings = generate_embeddings_parallel(new_images, clip_model, clip_processor) if not new_embeddings: logger.warning("❌ Failed to generate embeddings for new images") background_refresh_running = False return # Step 5: Update database with new embeddings if collection: logger.info("💾 Updating database with new embeddings...") batch_data = [] for img in new_images: if img['id'] in new_embeddings: batch_data.append({ 'id': str(img['id']), 'embedding': new_embeddings[img['id']].tolist(), 'metadata': {"property_id": img['propertyId']} }) if batch_data: # Add new embeddings to ChromaDB collection.add( embeddings=[item['embedding'] for item in batch_data], ids=[item['id'] for item in batch_data], metadatas=[item['metadata'] for item in batch_data] ) # Update global downloaded_images list downloaded_images.extend(new_images) logger.info(f"✅ Successfully added {len(batch_data)} new embeddings to database") 
logger.info(f"📊 Total items in collection: {collection.count()}") logger.info(f"📊 Total images in memory: {len(downloaded_images)}") else: logger.warning("❌ No valid embeddings to add to database") else: logger.error("❌ Database collection not available") else: logger.error("❌ CLIP model not available for embedding generation") logger.info("✅ Automated 24-hour refresh completed successfully!") except Exception as e: logger.error(f"❌ Automated refresh failed: {e}") finally: background_refresh_running = False def start_automated_refresh_scheduler(): """Start the automated 24-hour refresh scheduler""" logger.info("⏰ Setting up automated 24-hour refresh scheduler...") # Schedule daily refresh at 2:00 AM (when traffic is low) schedule.every().day.at("02:00").do(automated_daily_refresh) # Also schedule a refresh every 24 hours from now schedule.every(24).hours.do(automated_daily_refresh) logger.info("✅ Automated refresh scheduler started - will refresh every 24 hours at 2:00 AM") # Run the scheduler in a separate thread def run_scheduler(): while True: schedule.run_pending() time.sleep(60) # Check every minute scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) scheduler_thread.start() logger.info("🔄 Background scheduler thread started") if __name__ == '__main__': # Start Flask app immediately without blocking port = int(os.environ.get('PORT', 7860)) # Initialize visual search in background thread def background_init(): initialize_visual_search() init_thread = threading.Thread(target=background_init) init_thread.daemon = True init_thread.start() # Load additional properties in background after 30 seconds def start_background_loading(): time.sleep(30) # Wait for initial startup load_additional_properties_background() background_thread = threading.Thread(target=start_background_loading) background_thread.daemon = True background_thread.start() # Start automated 24-hour refresh scheduler def start_automated_scheduler(): time.sleep(60) # Wait 1 minute for 
initial setup start_automated_refresh_scheduler() scheduler_startup_thread = threading.Thread(target=start_automated_scheduler) scheduler_startup_thread.daemon = True scheduler_startup_thread.start() # Run Flask app immediately app.run(host='0.0.0.0', port=port, debug=False, threaded=True)