|
import requests |
|
from PIL import Image |
|
import io |
|
import os |
|
import torch |
|
import torch.nn.functional as F |
|
from transformers import CLIPProcessor, CLIPModel, AutoImageProcessor, AutoModelForImageClassification |
|
import numpy as np |
|
import chromadb |
|
from flask import Flask, request, jsonify, render_template, send_file |
|
from werkzeug.utils import secure_filename |
|
import threading |
|
import time |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
import multiprocessing |
|
from functools import partial |
|
import tempfile |
|
import shutil |
|
import warnings |
|
from pathlib import Path |
|
import asyncio |
|
import aiohttp |
|
import aiofiles |
|
from typing import List, Dict, Any, Optional |
|
import logging |
|
import schedule |
|
|
|
|
|
# --- Warning filters ---------------------------------------------------------
# Hide FutureWarning / UserWarning chatter emitted by third-party libraries.
for _ignored_category in (FutureWarning, UserWarning):
    warnings.filterwarnings("ignore", category=_ignored_category)

# --- Hugging Face cache location ---------------------------------------------
# NOTE(review): these variables are set after `transformers` has already been
# imported at the top of the file; presumably the explicit `cache_dir=`
# arguments passed when loading models are what actually take effect -- confirm.
os.environ.update({
    'HF_HOME': '/tmp/huggingface_cache',
    'XDG_CACHE_HOME': '/tmp/huggingface_cache',
})
os.makedirs('/tmp/huggingface_cache', exist_ok=True)

# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Flask application -------------------------------------------------------
app = Flask(__name__, template_folder='templates')
app.config.update(
    UPLOAD_FOLDER='/tmp/uploads',
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16 MiB upload limit
)
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# --- Shared runtime state (filled in by initialize_visual_search) ------------
clip_model = clip_processor = None
property_classifier = property_processor = None
collection = client = None
downloaded_images = []
initialization_status = "Initializing..."

# --- Parallelism / batching knobs --------------------------------------------
MAX_DOWNLOAD_WORKERS = 32
MAX_EMBEDDING_WORKERS = 24
MAX_PROCESS_WORKERS = min(16, multiprocessing.cpu_count())
BATCH_SIZE = 100
CHUNK_SIZE = 50

# --- Embedding cache: image path -> embedding, guarded by cache_lock ---------
embedding_cache = {}
cache_lock = threading.Lock()

# --- Initialization progress reporting ---------------------------------------
initialization_progress = 0
initialization_start_time = time.time()
|
|
|
|
|
def fetch_image_data(api_url, max_retries=5, timeout=None, retry_delay=5):
    """Fetch the image listing from the API, retrying on failure.

    Args:
        api_url: URL of the endpoint that returns the JSON image listing.
        max_retries: Total number of attempts before giving up.
        timeout: Forwarded to ``requests.get``. The default ``None`` keeps
            the original behaviour of waiting without a time limit.
        retry_delay: Base back-off in seconds; after the k-th failed attempt
            (1-based) the function sleeps ``retry_delay * k`` seconds.

    Returns:
        The decoded JSON payload, or ``[]`` when every attempt failed.
        Errors are logged and never raised to the caller.
    """
    for attempt in range(1, max_retries + 1):
        is_last_attempt = attempt == max_retries
        try:
            response = requests.get(api_url, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            logger.info(f"Successfully fetched {len(data)} images from API")
            return data
        except requests.exceptions.Timeout:
            # Must precede RequestException: Timeout is one of its subclasses.
            logger.warning(f"Timeout fetching data (attempt {attempt})")
            if is_last_attempt:
                logger.error(f"Failed to fetch image data after {max_retries} attempts due to timeouts")
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request error (attempt {attempt}): {e}")
            if is_last_attempt:
                logger.error(f"Failed to fetch image data after {max_retries} attempts")
        except Exception as e:
            logger.error(f"Unexpected error fetching data (attempt {attempt}): {e}")

        # Linear back-off between attempts; no pointless sleep after the last one.
        if not is_last_attempt:
            time.sleep(retry_delay * attempt)

    return []
|
|
|
async def download_single_image_async(session, item):
    """Download one image with aiohttp and store it under /tmp/property_images.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the GET request.
        item: Mapping describing the image. The URL is the first non-empty
            value among ``cloudinaryUrl``, ``imageUrl``, ``image_url`` and
            ``url``; ``id`` names the file on disk and ``propertyId`` is
            passed through to the result.

    Returns:
        A dict with ``id``, ``propertyId``, ``filepath`` and ``imageUrl`` on
        success (or when the file already exists on disk), otherwise ``None``.
        Download errors are logged, not raised.
    """
    import uuid

    # Bound before the try so every except handler can safely log it.
    image_url = None
    try:
        image_url = (
            item.get('cloudinaryUrl')
            or item.get('imageUrl')
            or item.get('image_url')
            or item.get('url')
        )
        image_id = item.get('id')
        property_id = item.get('propertyId')

        if not image_url or not image_id:
            return None

        temp_dir = Path('/tmp/property_images')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Extension is whatever follows the last dot, minus any query string;
        # anything unexpected is stored as .jpg.
        file_extension = image_url.split('.')[-1].split('?')[0]
        if file_extension not in ['jpg', 'jpeg', 'png', 'webp']:
            file_extension = 'jpg'
        filename = f"{image_id}.{file_extension}"
        filepath = temp_dir / filename

        record = {
            'id': image_id,
            'propertyId': property_id,
            'filepath': str(filepath),
            'imageUrl': image_url
        }

        # Already downloaded on an earlier run: reuse the file.
        if filepath.exists():
            return record

        async with session.get(image_url) as response:
            if response.status != 200:
                logger.warning(f"Failed to download {image_url}: HTTP {response.status}")
                return None
            content = await response.read()

        # Write to a uniquely named sibling and rename into place, so an
        # interrupted write can never leave a truncated file that the
        # exists() check above would later accept as a finished download.
        partial_path = temp_dir / f"{filename}.{uuid.uuid4().hex}.part"
        try:
            async with aiofiles.open(partial_path, 'wb') as f:
                await f.write(content)
            os.replace(partial_path, filepath)
        finally:
            # Only true when the write/rename failed or was cancelled.
            if partial_path.exists():
                partial_path.unlink()

        logger.debug(f"Successfully downloaded {image_url} -> {filepath}")
        return record

    except asyncio.TimeoutError:
        logger.warning(f"Timeout downloading {image_url}")
        return None
    except aiohttp.ClientError as e:
        logger.warning(f"Client error downloading {image_url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error downloading {image_url}: {e}")
        return None
|
|
|
def download_single_image(item):
    """Download one image with ``requests`` (thread-pool fallback path).

    Args:
        item: Mapping describing the image. The URL is the first non-empty
            value among ``cloudinaryUrl``, ``imageUrl``, ``image_url`` and
            ``url``; ``id`` names the file on disk and ``propertyId`` is
            passed through to the result.

    Returns:
        A dict with ``id``, ``propertyId``, ``filepath`` and ``imageUrl`` on
        success (or when the file already exists on disk), otherwise ``None``
        after three failed attempts. Errors are logged, not raised.
    """
    import uuid

    # Bound before the try so the outer except handler can safely log it.
    image_url = None
    try:
        image_url = (
            item.get('cloudinaryUrl')
            or item.get('imageUrl')
            or item.get('image_url')
            or item.get('url')
        )
        image_id = item.get('id')
        property_id = item.get('propertyId')

        if not image_url or not image_id:
            return None

        temp_dir = Path('/tmp/property_images')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Extension is whatever follows the last dot, minus any query string;
        # anything unexpected is stored as .jpg.
        file_extension = image_url.split('.')[-1].split('?')[0]
        if file_extension not in ['jpg', 'jpeg', 'png', 'webp']:
            file_extension = 'jpg'
        filename = f"{image_id}.{file_extension}"
        filepath = temp_dir / filename

        record = {
            'id': image_id,
            'propertyId': property_id,
            'filepath': str(filepath),
            'imageUrl': image_url
        }

        # Already downloaded on an earlier run: reuse the file.
        if filepath.exists():
            return record

        max_attempts = 3
        for attempt in range(max_attempts):
            # Stream into a uniquely named sibling and rename into place, so a
            # connection dropped mid-body cannot leave a truncated file that
            # the exists() check above would later accept as complete.
            partial_path = temp_dir / f"{filename}.{uuid.uuid4().hex}.part"
            try:
                # The context manager releases the streamed connection.
                with requests.get(image_url, stream=True, timeout=None) as response:
                    response.raise_for_status()
                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial_path, filepath)
                return record
            except requests.exceptions.Timeout:
                logger.warning(f"Timeout downloading {image_url} (attempt {attempt + 1})")
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request error downloading {image_url} (attempt {attempt + 1}): {e}")
            except Exception as e:
                logger.error(f"Unexpected error downloading {image_url} (attempt {attempt + 1}): {e}")
            finally:
                # Only true when this attempt failed before the rename.
                if partial_path.exists():
                    partial_path.unlink()

            if attempt == max_attempts - 1:
                return None
            time.sleep(2)

        return None

    except Exception as e:
        logger.error(f"Error processing image from {image_url}: {e}")
        return None
