import multiprocessing as mp
import concurrent.futures
import asyncio
import threading
from functools import partial
from typing import Dict, Any, List, Tuple

from .logging_config import logger


class ParallelProcessor:
    """Handles parallel processing of property verification analyses."""

    def __init__(self, max_workers=None):
        self.max_workers = max_workers or min(mp.cpu_count(), 8)
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=min(4, mp.cpu_count()))

    def __del__(self):
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

    def process_images_parallel(self, image_files):
        """Process multiple images in parallel."""
        try:
            # Scale the worker count to the smaller of the CPU count, the number
            # of images, and a hard cap of 8.
            max_workers = min(8, mp.cpu_count(), len(image_files)) if image_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(self._process_single_image, img_file) for img_file in image_files]

                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=30)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing image: {str(e)}")
                        # Keep a placeholder entry so callers see one result per submitted image.
                        results.append({
                            'error': str(e),
                            'is_property_related': False,
                            'parallelization_info': {'worker_count': max_workers},
                        })
                return results
        except Exception as e:
            logger.error(f"Error in parallel image processing: {str(e)}")
            return []

    def _process_single_image(self, img_file):
        """Process a single image."""
        try:
            from PIL import Image
            import base64
            import io
            from .image_analysis import analyze_image

            img = Image.open(img_file)
            # JPEG cannot store alpha or palette data, so normalise to RGB first.
            if img.mode != 'RGB':
                img = img.convert('RGB')
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG")
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')

            analysis = analyze_image(img)
            return {
                'image_data': img_str,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing image {getattr(img_file, 'filename', 'unknown')}: {str(e)}")
            return {'error': str(e), 'is_property_related': False}

    def process_pdfs_parallel(self, pdf_files):
        """Process multiple PDFs in parallel."""
        try:
            max_workers = min(8, mp.cpu_count(), len(pdf_files)) if pdf_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(self._process_single_pdf, pdf_file) for pdf_file in pdf_files]

                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=60)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing PDF: {str(e)}")
                        results.append({'error': str(e), 'parallelization_info': {'worker_count': max_workers}})
                return results
        except Exception as e:
            logger.error(f"Error in parallel PDF processing: {str(e)}")
            return []

    def _process_single_pdf(self, pdf_file):
        """Process a single PDF."""
        try:
            from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content

            if hasattr(pdf_file, 'read'):
                pdf_text = extract_text_from_pdf(pdf_file)
            else:
                logger.error(f"Invalid PDF file object: {type(pdf_file)}")
                return {
                    'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                    'text': '',
                    'analysis': {
                        'is_property_related': False,
                        'confidence': 0.0,
                        'summary': 'Invalid PDF file object',
                        'verification_score': 0.0,
                        'model_used': 'static_fallback',
                        'error': 'Invalid PDF file object'
                    }
                }

            analysis = analyze_pdf_content(pdf_text, {})

            return {
                'filename': pdf_file.filename,
                'text': pdf_text,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
            return {
                'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                'text': '',
                'analysis': {
                    'is_property_related': False,
                    'confidence': 0.0,
                    'summary': f'Error processing PDF: {str(e)}',
                    'verification_score': 0.0,
                    'model_used': 'static_fallback',
                    'error': str(e)
                }
            }

    async def run_analyses_parallel(self, data, consolidated_text, image_analysis, pdf_analysis):
        """Run all analyses in parallel using asyncio and thread pools."""
        try:
            property_data = self._prepare_property_data(data)
            price_context = self._create_price_context(data)

            # (name, callable, *args) entries describing each analysis to run.
            analysis_tasks = [
                ('summary', self._run_summary_analysis, data),
                ('fraud', self._run_fraud_analysis, consolidated_text, data),
                ('trust', self._run_trust_analysis, consolidated_text, image_analysis, pdf_analysis),
                ('suggestions', self._run_suggestions_analysis, consolidated_text, data),
                ('quality', self._run_quality_analysis, data.get('description_translated', '')),
                ('address', self._run_address_analysis, data),
                ('cross_validation', self._run_cross_validation_analysis, data),
                ('location', self._run_location_analysis, data),
                ('price', self._run_price_analysis, data, price_context, property_data),
                ('legal', self._run_legal_analysis, data.get('legal_details', '')),
                ('specs', self._run_specs_analysis, data),
                ('market', self._run_market_analysis, data)
            ]

            # Dispatch every analysis onto the shared thread pool.
            loop = asyncio.get_running_loop()
            tasks = []
            for task_name, func, *args in analysis_tasks:
                task = loop.run_in_executor(self.thread_pool, func, *args)
                tasks.append((task_name, task))

            # Collect results, allowing each analysis up to 60 seconds.
            results = {}
            for task_name, task in tasks:
                try:
                    result = await asyncio.wait_for(task, timeout=60)
                    results[task_name] = result
                except asyncio.TimeoutError:
                    logger.error(f"Task {task_name} timed out")
                    results[task_name] = self._get_error_result(f"Task {task_name} timed out")
                except Exception as e:
                    logger.error(f"Task {task_name} failed: {str(e)}")
                    results[task_name] = self._get_error_result(f"Task {task_name} failed: {str(e)}")

            return results

        except Exception as e:
            logger.error(f"Error in parallel analyses: {str(e)}")
            return self._get_all_error_results(str(e))

    def _prepare_property_data(self, data):
        """Prepare property data for price analysis."""
        property_data = {}
        try:
            if data.get('sq_ft'):
                property_data['size'] = float(data['sq_ft'])
            if data.get('market_value'):
                # market_value may arrive as a formatted string such as '₹12,00,000'.
                property_data['price'] = float(str(data['market_value']).replace('₹', '').replace(',', ''))
            if data.get('year_built'):
                from datetime import datetime
                current_year = datetime.now().year
                property_data['property_age'] = current_year - int(data['year_built'])
        except Exception as e:
            logger.warning(f"Error preparing property data: {str(e)}")
        return property_data

    def _create_price_context(self, data):
        """Create context text for price analysis."""
        return f"""
        Property: {data.get('property_name', '')}
        Type: {data.get('property_type', '')}
        Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}
        Size: {data.get('sq_ft', '')} sq ft
        Market Value: ₹{data.get('market_value', '')}
        Description: {data.get('description', '')}
        Amenities: {data.get('amenities', '')}
        """

    def _run_summary_analysis(self, data):
        """Run property summary analysis."""
        try:
            from .property_summary import generate_property_summary
            return generate_property_summary(data)
        except Exception as e:
            logger.error(f"Error in summary analysis: {str(e)}")
            return "Property summary unavailable."

    def _run_fraud_analysis(self, consolidated_text, data):
        """Run fraud classification analysis."""
        try:
            from .fraud_classification import classify_fraud
            return classify_fraud(data, consolidated_text)
        except Exception as e:
            logger.error(f"Error in fraud analysis: {str(e)}")
            return self._get_error_result("Fraud analysis failed")

    def _run_trust_analysis(self, consolidated_text, image_analysis, pdf_analysis):
        """Run trust score analysis."""
        try:
            from .trust_score import generate_trust_score
            return generate_trust_score(consolidated_text, image_analysis, pdf_analysis)
        except Exception as e:
            logger.error(f"Error in trust analysis: {str(e)}")
            return (0.0, "Trust analysis failed")

    def _run_suggestions_analysis(self, consolidated_text, data):
        """Run suggestions analysis."""
        try:
            from .suggestions import generate_suggestions
            return generate_suggestions(consolidated_text, data)
        except Exception as e:
            logger.error(f"Error in suggestions analysis: {str(e)}")
            return self._get_error_result("Suggestions analysis failed")

    def _run_quality_analysis(self, description):
        """Run text quality analysis."""
        try:
            from .text_quality import assess_text_quality
            return assess_text_quality(description)
        except Exception as e:
            logger.error(f"Error in quality analysis: {str(e)}")
            return self._get_error_result("Quality analysis failed")

    def _run_address_analysis(self, data):
        """Run address verification analysis."""
        try:
            from .address_verification import verify_address
            return verify_address(data)
        except Exception as e:
            logger.error(f"Error in address analysis: {str(e)}")
            return self._get_error_result("Address analysis failed")

    def _run_cross_validation_analysis(self, data):
        """Run cross-validation analysis."""
        try:
            from .cross_validation import perform_cross_validation
            return perform_cross_validation(data)
        except Exception as e:
            logger.error(f"Error in cross validation analysis: {str(e)}")
            return self._get_error_result("Cross validation analysis failed")

    def _run_location_analysis(self, data):
        """Run location analysis."""
        try:
            from .location_analysis import analyze_location
            return analyze_location(data)
        except Exception as e:
            logger.error(f"Error in location analysis: {str(e)}")
            return self._get_error_result("Location analysis failed")

    def _run_price_analysis(self, data, price_context, property_data):
        """Run price analysis."""
        try:
            from .price_analysis import analyze_price
            return analyze_price(data, price_context, data.get('latitude'), data.get('longitude'), property_data)
        except Exception as e:
            logger.error(f"Error in price analysis: {str(e)}")
            return self._get_error_result("Price analysis failed")

    def _run_legal_analysis(self, legal_details):
        """Run legal analysis."""
        try:
            from .legal_analysis import analyze_legal_details
            return analyze_legal_details(legal_details)
        except Exception as e:
            logger.error(f"Error in legal analysis: {str(e)}")
            return self._get_error_result("Legal analysis failed")

    def _run_specs_analysis(self, data):
        """Run property specs analysis."""
        try:
            from .property_specs import verify_property_specs
            return verify_property_specs(data)
        except Exception as e:
            logger.error(f"Error in specs analysis: {str(e)}")
            return self._get_error_result("Specs analysis failed")

    def _run_market_analysis(self, data):
        """Run market value analysis."""
        try:
            from .market_value import analyze_market_value
            return analyze_market_value(data)
        except Exception as e:
            logger.error(f"Error in market analysis: {str(e)}")
            return self._get_error_result("Market analysis failed")

    def _get_error_result(self, error_message):
        """Get a standardized error result."""
        return {
            'error': error_message,
            'status': 'error',
            'confidence': 0.0
        }

    def _get_all_error_results(self, error_message):
        """Get error results for all analyses."""
        return {
            'summary': "Analysis failed",
            'fraud': self._get_error_result(error_message),
            'trust': (0.0, error_message),
            'suggestions': self._get_error_result(error_message),
            'quality': self._get_error_result(error_message),
            'address': self._get_error_result(error_message),
            'cross_validation': self._get_error_result(error_message),
            'location': self._get_error_result(error_message),
            'price': self._get_error_result(error_message),
            'legal': self._get_error_result(error_message),
            'specs': self._get_error_result(error_message),
            'market': self._get_error_result(error_message)
        }

    async def _process_pdf_async(self, pdf_file, property_data):
        """Process a single PDF file asynchronously."""
        # Note: the body contains no awaits, so the work runs to completion on the
        # event loop thread when this coroutine is awaited.
        try:
            from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content

            if hasattr(pdf_file, 'read'):
                text = extract_text_from_pdf(pdf_file)
                if not text:
                    return {
                        'filename': pdf_file.filename,
                        'text': '',
                        'analysis': {
                            'is_property_related': False,
                            'confidence': 0.0,
                            'summary': 'No text extracted from PDF',
                            'verification_score': 0.0,
                            'model_used': 'static_fallback'
                        }
                    }

                analysis = analyze_pdf_content(text, property_data)

                return {
                    'filename': pdf_file.filename,
                    'text': text,
                    'analysis': analysis
                }
            else:
                logger.error(f"Invalid PDF file object in async processing: {type(pdf_file)}")
                return {
                    'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                    'text': '',
                    'analysis': {
                        'is_property_related': False,
                        'confidence': 0.0,
                        'summary': 'Invalid PDF file object',
                        'verification_score': 0.0,
                        'model_used': 'static_fallback',
                        'error': 'Invalid PDF file object'
                    }
                }

        except Exception as e:
            logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
            return {
                'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                'text': '',
                'analysis': {
                    'is_property_related': False,
                    'confidence': 0.0,
                    'summary': f'Error processing PDF: {str(e)}',
                    'verification_score': 0.0,
                    'model_used': 'static_fallback',
                    'error': str(e)
                }
            }


# Shared module-level instance.
parallel_processor = ParallelProcessor()
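

# Usage sketch (an addition, not part of the original API surface): a minimal
# example of driving the async entry point from synchronous code with
# asyncio.run(). The field names in `sample_data` are illustrative placeholders;
# real callers are assumed to pass upload objects exposing `.read()` and
# `.filename`. Because this module uses relative imports, run it as a module
# within its package rather than as a standalone script.
if __name__ == "__main__":
    sample_data = {
        'property_name': 'Example Residency',   # hypothetical value
        'description_translated': '',
        'legal_details': '',
    }
    analysis_results = asyncio.run(
        parallel_processor.run_analyses_parallel(
            sample_data,
            consolidated_text='',
            image_analysis=[],
            pdf_analysis=[],
        )
    )
    logger.info(f"Completed {len(analysis_results)} analyses")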