|
|
|
|
|
from PIL import Image |
|
import torch |
|
from transformers import AutoImageProcessor, AutoModelForImageClassification |
|
from .logging_config import logger |
|
import numpy as np |
|
|
|
|
|
# Module-level classifier state, populated once at import time and read by
# analyze_image(). `model_used` stays "static_fallback" when nothing loads.
has_model = False
processor = None
model = None
model_used = "static_fallback"

try:
    # Candidates are tried in order; the first one that loads completely wins.
    # analyze_image() special-cases a model whose name contains "real-estate".
    model_options = [
        "andupets/real-estate-image-classification",
        "microsoft/resnet-50",
        "google/vit-base-patch16-224",
        "microsoft/resnet-18",
    ]

    for model_name in model_options:
        try:
            logger.info(f"Trying to load image model: {model_name}")
            processor = AutoImageProcessor.from_pretrained(model_name)
            model = AutoModelForImageClassification.from_pretrained(model_name)

            # Run on the GPU when one is available, otherwise stay on the CPU.
            if torch.cuda.is_available():
                model = model.to('cuda')
                logger.info(f"Model loaded on GPU: {model_name}")
            else:
                logger.info(f"Model loaded on CPU: {model_name}")

            # Inference only: switch the module to evaluation mode.
            model.eval()
            has_model = True
            model_used = model_name
            logger.info(f"Successfully loaded image model: {model_name}")
            break

        except Exception as e:
            logger.warning(f"Failed to load {model_name}: {str(e)}")
            # A candidate can fail half-way (processor loaded, model not, or
            # the move to the GPU raised). Drop the partial objects so the
            # globals never describe a model that is not actually usable.
            processor = None
            model = None
            continue

    if not has_model:
        logger.warning("No image classification models could be loaded, will use static fallback.")
        model_used = "static_fallback"

except Exception as e:
    logger.error(f"Error loading image classification models: {str(e)}")
    # Leave the module in a consistent "no model" state.
    has_model = False
    processor = None
    model = None
    model_used = "static_fallback"
|
|
|
# Substrings that mark a predicted label as real-estate related.
_REAL_ESTATE_KEYWORDS = (
    'bathroom', 'bedroom', 'dining room', 'house facade', 'kitchen',
    'living room', 'apartment', 'facade', 'real estate', 'property',
    'interior', 'exterior', 'room', 'home', 'house', 'flat', 'villa',
)


def _is_real_estate_label(label):
    """Return True if *label* contains any real-estate keyword (case-insensitive)."""
    lowered = label.lower()
    return any(keyword in lowered for keyword in _REAL_ESTATE_KEYWORDS)