|
|
|
|
|
def download_and_process_images(image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS):
    """Download the images of the first ``num_properties`` properties using threads.

    Args:
        image_data: Iterable of image dicts; entries whose ``propertyId`` is
            ``None`` are ignored.
        num_properties: Maximum number of distinct properties (in order of
            first appearance) whose images are downloaded.
        max_workers: Size of the thread pool.

    Returns:
        List of the dicts returned by ``download_single_image`` for every
        image that was downloaded (or was already on disk).
    """
    temp_dir = Path('/tmp/property_images')
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Group images by property, preserving first-seen order of the properties.
    images_by_property = {}
    for item in image_data:
        property_id = item.get('propertyId')
        if property_id is not None:
            images_by_property.setdefault(property_id, []).append(item)

    selected_properties = list(images_by_property.items())[:num_properties]
    all_images_to_process = [
        image for _, images in selected_properties for image in images
    ]
    total = len(all_images_to_process)

    logger.info(f"Starting optimized parallel download of {total} images using {max_workers} workers...")

    downloaded_images = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_single_image, item) for item in all_images_to_process]

        for future in as_completed(futures):
            # download_single_image reports failures by returning None.
            result = future.result()
            if result is None:
                continue
            downloaded_images.append(result)
            completed_count = len(downloaded_images)
            if completed_count % 50 == 0:
                logger.info(f"Downloaded {completed_count}/{total} images ({completed_count/total*100:.1f}%)")

    # The garbled emoji that split this literal across two lines was removed.
    logger.info(
        f"Finished downloading. Successfully processed images from {len(selected_properties)} properties. "
        f"Total images downloaded: {len(downloaded_images)}"
    )
    return downloaded_images
