# models/parallel_processor.py
import multiprocessing as mp
import concurrent.futures
import asyncio
from .logging_config import logger
class ParallelProcessor:
"""Handles parallel processing of property verification analyses"""
    def __init__(self, max_workers=None):
        # Cap the shared thread pool at 8 workers (or the CPU count, whichever is lower)
        self.max_workers = max_workers or min(mp.cpu_count(), 8)
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        # Smaller process pool, reserved for CPU-bound work; not used by the current analysis paths
        self.process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=min(4, mp.cpu_count()))
    def __del__(self):
        # Best-effort cleanup: __del__ may run during interpreter shutdown, so don't block or raise
        try:
            self.thread_pool.shutdown(wait=False)
            self.process_pool.shutdown(wait=False)
        except Exception:
            pass
def process_images_parallel(self, image_files):
"""Process multiple images in parallel"""
try:
max_workers = min(8, mp.cpu_count(), len(image_files)) if image_files else 1
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for img_file in image_files:
future = executor.submit(self._process_single_image, img_file)
futures.append(future)
results = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result(timeout=30)
if isinstance(result, dict):
result['parallelization_info'] = {'worker_count': max_workers}
results.append(result)
except Exception as e:
logger.error(f"Error processing image: {str(e)}")
results.append({'error': str(e), 'is_property_related': False, 'parallelization_info': {'worker_count': max_workers}})
return results
except Exception as e:
logger.error(f"Error in parallel image processing: {str(e)}")
return []
def _process_single_image(self, img_file):
"""Process a single image"""
        try:
            from PIL import Image
            import base64
            import io
            from .image_analysis import analyze_image
            img = Image.open(img_file)
            # JPEG cannot encode alpha or palette images, so normalise to RGB first
            if img.mode != 'RGB':
                img = img.convert('RGB')
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG")
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
            analysis = analyze_image(img)
            return {
                'image_data': img_str,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing image {getattr(img_file, 'filename', 'unknown')}: {str(e)}")
            return {'error': str(e), 'is_property_related': False}
def process_pdfs_parallel(self, pdf_files):
"""Process multiple PDFs in parallel"""
try:
max_workers = min(8, mp.cpu_count(), len(pdf_files)) if pdf_files else 1
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for pdf_file in pdf_files:
future = executor.submit(self._process_single_pdf, pdf_file)
futures.append(future)
results = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result(timeout=60)
if isinstance(result, dict):
result['parallelization_info'] = {'worker_count': max_workers}
results.append(result)
except Exception as e:
logger.error(f"Error processing PDF: {str(e)}")
results.append({'error': str(e), 'parallelization_info': {'worker_count': max_workers}})
return results
except Exception as e:
logger.error(f"Error in parallel PDF processing: {str(e)}")
return []
def _process_single_pdf(self, pdf_file):
"""Process a single PDF"""
try:
from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content
# Ensure pdf_file is a file object, not a dict
if hasattr(pdf_file, 'read'):
pdf_text = extract_text_from_pdf(pdf_file)
else:
logger.error(f"Invalid PDF file object: {type(pdf_file)}")
return {
'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
'text': '',
'analysis': {
'is_property_related': False,
'confidence': 0.0,
'summary': 'Invalid PDF file object',
'verification_score': 0.0,
'model_used': 'static_fallback',
'error': 'Invalid PDF file object'
}
}
analysis = analyze_pdf_content(pdf_text, {})
return {
'filename': pdf_file.filename,
'text': pdf_text,
'analysis': analysis
}
except Exception as e:
logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
return {
'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
'text': '',
'analysis': {
'is_property_related': False,
'confidence': 0.0,
'summary': f'Error processing PDF: {str(e)}',
'verification_score': 0.0,
'model_used': 'static_fallback',
'error': str(e)
}
}
async def run_analyses_parallel(self, data, consolidated_text, image_analysis, pdf_analysis):
"""Run all analyses in parallel using asyncio and thread pools"""
try:
# Prepare property data for price analysis
property_data = self._prepare_property_data(data)
price_context = self._create_price_context(data)
# Define analysis tasks with their respective functions
analysis_tasks = [
('summary', self._run_summary_analysis, data),
('fraud', self._run_fraud_analysis, consolidated_text, data),
('trust', self._run_trust_analysis, consolidated_text, image_analysis, pdf_analysis),
('suggestions', self._run_suggestions_analysis, consolidated_text, data),
('quality', self._run_quality_analysis, data.get('description_translated', '')),
('address', self._run_address_analysis, data),
('cross_validation', self._run_cross_validation_analysis, data),
('location', self._run_location_analysis, data),
('price', self._run_price_analysis, data, price_context, property_data),
('legal', self._run_legal_analysis, data.get('legal_details', '')),
('specs', self._run_specs_analysis, data),
('market', self._run_market_analysis, data)
]
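            # Each entry above pairs an analysis name with a plain callable and its arguments;
            # they are handed to the shared thread pool and awaited individually so that one
            # slow analysis cannot stall the whole verification run.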
# Run tasks in parallel with timeout
            # get_running_loop() is the correct call inside a coroutine (get_event_loop is deprecated here)
            loop = asyncio.get_running_loop()
tasks = []
for task_name, func, *args in analysis_tasks:
task = loop.run_in_executor(
self.thread_pool,
func,
*args
)
tasks.append((task_name, task))
# Wait for all tasks to complete with timeout
results = {}
for task_name, task in tasks:
try:
                    # Per-task timeout; a timed-out task keeps running in the thread pool,
                    # since run_in_executor futures cannot be cancelled.
                    result = await asyncio.wait_for(task, timeout=60)
results[task_name] = result
except asyncio.TimeoutError:
logger.error(f"Task {task_name} timed out")
results[task_name] = self._get_error_result(f"Task {task_name} timed out")
except Exception as e:
logger.error(f"Task {task_name} failed: {str(e)}")
results[task_name] = self._get_error_result(f"Task {task_name} failed: {str(e)}")
return results
except Exception as e:
logger.error(f"Error in parallel analyses: {str(e)}")
return self._get_all_error_results(str(e))
def _prepare_property_data(self, data):
"""Prepare property data for price analysis"""
property_data = {}
try:
if data.get('sq_ft'):
property_data['size'] = float(data['sq_ft'])
if data.get('market_value'):
                # market_value may arrive as a number or a formatted string such as "₹75,00,000"
                property_data['price'] = float(str(data['market_value']).replace('₹', '').replace(',', ''))
if data.get('year_built'):
from datetime import datetime
current_year = datetime.now().year
property_data['property_age'] = current_year - int(data['year_built'])
except Exception as e:
logger.warning(f"Error preparing property data: {str(e)}")
return property_data
def _create_price_context(self, data):
"""Create context text for price analysis"""
return f"""
Property: {data.get('property_name', '')}
Type: {data.get('property_type', '')}
Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}
Size: {data.get('sq_ft', '')} sq ft
Market Value: ₹{data.get('market_value', '')}
Description: {data.get('description', '')}
Amenities: {data.get('amenities', '')}
"""
def _run_summary_analysis(self, data):
"""Run property summary analysis"""
try:
from .property_summary import generate_property_summary
return generate_property_summary(data)
except Exception as e:
logger.error(f"Error in summary analysis: {str(e)}")
return "Property summary unavailable."
def _run_fraud_analysis(self, consolidated_text, data):
"""Run fraud classification analysis"""
try:
from .fraud_classification import classify_fraud
return classify_fraud(data, consolidated_text)
except Exception as e:
logger.error(f"Error in fraud analysis: {str(e)}")
return self._get_error_result("Fraud analysis failed")
def _run_trust_analysis(self, consolidated_text, image_analysis, pdf_analysis):
"""Run trust score analysis"""
try:
from .trust_score import generate_trust_score
return generate_trust_score(consolidated_text, image_analysis, pdf_analysis)
except Exception as e:
logger.error(f"Error in trust analysis: {str(e)}")
return (0.0, "Trust analysis failed")
def _run_suggestions_analysis(self, consolidated_text, data):
"""Run suggestions analysis"""
try:
from .suggestions import generate_suggestions
return generate_suggestions(consolidated_text, data)
except Exception as e:
logger.error(f"Error in suggestions analysis: {str(e)}")
return self._get_error_result("Suggestions analysis failed")
def _run_quality_analysis(self, description):
"""Run text quality analysis"""
try:
from .text_quality import assess_text_quality
return assess_text_quality(description)
except Exception as e:
logger.error(f"Error in quality analysis: {str(e)}")
return self._get_error_result("Quality analysis failed")
def _run_address_analysis(self, data):
"""Run address verification analysis"""
try:
from .address_verification import verify_address
return verify_address(data)
except Exception as e:
logger.error(f"Error in address analysis: {str(e)}")
return self._get_error_result("Address analysis failed")
def _run_cross_validation_analysis(self, data):
"""Run cross validation analysis"""
try:
from .cross_validation import perform_cross_validation
return perform_cross_validation(data)
except Exception as e:
logger.error(f"Error in cross validation analysis: {str(e)}")
return self._get_error_result("Cross validation analysis failed")
def _run_location_analysis(self, data):
"""Run location analysis"""
try:
from .location_analysis import analyze_location
return analyze_location(data)
except Exception as e:
logger.error(f"Error in location analysis: {str(e)}")
return self._get_error_result("Location analysis failed")
def _run_price_analysis(self, data, price_context, property_data):
"""Run price analysis"""
try:
from .price_analysis import analyze_price
            # Pass the listing data, context text, coordinates and structured property data to price analysis
            return analyze_price(data, price_context, data.get('latitude'), data.get('longitude'), property_data)
except Exception as e:
logger.error(f"Error in price analysis: {str(e)}")
return self._get_error_result("Price analysis failed")
def _run_legal_analysis(self, legal_details):
"""Run legal analysis"""
try:
from .legal_analysis import analyze_legal_details
return analyze_legal_details(legal_details)
except Exception as e:
logger.error(f"Error in legal analysis: {str(e)}")
return self._get_error_result("Legal analysis failed")
def _run_specs_analysis(self, data):
"""Run property specs analysis"""
try:
from .property_specs import verify_property_specs
return verify_property_specs(data)
except Exception as e:
logger.error(f"Error in specs analysis: {str(e)}")
return self._get_error_result("Specs analysis failed")
def _run_market_analysis(self, data):
"""Run market value analysis"""
try:
from .market_value import analyze_market_value
return analyze_market_value(data)
except Exception as e:
logger.error(f"Error in market analysis: {str(e)}")
return self._get_error_result("Market analysis failed")
def _get_error_result(self, error_message):
"""Get a standardized error result"""
return {
'error': error_message,
'status': 'error',
'confidence': 0.0
}
def _get_all_error_results(self, error_message):
"""Get error results for all analyses"""
return {
'summary': "Analysis failed",
'fraud': self._get_error_result(error_message),
'trust': (0.0, error_message),
'suggestions': self._get_error_result(error_message),
'quality': self._get_error_result(error_message),
'address': self._get_error_result(error_message),
'cross_validation': self._get_error_result(error_message),
'location': self._get_error_result(error_message),
'price': self._get_error_result(error_message),
'legal': self._get_error_result(error_message),
'specs': self._get_error_result(error_message),
'market': self._get_error_result(error_message)
}
async def _process_pdf_async(self, pdf_file, property_data):
"""Process a single PDF file asynchronously"""
try:
from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content
# Ensure pdf_file is a file object, not a dict
            if hasattr(pdf_file, 'read'):
                # Offload blocking text extraction to the thread pool so the event loop stays responsive
                loop = asyncio.get_running_loop()
                text = await loop.run_in_executor(self.thread_pool, extract_text_from_pdf, pdf_file)
if not text:
return {
'filename': pdf_file.filename,
'text': '',
'analysis': {
'is_property_related': False,
'confidence': 0.0,
'summary': 'No text extracted from PDF',
'verification_score': 0.0,
'model_used': 'static_fallback'
}
}
                # Analyze the extracted text off the event loop as well
                analysis = await loop.run_in_executor(self.thread_pool, analyze_pdf_content, text, property_data)
return {
'filename': pdf_file.filename,
'text': text,
'analysis': analysis
}
else:
logger.error(f"Invalid PDF file object in async processing: {type(pdf_file)}")
return {
'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
'text': '',
'analysis': {
'is_property_related': False,
'confidence': 0.0,
'summary': 'Invalid PDF file object',
'verification_score': 0.0,
'model_used': 'static_fallback',
'error': 'Invalid PDF file object'
}
}
except Exception as e:
logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
return {
'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
'text': '',
'analysis': {
'is_property_related': False,
'confidence': 0.0,
'summary': f'Error processing PDF: {str(e)}',
'verification_score': 0.0,
'model_used': 'static_fallback',
'error': str(e)
}
}
# Global instance for easy import
parallel_processor = ParallelProcessor()
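# Minimal usage sketch (illustrative only, not part of the original module). The listing
# below is hypothetical, uploads are assumed to be file-like objects with a `filename`
# attribute (e.g. werkzeug FileStorage), and the sibling analysis modules must be
# importable for real results; failed analyses come back as standardized error dicts.
if __name__ == "__main__":
    sample_listing = {
        'property_name': 'Sample Apartment',
        'property_type': 'Apartment',
        'address': '12 Example Street',
        'city': 'Hyderabad',
        'state': 'Telangana',
        'sq_ft': '1200',
        'market_value': '7500000',
        'year_built': '2015',
        'description': 'Two-bedroom apartment close to the metro.',
        'description_translated': 'Two-bedroom apartment close to the metro.',
        'legal_details': '',
        'amenities': 'Parking, Lift',
    }
    consolidated_text = sample_listing['description']
    # No uploads in this sketch, so image and PDF analysis results are passed as empty lists.
    analysis_results = asyncio.run(
        parallel_processor.run_analyses_parallel(sample_listing, consolidated_text, [], [])
    )
    for analysis_name, analysis_result in analysis_results.items():
        logger.info(f"{analysis_name}: {type(analysis_result).__name__}")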