def analyze_image(image):
    """
    Analyze a single image for real estate verification.

    The image is converted to RGB, downscaled so that its longest side is at
    most 512 px, and classified with the module-level ``model``/``processor``
    when one was loaded. When no model is available a fixed "static" result
    is returned instead.

    Args:
        image: PIL Image object, or anything ``Image.open`` accepts
            (file path or file-like object).

    Returns:
        dict: Analysis results with the keys ``is_property_related``,
        ``predicted_label``, ``confidence``, ``authenticity_score``,
        ``is_ai_generated``, ``image_quality``, ``top_predictions``,
        ``real_estate_confidence`` and ``model_used``. An ``error`` key with
        the exception text is added when inference or the whole analysis
        fails. This function does not raise; failures are logged.

    Note:
        ``Image.thumbnail`` resizes in place, so a PIL image passed in that
        is already RGB and larger than 512 px is shrunk as a side effect.
    """
    try:
        # Paths and file-like objects are opened; PIL images are used as-is.
        if not isinstance(image, Image.Image):
            image = Image.open(image)

        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Measure quality and resolution BEFORE downscaling. Once the image
        # is capped at 512 px it has at most 262,144 pixels, so the higher
        # quality tiers could never be reached and the reported resolution
        # would be that of the thumbnail rather than of the input.
        full_width, full_height = image.size
        image_quality = assess_image_quality(image)

        # Downscale for inference speed.
        max_size = 512
        if max(image.size) > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        analysis_result = {
            'is_property_related': False,
            'predicted_label': "Unknown",
            'confidence': 0.0,
            'authenticity_score': 0.0,
            'is_ai_generated': False,
            'image_quality': {
                'resolution': f"{full_width}x{full_height}",
                'quality_score': 0.0
            },
            'top_predictions': [],
            'real_estate_confidence': 0.0,
            'model_used': model_used
        }

        # Explicit None checks: do not depend on the truthiness of the
        # processor/model objects themselves.
        if has_model and processor is not None and model is not None:
            try:
                inputs = processor(images=image, return_tensors="pt")

                # The model was moved to the GPU at load time under the same
                # condition, so the inputs have to follow it.
                if torch.cuda.is_available():
                    inputs = {k: v.to('cuda') for k, v in inputs.items()}

                with torch.no_grad():
                    outputs = model(**inputs)
                    logits = outputs.logits
                    probs = torch.softmax(logits, dim=1).detach().cpu().numpy()[0]

                # Indices of the five most probable classes, best first.
                top_indices = np.argsort(probs)[::-1][:5]

                if hasattr(model.config, 'id2label'):
                    labels = [model.config.id2label[int(i)] for i in top_indices]
                else:
                    labels = [f"class_{i}" for i in top_indices]

                analysis_result['top_predictions'] = [
                    {
                        'label': label,
                        'confidence': float(probs[i])
                    }
                    for i, label in zip(top_indices, labels)
                ]

                # Plain Python float so the comparisons below yield built-in
                # bools; numpy.bool_ values are not JSON serializable.
                max_prob = float(probs.max())
                predicted_label = labels[0]

                is_real_estate = _is_real_estate_label(predicted_label)

                # A dedicated real-estate model only emits property classes,
                # so moderate confidence is enough regardless of label text.
                if "real-estate" in model_used.lower():
                    is_real_estate = max_prob > 0.3

                if is_real_estate:
                    real_estate_confidence = max_prob
                else:
                    # Fall back to the best real-estate-looking label among
                    # the top predictions, if there is one.
                    real_estate_scores = [
                        pred['confidence']
                        for pred in analysis_result['top_predictions']
                        if _is_real_estate_label(pred['label'])
                    ]
                    real_estate_confidence = max(real_estate_scores) if real_estate_scores else 0.0

                analysis_result.update({
                    'is_property_related': bool(is_real_estate),
                    'predicted_label': predicted_label,
                    'confidence': float(max_prob),
                    'real_estate_confidence': float(real_estate_confidence),
                    'authenticity_score': 0.95 if max_prob > 0.7 else 0.60,
                    'is_ai_generated': detect_ai_generated_image(image, max_prob, predicted_label)
                })

                analysis_result['image_quality'] = image_quality

            except Exception as e:
                logger.error(f"Error in image model inference: {str(e)}")

                # Inference failed: report a neutral result and keep the
                # error text for the caller.
                analysis_result.update({
                    'is_property_related': True,
                    'predicted_label': "Property Image (Model Error)",
                    'confidence': 0.5,
                    'real_estate_confidence': 0.5,
                    'authenticity_score': 0.7,
                    'is_ai_generated': False,
                    'error': str(e)
                })
        else:
            # No model loaded: fixed static result.
            analysis_result.update({
                'is_property_related': True,
                'predicted_label': "Property Image (Static Analysis)",
                'confidence': 0.5,
                'real_estate_confidence': 0.5,
                'authenticity_score': 0.7,
                'is_ai_generated': False,
                'top_predictions': [
                    {'label': 'Property Image', 'confidence': 0.5}
                ]
            })

        return analysis_result

    except Exception as e:
        logger.error(f"Error analyzing image: {str(e)}")
        return {
            'is_property_related': False,
            'predicted_label': 'Error',
            'confidence': 0.0,
            'real_estate_confidence': 0.0,
            'authenticity_score': 0.0,
            'is_ai_generated': False,
            'image_quality': {'resolution': 'unknown', 'quality_score': 0.0},
            'top_predictions': [],
            'model_used': 'static_fallback',
            'error': str(e)
        }