|
|
|
async def download_images_async(image_data, num_properties=600):
    """Download the images of the first ``num_properties`` properties with aiohttp.

    Args:
        image_data: Iterable of image dicts; entries whose ``propertyId`` is
            ``None`` are ignored.
        num_properties: Maximum number of distinct properties (in order of
            first appearance) whose images are downloaded.

    Returns:
        List of the dicts returned by ``download_single_image_async`` for
        every image that was downloaded (or was already on disk). An empty
        list is returned when there is nothing to download.
    """
    temp_dir = Path('/tmp/property_images')
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Group images by property, preserving first-seen order of the properties.
    images_by_property = {}
    for item in image_data:
        property_id = item.get('propertyId')
        if property_id is not None:
            images_by_property.setdefault(property_id, []).append(item)

    selected_properties = list(images_by_property.items())[:num_properties]
    all_images_to_process = [
        image for _, images in selected_properties for image in images
    ]
    total = len(all_images_to_process)

    logger.info(f"Starting ultra-fast async download of {total} images...")

    # Nothing to do: avoids Semaphore(0), a connector with limit=0 and the
    # division by zero in the completion percentage below.
    if total == 0:
        logger.info("No images to download")
        return []

    max_concurrent = min(64, total)
    semaphore = asyncio.Semaphore(max_concurrent)
    logger.info(f"Using {max_concurrent} concurrent downloads")

    # NOTE(review): ssl=False turns off TLS certificate verification for all
    # downloads; kept as in the original -- confirm this is intentional.
    connector = aiohttp.TCPConnector(
        limit=max_concurrent,
        limit_per_host=50,
        ttl_dns_cache=600,
        use_dns_cache=True,
        keepalive_timeout=60,
        enable_cleanup_closed=True,
        force_close=False,
        ssl=False
    )

    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=10,
        sock_read=20
    )

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }

    downloaded_images = []
    async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=headers) as session:

        async def download_with_semaphore(item):
            # The semaphore caps the number of downloads in flight.
            async with semaphore:
                return await download_single_image_async(session, item)

        try:
            tasks = [download_with_semaphore(item) for item in all_images_to_process]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for result in results:
                if isinstance(result, dict):
                    downloaded_images.append(result)
                elif isinstance(result, Exception):
                    logger.debug(f"Download failed: {result}")

            completed = len(downloaded_images)
            logger.info(f"Downloaded {completed}/{total} images ({completed/total*100:.1f}%)")
        except Exception as e:
            logger.error(f"Error in async download: {e}")

    logger.info(
        f"Ultra-fast async download complete: {len(downloaded_images)} images "
        f"from {len(selected_properties)} properties"
    )
    return downloaded_images
|
|
|
|
|
def get_image_embedding_clip(image_path, clip_model, clip_processor):
    """Return the CLIP image embedding for ``image_path`` as a flat numpy array.

    Results are memoised in the module-level ``embedding_cache``. The cache is
    keyed by ``image_path`` only, so a path whose file content changes keeps
    returning the first embedding until the entry is removed.

    Args:
        image_path: Path of the image file to embed.
        clip_model: Loaded CLIP model exposing ``get_image_features``.
        clip_processor: Matching CLIP processor.

    Returns:
        A 1-D numpy array, or ``None`` when the model/processor is missing or
        the image could not be processed (the error is logged).
    """
    with cache_lock:
        if image_path in embedding_cache:
            return embedding_cache[image_path]

    if clip_model is None or clip_processor is None:
        return None

    try:
        # convert() yields an independent in-memory copy, so the file handle
        # can be released immediately.
        with Image.open(image_path) as source_image:
            image = source_image.convert('RGB')

        # Shrink very large images before handing them to the processor.
        max_size = 512
        longest_side = max(image.size)
        if longest_side > max_size:
            ratio = max_size / longest_side
            # max(1, ...) keeps extreme aspect ratios from producing a 0-pixel side.
            new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
            image = image.resize(new_size, Image.Resampling.LANCZOS)

        inputs = clip_processor(images=image, return_tensors="pt", padding=True)

        # The model may have been moved to the GPU after loading; the inputs
        # must live on the same device or the forward pass raises.
        device = getattr(clip_model, 'device', None)
        if device is not None:
            inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        with torch.no_grad():
            image_features = clip_model.get_image_features(**inputs)
        # .cpu() is required before .numpy() when the tensor is on the GPU.
        embedding = image_features.cpu().numpy().flatten()

        with cache_lock:
            embedding_cache[image_path] = embedding

        return embedding
    except Exception as e:
        logger.error(f"Error processing image {image_path}: {e}")
        return None
|
|
|
def process_single_embedding(image_info, clip_model, clip_processor):
    """Embed one downloaded image.

    Args:
        image_info: Dict with at least ``filepath`` and ``id`` keys.
        clip_model: CLIP model forwarded to ``get_image_embedding_clip``.
        clip_processor: CLIP processor forwarded to ``get_image_embedding_clip``.

    Returns:
        ``(image_id, embedding)`` when an embedding was produced, else ``None``.
    """
    path, identifier = image_info['filepath'], image_info['id']
    vector = get_image_embedding_clip(path, clip_model, clip_processor)
    return None if vector is None else (identifier, vector)
|
|
|
def process_embeddings_batch(batch, clip_model, clip_processor):
    """Embed every image of ``batch``, one after another.

    Args:
        batch: Iterable of image-info dicts accepted by
            ``process_single_embedding``.
        clip_model: CLIP model forwarded to each call.
        clip_processor: CLIP processor forwarded to each call.

    Returns:
        List of ``(image_id, embedding)`` tuples in input order; images that
        could not be embedded are left out.
    """
    outcomes = (
        process_single_embedding(entry, clip_model, clip_processor)
        for entry in batch
    )
    return [pair for pair in outcomes if pair is not None]
