import re

import fitz  # PyMuPDF

from .model_loader import load_model
from .logging_config import logger


def extract_text_from_pdf(pdf_file):
    """
    Extract text from a PDF file-like object, returning an empty string on failure.
    """
    try:
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
        text = ""

        # Concatenate the text of every page in order.
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            text += page.get_text()

        doc.close()
        return text.strip()

    except Exception as e:
        logger.error(f"Error extracting text from PDF: {str(e)}")
        return ""


def analyze_pdf_content(document_text, property_data):
    """
    Analyze PDF content for real estate verification using keyword-based
    classification, summarization, and authenticity heuristics.

    Args:
        document_text: Extracted text from the PDF.
        property_data: Property information for cross-validation
            (not used by the current keyword-based analysis).

    Returns:
        dict: Comprehensive analysis results.
    """
    try:
        if not document_text or len(document_text.strip()) < 10:
            return {
                'is_property_related': False,
                'confidence': 0.0,
                'summary': 'Document too short or empty',
                'key_info': {},
                'verification_score': 0.0,
                'document_type': 'Unknown',
                'document_confidence': 0.0,
                'authenticity_assessment': 'Unknown',
                'authenticity_confidence': 0.0,
                'contains_signatures': False,
                'contains_dates': False,
                'real_estate_indicators': [],
                'legal_terms_found': [],
                'model_used': 'static_fallback'
            }

        # Keyword vocabulary used to gauge how property-related the document is.
        real_estate_keywords = {
            'property_terms': [
                'property', 'house', 'apartment', 'flat', 'villa', 'land', 'real estate',
                'residential', 'commercial', 'industrial', 'plot', 'acre', 'square feet',
                'sq ft', 'sqft', 'bedroom', 'bathroom', 'kitchen', 'living room',
                'dining room', 'garage', 'parking', 'garden', 'balcony', 'terrace'
            ],
            'legal_terms': [
                'title', 'deed', 'ownership', 'mortgage', 'loan', 'lease', 'rent',
                'agreement', 'contract', 'sale', 'purchase', 'transfer', 'registration',
                'encumbrance', 'lien', 'easement', 'zoning', 'permit', 'license',
                'tax', 'assessment', 'valuation', 'appraisal', 'survey', 'boundary'
            ],
            'financial_terms': [
                'price', 'value', 'cost', 'amount', 'payment', 'installment',
                'down payment', 'interest', 'rate', 'principal', 'balance',
                'insurance', 'premium', 'deposit', 'advance', 'rental', 'security'
            ],
            'location_terms': [
                'address', 'location', 'street', 'road', 'avenue', 'lane',
                'city', 'state', 'country', 'postal', 'zip', 'pincode',
                'neighborhood', 'area', 'district', 'zone', 'sector', 'block'
            ]
        }

        text_lower = document_text.lower()

        # Count keyword matches per category.
        keyword_counts = {}
        found_keywords = {}
        for category, keywords in real_estate_keywords.items():
            matches = [keyword for keyword in keywords if keyword in text_lower]
            keyword_counts[category] = len(matches)
            found_keywords[category] = matches

        # Confidence saturates at 1.0 once 30% of the vocabulary is matched.
        total_keywords = sum(len(keywords) for keywords in real_estate_keywords.values())
        total_matches = sum(keyword_counts.values())
        confidence = min(1.0, total_matches / (total_keywords * 0.3))

        document_type, document_confidence = classify_document_type(text_lower, found_keywords)
        summary = generate_document_summary(document_text, document_type)
        key_info = extract_document_key_info(document_text)

        contains_signatures = detect_signatures(text_lower)
        contains_dates = detect_dates(document_text)

        authenticity_assessment, authenticity_confidence = assess_document_authenticity(
            document_text, contains_signatures, contains_dates, key_info
        )

        verification_score = calculate_verification_score(
            confidence, document_confidence, authenticity_confidence,
            contains_signatures, contains_dates, key_info
        )

        is_property_related = confidence > 0.2 or document_type != 'Unknown'

        legal_terms_found = found_keywords.get('legal_terms', [])

        # Keep up to three matches per category as a compact evidence list.
        real_estate_indicators = []
        for category, matches in found_keywords.items():
            if matches:
                real_estate_indicators.extend(matches[:3])

        return {
            'is_property_related': is_property_related,
            'confidence': confidence,
            'summary': summary,
            'key_info': key_info,
            'verification_score': verification_score,
            'document_type': document_type,
            'document_confidence': document_confidence,
            'authenticity_assessment': authenticity_assessment,
            'authenticity_confidence': authenticity_confidence,
            'contains_signatures': contains_signatures,
            'contains_dates': contains_dates,
            'real_estate_indicators': real_estate_indicators,
            'legal_terms_found': legal_terms_found,
            'keyword_analysis': keyword_counts,
            'model_used': 'static_fallback'
        }

    except Exception as e:
        logger.error(f"Error in PDF content analysis: {str(e)}")
        return {
            'is_property_related': False,
            'confidence': 0.0,
            'summary': f'Analysis error: {str(e)}',
            'key_info': {},
            'verification_score': 0.0,
            'document_type': 'Unknown',
            'document_confidence': 0.0,
            'authenticity_assessment': 'Unknown',
            'authenticity_confidence': 0.0,
            'contains_signatures': False,
            'contains_dates': False,
            'real_estate_indicators': [],
            'legal_terms_found': [],
            'model_used': 'static_fallback',
            'error': str(e)
        }


def classify_document_type(text_lower, found_keywords):
    """
    Classify the document type from keyword patterns.
    """
    document_patterns = {
        'Property Title Deed': {
            'keywords': ['title', 'deed', 'ownership', 'property', 'owner'],
            'confidence': 0.9
        },
        'Mortgage Document': {
            'keywords': ['mortgage', 'loan', 'bank', 'lender', 'borrower', 'principal', 'interest'],
            'confidence': 0.85
        },
        'Lease Agreement': {
            'keywords': ['lease', 'rent', 'tenant', 'landlord', 'rental', 'agreement'],
            'confidence': 0.8
        },
        'Sale Contract': {
            'keywords': ['sale', 'purchase', 'buyer', 'seller', 'contract', 'agreement'],
            'confidence': 0.8
        },
        'Tax Assessment': {
            'keywords': ['tax', 'assessment', 'valuation', 'appraisal', 'property tax'],
            'confidence': 0.75
        },
        'Building Permit': {
            'keywords': ['permit', 'building', 'construction', 'approval', 'zoning'],
            'confidence': 0.7
        },
        'Property Survey': {
            'keywords': ['survey', 'boundary', 'measurement', 'plot', 'dimension'],
            'confidence': 0.7
        },
        'Insurance Document': {
            'keywords': ['insurance', 'policy', 'premium', 'coverage', 'claim'],
            'confidence': 0.65
        }
    }

    best_match = 'Unknown'
    best_confidence = 0.0

    for doc_type, pattern in document_patterns.items():
        matches = sum(1 for keyword in pattern['keywords'] if keyword in text_lower)
        if matches > 0:
            # Scale the base confidence by the fraction of keywords matched.
            match_ratio = matches / len(pattern['keywords'])
            confidence = pattern['confidence'] * match_ratio

            if confidence > best_confidence:
                best_match = doc_type
                best_confidence = confidence

    return best_match, best_confidence


def generate_document_summary(document_text, document_type):
    """
    Generate a document summary, preferring the summarization model and
    falling back to simple extractive sentence selection.
    """
    try:
        # Try the loaded summarization model first.
        try:
            summarizer = load_model("summarization")
            if hasattr(summarizer, 'fallback_used') and not summarizer.fallback_used:
                summary_result = summarizer(document_text[:1000], max_length=150, min_length=50)
                if isinstance(summary_result, list) and len(summary_result) > 0:
                    return summary_result[0].get('summary_text', '')
        except Exception as e:
            logger.warning(f"Summarization model failed: {str(e)}")

        # Extractive fallback: keep only sentences long enough to be informative.
        sentences = document_text.split('.')
        sentences = [s.strip() for s in sentences if len(s.strip()) > 20]

        if not sentences:
            return "Document contains insufficient text for summarization."

        # Prefer sentences that mention the detected document type.
        key_sentences = []
        if document_type != 'Unknown':
            type_keywords = document_type.lower().split()
            for sentence in sentences:
                if any(keyword in sentence.lower() for keyword in type_keywords):
                    key_sentences.append(sentence)
                    if len(key_sentences) >= 2:
                        break

        if not key_sentences:
            key_sentences = sentences[:3]

        summary = '. '.join(key_sentences) + '.'

        # Truncate overly long summaries.
        if len(summary) > 300:
            summary = summary[:297] + '...'

        return summary

    except Exception as e:
        logger.error(f"Error generating summary: {str(e)}")
        return "Summary generation failed."


def extract_document_key_info(document_text):
    """
    Extract key information (addresses, dates, amounts, contacts, property
    details) from the document text using regular expressions.
    """
    key_info = {}

    try:
        # Street addresses and "city, state ZIP" style addresses.
        address_patterns = [
            r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Road|Rd|Avenue|Ave|Lane|Ln|Drive|Dr|Boulevard|Blvd)\b',
            r'\b[A-Za-z\s]+,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s*\d{5}\b'
        ]
        for pattern in address_patterns:
            matches = re.findall(pattern, document_text, re.IGNORECASE)
            if matches:
                key_info['addresses'] = matches[:3]
                break

        # Numeric and written-out dates.
        date_patterns = [
            r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
            r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
            r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b'
        ]
        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, document_text, re.IGNORECASE))
        if dates:
            key_info['dates'] = dates[:5]

        # Monetary amounts in dollars, rupees, or spelled-out currencies.
        amount_patterns = [
            r'\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?',
            r'₹\d{1,3}(?:,\d{3})*(?:\.\d{2})?',
            r'\d{1,3}(?:,\d{3})*(?:\.\d{2})?\s*(?:dollars?|rupees?|rs?)',
        ]
        amounts = []
        for pattern in amount_patterns:
            amounts.extend(re.findall(pattern, document_text, re.IGNORECASE))
        if amounts:
            key_info['amounts'] = amounts[:5]

        # Phone numbers (simple ten-digit patterns).
        phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
        phones = re.findall(phone_pattern, document_text)
        if phones:
            key_info['phone_numbers'] = phones[:3]

        # Property details such as room counts and area.
        property_patterns = {
            'bedrooms': r'\b(\d+)\s*(?:bedroom|bed|br)\b',
            'bathrooms': r'\b(\d+)\s*(?:bathroom|bath|ba)\b',
            'square_feet': r'\b(\d{1,3}(?:,\d{3})*)\s*(?:square\s*feet|sq\s*ft|sqft)\b',
            'acres': r'\b(\d+(?:\.\d+)?)\s*acres?\b'
        }
        for key, pattern in property_patterns.items():
            matches = re.findall(pattern, document_text, re.IGNORECASE)
            if matches:
                key_info[key] = matches[0]

        # Capitalized two-word sequences as candidate person names.
        name_pattern = r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b'
        names = re.findall(name_pattern, document_text)
        if names:
            key_info['names'] = names[:5]

    except Exception as e:
        logger.warning(f"Error extracting key info: {str(e)}")

    return key_info


def detect_signatures(text_lower):
    """
    Detect signature-related wording in the document text.
    """
    signature_indicators = [
        'signature', 'signed', 'sign', 'signatory', 'witness',
        'notary', 'notarized', 'attorney', 'lawyer', 'agent'
    ]
    return any(indicator in text_lower for indicator in signature_indicators)


def detect_dates(document_text):
    """
    Detect dates in document.
    """
    date_patterns = [
        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
        r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
        r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b'
    ]
    for pattern in date_patterns:
        if re.search(pattern, document_text, re.IGNORECASE):
            return True
    return False


def assess_document_authenticity(document_text, has_signatures, has_dates, key_info):
    """
    Assess document authenticity with a simple additive heuristic.
    """
    authenticity_score = 0.0

    # Each piece of corroborating evidence adds a fixed weight.
    if has_signatures:
        authenticity_score += 0.3
    if has_dates:
        authenticity_score += 0.2
    if key_info.get('addresses'):
        authenticity_score += 0.2
    if key_info.get('amounts'):
        authenticity_score += 0.1
    if key_info.get('names'):
        authenticity_score += 0.1
    if len(document_text) > 500:
        authenticity_score += 0.1

    # Map the score onto a coarse assessment label.
    if authenticity_score >= 0.7:
        assessment = 'Authentic'
    elif authenticity_score >= 0.4:
        assessment = 'Likely Authentic'
    elif authenticity_score >= 0.2:
        assessment = 'Suspicious'
    else:
        assessment = 'Potentially Fake'

    return assessment, authenticity_score


def calculate_verification_score(confidence, document_confidence, authenticity_confidence, has_signatures, has_dates, key_info):
    """
    Calculate an overall verification score on a 0-100 scale.
    """
    score = 0.0

    # Weighted contributions from the three confidence signals.
    score += confidence * 0.3
    score += document_confidence * 0.2
    score += authenticity_confidence * 0.2

    # Fixed bonuses for supporting evidence.
    if has_signatures:
        score += 0.1
    if has_dates:
        score += 0.1
    if key_info.get('addresses'):
        score += 0.05
    if key_info.get('amounts'):
        score += 0.05

    return min(100.0, score * 100)
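
# Illustrative worked example of the weighting above (hypothetical input values,
# not taken from real data): with confidence=0.6, document_confidence=0.8,
# authenticity_confidence=0.5, signatures and dates detected, and addresses
# found but no amounts, the score would be
#   0.6*0.3 + 0.8*0.2 + 0.5*0.2 + 0.1 + 0.1 + 0.05 = 0.69  ->  69.0 out of 100.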


def check_document_consistency(document_text, property_data):
    """
    Check document consistency against the supplied property data.
    """
    try:
        if not property_data:
            return {
                'is_consistent': True,
                'confidence': 0.5,
                'issues': [],
                'model_used': 'static_fallback'
            }

        consistency_score = 0.5
        issues = []

        # Compare the property address against addresses found in the document.
        if property_data.get('address'):
            property_address = property_data['address'].lower()
            doc_addresses = re.findall(r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Road|Rd|Avenue|Ave)\b', document_text, re.IGNORECASE)

            for doc_addr in doc_addresses:
                if any(word in doc_addr.lower() for word in property_address.split()):
                    consistency_score += 0.2
                    break
            else:
                issues.append("Address mismatch between document and property data")

        # Check whether the stated property type appears in the document.
        if property_data.get('property_type'):
            property_type = property_data['property_type'].lower()
            if property_type in document_text.lower():
                consistency_score += 0.1
            else:
                issues.append("Property type mismatch")

        # Compare the stated square footage against sizes found in the document.
        if property_data.get('sq_ft'):
            property_size = property_data['sq_ft']
            size_matches = re.findall(r'\b(\d{1,3}(?:,\d{3})*)\s*(?:square\s*feet|sq\s*ft|sqft)\b', document_text, re.IGNORECASE)
            if size_matches:
                doc_size = size_matches[0].replace(',', '')
                # Use float() so non-integer sizes do not raise a ValueError.
                if abs(float(doc_size) - float(property_size)) < 100:
                    consistency_score += 0.1
                else:
                    issues.append("Property size mismatch")

        return {
            'is_consistent': consistency_score > 0.6,
            'confidence': min(1.0, consistency_score),
            'issues': issues,
            'model_used': 'static_fallback'
        }

    except Exception as e:
        logger.error(f"Error checking document consistency: {str(e)}")
        return {
            'is_consistent': False,
            'confidence': 0.0,
            'issues': [f"Consistency check error: {str(e)}"],
            'model_used': 'static_fallback'
        }
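

# Minimal usage sketch (illustrative only): shows how the helpers above chain
# together when this module is run with ``python -m`` from inside its package.
# The command-line path and the property data below are hypothetical
# placeholders, not values used elsewhere in the project.
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("Usage: python -m <package>.<this_module> <path-to-pdf>")
        sys.exit(1)

    sample_property = {  # hypothetical cross-validation data
        'address': '12 Example Street',
        'property_type': 'apartment',
        'sq_ft': '1200',
    }

    with open(sys.argv[1], "rb") as pdf_file:
        text = extract_text_from_pdf(pdf_file)

    analysis = analyze_pdf_content(text, sample_property)
    consistency = check_document_consistency(text, sample_property)

    print(f"Document type: {analysis['document_type']}")
    print(f"Verification score: {analysis['verification_score']:.1f}/100")
    print(f"Consistent with property data: {consistency['is_consistent']}")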