|
|
|
def _image_has_exif(image):
    """Return True if *image* carries EXIF metadata."""
    # Prefer the public accessor. The private ``_getexif`` is only defined by
    # some file-format classes, so an image returned by ``convert()`` would
    # otherwise always look as if it had no EXIF data.
    public_getter = getattr(image, 'getexif', None)
    if callable(public_getter):
        return bool(public_getter())
    legacy_getter = getattr(image, '_getexif', None)
    return callable(legacy_getter) and legacy_getter() is not None


def detect_ai_generated_image(image, confidence, predicted_label):
    """
    Detect if an image is AI-generated using various heuristics.

    The checks run in this order and the first one that fires returns True:

    1. classifier confidence above 0.95 together with a label longer than
       20 characters;
    2. near-perfect left/right mirror symmetry (mean absolute grayscale
       difference below 5);
    3. grayscale variance below 100;
    4. width and height both multiples of 64;
    5. no EXIF metadata.

    Args:
        image: PIL Image object to inspect.
        confidence: Top-class probability reported by the classifier.
        predicted_label: Label of the top class.

    Returns:
        bool: True if any heuristic fires. False otherwise, and also when an
        error occurs (the error is logged as a warning).
    """
    try:
        if confidence > 0.95 and len(predicted_label) > 20:
            return True

        # Use a signed dtype: subtracting uint8 pixel arrays wraps around
        # (e.g. 10 - 11 == 255), which would make nearly identical halves
        # look maximally different.
        gray_array = np.asarray(image.convert('L'), dtype=np.int16)

        _, w = gray_array.shape
        if w > 1:
            half = w // 2
            left_half = gray_array[:, :half]
            # For odd widths the middle column is skipped, so both halves
            # are exactly `half` columns wide.
            mirrored_right = np.fliplr(gray_array[:, w - half:])

            symmetry_score = np.mean(np.abs(left_half - mirrored_right))
            if symmetry_score < 5.0:
                return True

        texture_variance = np.var(gray_array)
        if texture_variance < 100:
            return True

        width, height = image.size
        if width % 64 == 0 and height % 64 == 0:
            return True

        if not _image_has_exif(image):
            return True

        return False

    except Exception as e:
        logger.warning(f"Error in AI detection: {str(e)}")
        return False
|
|
|
def assess_image_quality(image):
    """
    Score an image's quality from its pixel count and aspect ratio.

    A base score is chosen from the total pixel count (0.9 / 0.7 / 0.5 / 0.3),
    then 0.1 is added when the width/height ratio lies in [0.5, 2.0] and
    subtracted otherwise. The result is clamped to [0.0, 1.0].

    Args:
        image: Object exposing a ``size`` attribute of ``(width, height)``.

    Returns:
        dict: ``resolution`` ("WxH"), ``quality_score``, ``total_pixels`` and
        ``aspect_ratio``. If anything fails, a warning is logged and a
        default dict with resolution 'unknown' and score 0.0 is returned.
    """
    # (minimum pixel count, base score), highest tier first.
    pixel_tiers = ((1000000, 0.9), (500000, 0.7), (100000, 0.5))

    try:
        px_w, px_h = image.size
        pixel_count = px_w * px_h

        score = 0.3
        for floor, tier_score in pixel_tiers:
            if pixel_count >= floor:
                score = tier_score
                break

        ratio = px_w / px_h
        # Reward conventional proportions, penalize extreme ones.
        if ratio < 0.5 or ratio > 2.0:
            score -= 0.1
        else:
            score += 0.1

        # Clamp to the valid score range.
        score = max(0.0, min(1.0, score))

        return {
            'resolution': f"{px_w}x{px_h}",
            'quality_score': score,
            'total_pixels': pixel_count,
            'aspect_ratio': ratio
        }

    except Exception as e:
        logger.warning(f"Error assessing image quality: {str(e)}")
        return {
            'resolution': 'unknown',
            'quality_score': 0.0,
            'total_pixels': 0,
            'aspect_ratio': 1.0
        }
|
|