|
|
|
def generate_embeddings_parallel(downloaded_images, clip_model, clip_processor):
    """Compute CLIP embeddings for many images using a thread pool.

    Args:
        downloaded_images: List of dicts with ``filepath`` and ``id`` keys.
        clip_model: Loaded CLIP model; moved to the GPU when CUDA is available.
        clip_processor: Matching CLIP processor.

    Returns:
        Dict mapping image id to a flat numpy embedding. Images that fail are
        skipped; an empty dict is returned when there is nothing to process.
    """
    if not downloaded_images or clip_model is None or clip_processor is None:
        logger.warning("No images or CLIP model available for embedding generation")
        return {}

    logger.info("Starting ultra-fast parallel embedding generation...")

    max_workers = min(32, multiprocessing.cpu_count() * 4)
    # The original message also advertised a batch size that was never applied.
    logger.info(f"Using {max_workers} workers")

    image_embeddings = {}
    completed_count = 0
    total_images = len(downloaded_images)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    if device == 'cuda':
        clip_model = clip_model.to(device)
        clip_model.eval()
        logger.info("Model loaded on GPU for maximum speed")

    def process_single_image_fast(image_info):
        """Embed one image; returns (image_id, embedding) or None on failure."""
        # Bound up-front so the except handler can always log it.
        filepath = image_info.get('filepath')
        try:
            image_id = image_info['id']

            with cache_lock:
                if filepath in embedding_cache:
                    return image_id, embedding_cache[filepath]

            # convert() yields an in-memory copy, so the file can be closed.
            with Image.open(filepath) as source_image:
                image = source_image.convert('RGB')

            # NOTE(review): index images are pre-shrunk to 224 px here while
            # get_image_embedding_clip uses 512 px for queries -- confirm the
            # mismatch is acceptable.
            max_size = 224
            longest_side = max(image.size)
            if longest_side > max_size:
                ratio = max_size / longest_side
                # max(1, ...) keeps extreme aspect ratios from yielding a 0-pixel side.
                new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
                image = image.resize(new_size, Image.Resampling.LANCZOS)

            inputs = clip_processor(images=image, return_tensors="pt", padding=True)

            if device == 'cuda':
                inputs = {k: v.to(device) for k, v in inputs.items()}

            with torch.no_grad():
                image_features = clip_model.get_image_features(**inputs)
            embedding = image_features.cpu().numpy().flatten()

            with cache_lock:
                embedding_cache[filepath] = embedding

            return image_id, embedding

        except Exception as e:
            logger.debug(f"Error processing {filepath}: {e}")
            return None

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_image_fast, img) for img in downloaded_images]

        for future in as_completed(futures):
            try:
                # as_completed only yields finished futures, so no timeout is needed.
                result = future.result()
            except Exception as e:
                logger.warning(f"Error processing image: {e}")
                continue

            if result is None:
                continue

            image_id, embedding = result
            image_embeddings[image_id] = embedding
            completed_count += 1

            if completed_count % 25 == 0:
                progress = (completed_count / total_images) * 100
                logger.info(f"Generated embeddings: {completed_count}/{total_images} ({progress:.1f}%)")

    logger.info(f"Ultra-fast embedding generation complete: {len(image_embeddings)} images processed")
    return image_embeddings
|
|
|
|
|
def search_similar_images(query_image_path, collection, clip_model, clip_processor, n_results=30):
    """Look up the stored images closest to a query image.

    Args:
        query_image_path: Path of the image to search with.
        collection: ChromaDB collection holding the image embeddings.
        clip_model: Loaded CLIP model.
        clip_processor: Matching CLIP processor.
        n_results: Number of neighbours requested from the collection.

    Returns:
        The raw result of ``collection.query`` (metadatas and distances
        included), or ``None`` when the model is missing, the query image
        cannot be embedded, or the query itself fails.
    """
    models_ready = clip_model is not None and clip_processor is not None
    if not models_ready:
        print("CLIP model is not loaded. Cannot perform search.")
        return None

    query_vector = get_image_embedding_clip(query_image_path, clip_model, clip_processor)
    if query_vector is None:
        print(f"Could not generate embedding for query image: {query_image_path}")
        return None

    try:
        query_kwargs = {
            'query_embeddings': [query_vector.tolist()],
            'n_results': n_results,
            'include': ['metadatas', 'distances'],
            'where': None,
        }
        matches = collection.query(**query_kwargs)
    except Exception as e:
        print(f"Error during ChromaDB search: {e}")
        return None
    else:
        return matches
|
|
|
|
|
def is_property_related_image(image_path, threshold=0.4):
    """Classify an image and decide whether it looks like a real-estate photo.

    Uses the module-level ``property_classifier`` / ``property_processor``.
    The check is deliberately lenient: when the classifier is unavailable or
    raises, the image is accepted so that search can proceed.

    Args:
        image_path: Path of the image to classify.
        threshold: Minimum top-class probability for the image to count as
            property-related.

    Returns:
        Tuple ``(is_property, confidence, label)``. ``(True, 0.5, <reason>)``
        is returned when the classifier is missing or fails.
    """
    try:
        if property_classifier is None or property_processor is None:
            print("Property classifier not loaded, proceeding with search...")
            return True, 0.5, "Classifier unavailable"

        # convert() yields an in-memory copy, so the file can be closed.
        with Image.open(image_path) as source_image:
            image = source_image.convert('RGB')

        max_size = 224
        if max(image.size) > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        inputs = property_processor(images=image, return_tensors="pt")

        # load_property_classifier moves the model to the GPU when one is
        # available; the inputs must be on the same device, otherwise every
        # call ends in the except branch below and the check is bypassed.
        device = getattr(property_classifier, 'device', None)
        if device is not None:
            inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        with torch.no_grad():
            outputs = property_classifier(**inputs)
            logits = outputs.logits
            # .cpu() is required before .numpy() for GPU tensors.
            probs = torch.softmax(logits, dim=1).cpu().numpy()[0]

        max_prob_idx = int(probs.argmax())
        max_prob = probs[max_prob_idx]

        if hasattr(property_classifier.config, 'id2label'):
            predicted_label = property_classifier.config.id2label[max_prob_idx]
        else:
            predicted_label = f"class_{max_prob_idx}"

        is_property = max_prob > threshold

        # Known real-estate labels are accepted at a lower confidence (0.3).
        known_property_labels = (
            'bathroom', 'bedroom', 'dining room', 'house facade',
            'kitchen', 'living room', 'sao paulo apartment facade',
        )
        if max_prob > 0.3 and predicted_label.lower() in known_property_labels:
            is_property = True

        print(f"Property classification: {predicted_label} (confidence: {max_prob:.3f}, is_property: {is_property})")

        return is_property, float(max_prob), predicted_label

    except Exception as e:
        print(f"Error in property classification: {e}")
        # Fail open: a classifier problem must not block the search.
        return True, 0.5, f"Error: {str(e)}"
|
|
|
|
|
def load_property_classifier():
    """Load the first available image-classification model into the module globals.

    Candidates are tried in order; the first one whose processor and model
    both load is published to ``property_processor`` / ``property_classifier``.
    The globals are only assigned once both halves have loaded, so they never
    hold a processor from one candidate without its classifier.

    Returns:
        ``True`` when a model was loaded, ``False`` otherwise. Failures are
        printed, not raised.
    """
    global property_classifier, property_processor

    try:
        print("Loading property classification model...")

        model_options = [
            "andupets/real-estate-image-classification",
            "microsoft/resnet-50",
            "google/vit-base-patch16-224"
        ]

        cache_dir = '/tmp/huggingface_cache'
        os.makedirs(cache_dir, exist_ok=True)

        for model_name in model_options:
            try:
                print(f"Trying to load: {model_name}")

                processor = AutoImageProcessor.from_pretrained(
                    model_name,
                    cache_dir=cache_dir,
                    local_files_only=False
                )
                classifier = AutoModelForImageClassification.from_pretrained(
                    model_name,
                    cache_dir=cache_dir,
                    local_files_only=False
                )

                on_gpu = torch.cuda.is_available()
                if on_gpu:
                    classifier = classifier.to('cuda')
                classifier.eval()
            except Exception as e:
                print(f"Failed to load {model_name}: {e}")
                continue

            # Publish both halves together.
            property_processor, property_classifier = processor, classifier
            target = "GPU" if on_gpu else "CPU"
            print(f"Property classifier loaded on {target}: {model_name}")
            return True

        print("Warning: Could not load any property classification model")
        return False

    except Exception as e:
        print(f"Error loading property classifier: {e}")
        return False
|
|
|
|
|
# NOTE: The Flask application, its upload configuration and the module-level
# state (clip_model, clip_processor, property_classifier, property_processor,
# collection, client, downloaded_images, initialization_status) are created
# once near the top of this module.  The block that used to live here repeated
# those statements verbatim: it replaced `app` with a second Flask instance
# and re-assigned the same initial values, so it has been removed.
|
|
|
def _download_property_images(image_data, num_properties):
    """Download images via asyncio, falling back to the thread pool on failure."""
    try:
        # asyncio.run creates a fresh loop and always closes it, even on error.
        images = asyncio.run(download_images_async(image_data, num_properties=num_properties))
    except Exception as e:
        logger.warning(f"Async download failed, falling back to threaded download: {e}")
        return download_and_process_images(
            image_data, num_properties=num_properties, max_workers=MAX_DOWNLOAD_WORKERS
        )

    if not images:
        logger.warning("Async download returned no images, falling back to threaded download")
        return download_and_process_images(
            image_data, num_properties=num_properties, max_workers=MAX_DOWNLOAD_WORKERS
        )

    logger.info(f"Async download successful: {len(images)} images downloaded")
    return images


def _insert_embeddings(target_collection, images, image_embeddings, batch_size=1000):
    """Add every image that has an embedding to the collection; return the count."""
    # Select the insertable images first, so batching runs over exactly those
    # rows (each id once) instead of over a prefix of `images`.
    insertable = []
    seen_ids = set()
    for image_info in images:
        image_id = image_info['id']
        key = str(image_id)
        if image_id in image_embeddings and key not in seen_ids:
            seen_ids.add(key)
            insertable.append(image_info)

    for start in range(0, len(insertable), batch_size):
        chunk = insertable[start:start + batch_size]
        target_collection.add(
            embeddings=[image_embeddings[info['id']].tolist() for info in chunk],
            ids=[str(info['id']) for info in chunk],
            metadatas=[{"property_id": info['propertyId']} for info in chunk]
        )
        logger.info(f"Added batch {start // batch_size + 1}: {len(chunk)} embeddings")

    return len(insertable)


def initialize_visual_search():
    """Build the visual search index and publish it through the module globals.

    Steps: fetch the image listing, download images for up to 300 properties,
    load the property classifier and the CLIP model, embed the images and
    store the embeddings in a ChromaDB collection. Progress is reported via
    ``initialization_status`` / ``initialization_progress``; every terminal
    state (success or failure) sets the progress to 100.
    """
    global clip_model, clip_processor, property_classifier, property_processor, collection, client, downloaded_images, initialization_status, initialization_progress

    initialization_status = "Fetching image data..."
    initialization_progress = 10
    logger.info("Initializing visual search system with ultra-fast parallel processing...")

    api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list"
    collection_name = "property_image_embeddings"

    # --- 1. Image listing ----------------------------------------------------
    logger.info("Fetching image data from API...")
    image_data = fetch_image_data(api_url)
    initialization_progress = 20

    if not image_data:
        # The original message claimed sample data would be used; none is.
        logger.warning("No image data fetched. Visual search will not be available.")
        initialization_status = "No image data available"
        initialization_progress = 100
        return

    # --- 2. Image download ---------------------------------------------------
    initialization_status = "Downloading property images..."
    initialization_progress = 30
    logger.info("Downloading and processing images with ultra-fast parallel processing...")
    downloaded_images = _download_property_images(image_data, num_properties=300)
    initialization_progress = 50

    # --- 3. Property classifier (optional) -----------------------------------
    initialization_status = "Loading property classifier..."
    initialization_progress = 60
    try:
        logger.info("Loading property classification model...")
        if load_property_classifier():
            logger.info("Property classification model loaded successfully.")
        else:
            logger.warning("Property classification model could not be loaded, will proceed without it.")
    except Exception as e:
        logger.error(f"Error loading property classifier: {e}")

    # --- 4. CLIP model -------------------------------------------------------
    initialization_status = "Loading AI model..."
    initialization_progress = 70
    try:
        logger.info("Loading CLIP model and processor...")

        cache_dir = '/tmp/huggingface_cache'
        clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32", cache_dir=cache_dir)
        clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32", cache_dir=cache_dir)

        if torch.cuda.is_available():
            clip_model = clip_model.to('cuda')
            logger.info("CLIP model loaded on GPU")
        else:
            logger.info("CLIP model loaded on CPU")

        logger.info("CLIP model and processor loaded successfully.")
        initialization_progress = 80
    except Exception as e:
        logger.error(f"Error loading CLIP model: {e}")
        clip_model = None
        clip_processor = None
        initialization_status = "Failed to load AI model"
        initialization_progress = 100
        return

    # --- 5. Embeddings -------------------------------------------------------
    if clip_model is None or clip_processor is None or not downloaded_images:
        logger.warning("Skipping embedding generation as the CLIP model could not be loaded or no images downloaded.")
        initialization_status = "No embeddings generated"
        initialization_progress = 100
        return

    initialization_status = "Generating image embeddings..."
    initialization_progress = 85
    logger.info("Generating embeddings with ultra-fast parallel processing...")
    image_embeddings = generate_embeddings_parallel(downloaded_images, clip_model, clip_processor)

    if not image_embeddings:
        logger.warning("No embeddings generated. Skipping database setup.")
        initialization_status = "No embeddings generated"
        initialization_progress = 100
        return
    initialization_progress = 90

    # --- 6. Vector database --------------------------------------------------
    initialization_status = "Setting up database..."
    initialization_progress = 95
    try:
        client = chromadb.Client(settings=chromadb.config.Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))

        try:
            collection = client.get_collection(name=collection_name)
        except Exception:
            # The "collection does not exist" exception type differs between
            # chromadb releases, hence the broad (but not bare) except.
            # Cosine space keeps distances in a range where the
            # `(1 - distance) * 100` similarity computed by the /search
            # endpoint is meaningful; the default L2 space does not.
            collection = client.create_collection(
                name=collection_name,
                metadata={"hnsw:space": "cosine"}
            )
            logger.info(f"Created new collection '{collection_name}'")
        else:
            logger.info(f"Using existing collection '{collection_name}' with {collection.count()} items")
            initialization_status = "Ready!"
            initialization_progress = 100
            return

        logger.info("Preparing data for ChromaDB insertion...")
        inserted = _insert_embeddings(collection, downloaded_images, image_embeddings)

        logger.info(f"Successfully added {inserted} embeddings to ChromaDB")
        logger.info(f"Total items in ChromaDB collection: {collection.count()}")
        initialization_status = "Ready!"
        initialization_progress = 100
    except Exception as e:
        logger.error(f"Error initializing ChromaDB: {e}")
        collection = None
        initialization_status = "Database error"
        initialization_progress = 100
|
|
|
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/search', methods=['POST'])
def search():
    """Handle an image upload and return the most similar property images.

    Responses:
        400: no ``file`` part, empty filename, or a non-property image.
        503: the search system has not finished initializing.
        500: any unexpected failure while processing the upload.
        200: JSON with ``results`` and ``property_check``, or a message when
            nothing similar was found.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    if collection is None or clip_model is None or clip_processor is None:
        return jsonify({
            "error": "System initializing",
            "message": "The visual search system is still initializing. Please try again in a few moments.",
            "status": initialization_status,
            "can_retry": True
        }), 503

    filepath = None
    try:
        filename = secure_filename(file.filename)

        # Each upload gets its own file name. Saving under the client's name
        # let same-named uploads overwrite each other and hit the path-keyed
        # embedding cache, and failed outright when secure_filename() came
        # back empty.
        suffix = os.path.splitext(filename)[1]
        descriptor, filepath = tempfile.mkstemp(dir=app.config['UPLOAD_FOLDER'], suffix=suffix)
        os.close(descriptor)
        file.save(filepath)

        try:
            is_property, confidence, predicted_label = is_property_related_image(filepath, threshold=0.4)
            print(f"Uploaded image '{filename}' is property-related: {is_property} (Confidence: {confidence:.2f}, Predicted Label: {predicted_label})")

            if not is_property:
                return jsonify({
                    "error": "Non-property image detected",
                    "message": f"The uploaded image appears to be '{predicted_label}' with low confidence ({confidence:.2f}). This doesn't seem to be a real estate property image.",
                    "details": {
                        "predicted_label": predicted_label,
                        "confidence": f"{confidence:.2f}",
                        "threshold": "0.4",
                        "suggestion": "Please upload an image of a property (bathroom, bedroom, kitchen, living room, house facade, etc.)"
                    }
                }), 400
        except Exception as e:
            print(f"Error during property classification: {e}")
            # Fail open: classification problems must not block the search.
            is_property, confidence, predicted_label = True, 0.5, "Classification failed"

        search_results = search_similar_images(filepath, collection, clip_model, clip_processor, n_results=30)

        if not (search_results and search_results['ids'] and search_results['ids'][0]):
            return jsonify({"message": "No similar images found."})

        # One pass over downloaded_images instead of a scan per result.
        paths_by_id = {str(img['id']): img['filepath'] for img in downloaded_images}

        results = []
        rows = zip(
            search_results['ids'][0],
            search_results['distances'][0],
            search_results['metadatas'][0],
        )
        for image_id, distance, metadata in rows:
            similarity_score = max(0, (1 - distance) * 100)
            results.append({
                'image_id': image_id,
                'property_id': metadata['property_id'],
                'distance': f"{distance:.4f}",
                'similarity_score': f"{similarity_score:.1f}%",
                'image_path': f"/property_image/{image_id}" if paths_by_id.get(str(image_id)) else None
            })

        return jsonify({
            "results": results,
            "property_check": {
                "is_property": True,
                "confidence": f"{confidence:.2f}",
                "predicted_label": predicted_label
            }
        })

    except Exception as e:
        return jsonify({
            "error": "Search failed",
            "message": str(e),
            "can_retry": True
        }), 500

    finally:
        # The upload is only needed for this request: drop its cache entry and
        # remove the file so neither accumulates.
        if filepath is not None:
            with cache_lock:
                embedding_cache.pop(filepath, None)
            try:
                os.remove(filepath)
            except OSError as cleanup_error:
                logger.debug(f"Could not remove upload {filepath}: {cleanup_error}")
|
|
|
@app.route('/health')
def health():
    """Liveness probe: reports that the app is up and how long it has run."""
    stamp = time.time()
    uptime = round(time.time() - initialization_start_time, 1)
    payload = {
        "status": "healthy",
        "timestamp": stamp,
        "app_running": True,
        "uptime_seconds": uptime,
    }
    return jsonify(payload)
|
|
|
@app.route('/test')
def test():
    """Smoke-test endpoint confirming that the app answers requests."""
    return jsonify({
        # The trailing emoji of the original message was mis-encoded and could
        # not be recovered, so it was dropped.
        "message": "App is working!",
        "timestamp": time.time(),
        "status": "operational"
    })
|
|
|
@app.route('/property_image/<image_id>')
def serve_property_image(image_id):
    """Serve a downloaded property image by its id.

    Args:
        image_id: Id taken from the URL; compared as a string against the
            ids recorded in ``downloaded_images``.

    Returns:
        The image file, a 404 JSON error when the id is unknown or the file
        is gone, or a 500 JSON error on unexpected failures.
    """
    import mimetypes

    try:
        wanted_id = str(image_id)
        image_filepath = next(
            (img['filepath'] for img in downloaded_images if str(img['id']) == wanted_id),
            None,
        )

        if not image_filepath or not os.path.exists(image_filepath):
            return jsonify({"error": "Image not found"}), 404

        # Files are stored as jpg/jpeg/png/webp; announce the matching type
        # instead of always claiming JPEG. Unknown types fall back to JPEG.
        guessed_type, _ = mimetypes.guess_type(image_filepath)
        return send_file(image_filepath, mimetype=guessed_type or 'image/jpeg')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/status')
def status():
    """Report initialization progress and readiness; always answers with JSON."""
    try:
        total_images = collection.count() if collection else 0

        elapsed_time = time.time() - initialization_start_time
        progress_percentage = min(100, initialization_progress)

        # A partially filled collection is reported as still loading.
        background_status = "loading" if 0 < total_images < 1000 else "idle"

        clip_ready = clip_model is not None and clip_processor is not None
        classifier_ready = property_classifier is not None and property_processor is not None
        has_collection = collection is not None
        remaining = "complete" if progress_percentage >= 100 else "calculating..."

        report = {
            "app_status": "running",
            "model_loaded": clip_ready,
            "property_classifier_loaded": classifier_ready,
            "collection_ready": has_collection,
            "total_images": total_images,
            "background_loading": background_status,
            "initialization_status": initialization_status,
            "initialization_progress": progress_percentage,
            "elapsed_time_seconds": round(elapsed_time, 1),
            "can_search": has_collection and clip_ready,
            "estimated_time_remaining": remaining,
        }
        return jsonify(report)
    except Exception as e:
        # Even when the collection misbehaves, the endpoint keeps responding.
        fallback = {
            "app_status": "running",
            "error": str(e),
            "can_search": False,
            "initialization_status": initialization_status,
        }
        return jsonify(fallback)
|
|
|
def load_additional_properties_background():
    """Download more property images and index them in the background.

    Steps:
      1. Fetch the image listing from the property API.
      2. Download up to 600 properties with ``download_images_async``; if
         that yields nothing or raises, fall back to
         ``download_and_process_images``.
      3. Append images whose id is not yet in ``downloaded_images``.
      4. If the CLIP model/processor and the collection are available,
         embed the new images and add them to the collection.

    Returns nothing.  Every exception is logged and swallowed so a failure
    here never propagates to the caller's thread.
    """
    global downloaded_images, collection, client

    try:
        logger.info("Loading additional properties in background with parallel processing...")

        api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list"
        image_data = fetch_image_data(api_url)
        if not image_data:
            return

        try:
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                additional_images = loop.run_until_complete(
                    download_images_async(image_data, num_properties=600)
                )
            finally:
                # Close the loop even when the download raises, and do not
                # leave a closed loop registered on this thread.
                loop.close()
                asyncio.set_event_loop(None)

            if not additional_images:
                logger.warning("Async download returned no images, falling back to threaded download")
                additional_images = download_and_process_images(
                    image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS
                )
            else:
                logger.info(f"Async download successful: {len(additional_images)} images downloaded")
        except Exception as e:
            logger.warning(f"Async download failed, falling back to threaded download: {e}")
            additional_images = download_and_process_images(
                image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS
            )

        # Keep only images we have not seen before; tolerate a None result
        # from the fallback downloader.
        existing_ids = {img['id'] for img in downloaded_images}
        new_images = [
            img for img in (additional_images or [])
            if img['id'] not in existing_ids
        ]
        if not new_images:
            return

        downloaded_images.extend(new_images)
        logger.info(f"✅ Added {len(new_images)} additional images in background")

        if clip_model is None or clip_processor is None:
            return

        logger.info(f"Generating embeddings for {len(new_images)} new images...")
        new_embeddings = generate_embeddings_parallel(new_images, clip_model, clip_processor)
        if not (new_embeddings and collection):
            return

        # Only images that actually received an embedding are stored.
        embedded = [img for img in new_images if img['id'] in new_embeddings]
        if embedded:
            collection.add(
                embeddings=[new_embeddings[img['id']].tolist() for img in embedded],
                ids=[str(img['id']) for img in embedded],
                metadatas=[{"property_id": img['propertyId']} for img in embedded],
            )
            logger.info(f"✅ Added {len(embedded)} new embeddings to ChromaDB")
            logger.info(f"Total items in collection: {collection.count()}")

    except Exception as e:
        logger.error(f"⚠️ Background property loading failed: {e}")
|
|
|
def automated_daily_refresh():
    """Fetch, embed and index property images that are not yet known.

    Guarded by the module-level ``background_refresh_running`` flag: if a
    refresh is already in progress the call logs and returns immediately.
    The flag is created on first use and is always reset in ``finally``,
    including on the early-return paths.

    The run stops early (with a log message) when the API returns no data,
    no images can be downloaded, nothing is new, the CLIP model is missing,
    embedding fails, the collection is missing, or no new image received an
    embedding.  The "completed successfully" message is logged only after
    embeddings were actually added to the collection.

    Returns nothing; every exception is logged and swallowed.
    """
    global downloaded_images, collection, client, background_refresh_running

    # The flag may not exist yet on the very first call.
    if globals().get('background_refresh_running', False):
        logger.info("Background refresh already running, skipping...")
        return

    background_refresh_running = True
    logger.info("Starting automated 24-hour background refresh...")

    try:
        logger.info("📡 Fetching fresh property data from API...")
        api_url = "https://hivepropapi.azurewebsites.net/api/PropertyImage/list"
        image_data = fetch_image_data(api_url)
        if not image_data:
            logger.warning("❌ No fresh data available for refresh")
            return

        logger.info("⬇️ Downloading new images with parallel processing...")
        try:
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                fresh_images = loop.run_until_complete(
                    download_images_async(image_data, num_properties=600)
                )
            finally:
                # Close the loop even when the download raises, and do not
                # leave a closed loop registered on this thread.
                loop.close()
                asyncio.set_event_loop(None)

            if not fresh_images:
                logger.warning("Async download failed, falling back to threaded download")
                fresh_images = download_and_process_images(
                    image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS
                )
        except Exception as e:
            logger.warning(f"Async download failed, falling back to threaded download: {e}")
            fresh_images = download_and_process_images(
                image_data, num_properties=600, max_workers=MAX_DOWNLOAD_WORKERS
            )

        if not fresh_images:
            logger.error("❌ Failed to download any images during refresh")
            return

        existing_ids = {img['id'] for img in downloaded_images}
        new_images = [img for img in fresh_images if img['id'] not in existing_ids]
        if not new_images:
            logger.info("✅ No new images found during refresh")
            return

        logger.info(f"Found {len(new_images)} new images to process")

        if clip_model is None or clip_processor is None:
            logger.error("❌ CLIP model not available for embedding generation")
            return

        logger.info(f"🧠 Generating embeddings for {len(new_images)} new images...")
        new_embeddings = generate_embeddings_parallel(new_images, clip_model, clip_processor)
        if not new_embeddings:
            logger.warning("❌ Failed to generate embeddings for new images")
            return

        if not collection:
            logger.error("❌ Database collection not available")
            return

        logger.info("💾 Updating database with new embeddings...")
        # Only images that actually received an embedding are stored.
        embedded = [img for img in new_images if img['id'] in new_embeddings]
        if not embedded:
            logger.warning("❌ No valid embeddings to add to database")
            return

        collection.add(
            embeddings=[new_embeddings[img['id']].tolist() for img in embedded],
            ids=[str(img['id']) for img in embedded],
            metadatas=[{"property_id": img['propertyId']} for img in embedded],
        )

        # Record the new images in memory only after the database write
        # succeeded.
        downloaded_images.extend(new_images)

        logger.info(f"✅ Successfully added {len(embedded)} new embeddings to database")
        logger.info(f"Total items in collection: {collection.count()}")
        logger.info(f"Total images in memory: {len(downloaded_images)}")

        logger.info("✅ Automated 24-hour refresh completed successfully!")

    except Exception as e:
        logger.error(f"❌ Automated refresh failed: {e}")
    finally:
        # Runs on every exit path above, so the early returns need no
        # explicit reset.
        background_refresh_running = False
|
|
|
def start_automated_refresh_scheduler():
    """Register the daily refresh job and start the thread that runs it.

    ``automated_daily_refresh`` is scheduled once per day at 02:00.  It is
    registered a single time: an additional ``every(24).hours`` job would
    make the refresh run twice a day, contradicting the logged schedule.

    A daemon thread polls ``schedule.run_pending()`` every 60 seconds.  An
    exception raised by a job is logged and the loop continues, so one
    failing run does not stop all future refreshes.
    """
    logger.info("⏰ Setting up automated 24-hour refresh scheduler...")

    schedule.every().day.at("02:00").do(automated_daily_refresh)

    logger.info("✅ Automated refresh scheduler started - will refresh every 24 hours at 2:00 AM")

    def run_scheduler():
        while True:
            try:
                schedule.run_pending()
            except Exception:
                logger.exception("Scheduled job raised an exception; scheduler keeps running")
            time.sleep(60)

    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    logger.info("Background scheduler thread started")
|
|
|
if __name__ == '__main__':
    # Port comes from the environment, defaulting to 7860.
    port = int(os.environ.get('PORT', 7860))

    def background_init():
        """Run the visual search initialization off the main thread."""
        initialize_visual_search()

    def start_background_loading():
        """Wait 30 seconds, then load additional properties."""
        time.sleep(30)
        load_additional_properties_background()

    def start_automated_scheduler():
        """Wait 60 seconds, then start the refresh scheduler."""
        time.sleep(60)
        start_automated_refresh_scheduler()

    # One daemon thread per startup task, so none of them delays the
    # web server below or blocks interpreter shutdown.
    init_thread, background_thread, scheduler_startup_thread = (
        threading.Thread(target=task, daemon=True)
        for task in (background_init, start_background_loading, start_automated_scheduler)
    )
    for startup_thread in (init_thread, background_thread, scheduler_startup_thread):
        startup_thread.start()

    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)