# Source: propertyverification/models/pdf_analysis.py
# (uploaded by sksameermujahid, commit 6e3dbdb "Upload 23 files" — hosting-page
# chrome converted to comments so the module parses as valid Python)
# models/pdf_analysis.py
import fitz # PyMuPDF
import re
from .model_loader import load_model
from .logging_config import logger
def extract_text_from_pdf(pdf_file):
    """
    Extract all text from a PDF file-like object.

    Args:
        pdf_file: Binary file-like object supporting ``.read()`` that yields
            the raw PDF bytes (e.g. an uploaded file).

    Returns:
        str: Concatenated text of every page, stripped of surrounding
        whitespace, or "" if the PDF cannot be opened or parsed.
    """
    try:
        # Context manager guarantees the document handle is released even if
        # a page fails to parse — the original only closed it on the success
        # path, leaking the handle when an exception fired mid-extraction.
        with fitz.open(stream=pdf_file.read(), filetype="pdf") as doc:
            # join() over per-page chunks avoids quadratic += concatenation.
            text = "".join(page.get_text() for page in doc)
        return text.strip()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {str(e)}")
        return ""
def analyze_pdf_content(document_text, property_data):
    """
    Analyze PDF content for real estate verification with perfect classification and summarization.
    Args:
        document_text: Extracted text from PDF
        property_data: Property information for cross-validation
            NOTE(review): currently unused inside this function — cross-validation
            appears to live in check_document_consistency; confirm before relying on it.
    Returns:
        dict: Comprehensive analysis results
    """
    try:
        # Guard: reject empty / near-empty extractions with a fully-populated
        # fallback dict so callers always see the same key set.
        if not document_text or len(document_text.strip()) < 10:
            return {
                'is_property_related': False,
                'confidence': 0.0,
                'summary': 'Document too short or empty',
                'key_info': {},
                'verification_score': 0.0,
                'document_type': 'Unknown',
                'document_confidence': 0.0,
                'authenticity_assessment': 'Unknown',
                'authenticity_confidence': 0.0,
                'contains_signatures': False,
                'contains_dates': False,
                'real_estate_indicators': [],
                'legal_terms_found': [],
                'model_used': 'static_fallback'
            }
        # Comprehensive real estate keyword analysis, grouped by category so
        # per-category hit counts can be reported separately below.
        real_estate_keywords = {
            'property_terms': [
                'property', 'house', 'apartment', 'flat', 'villa', 'land', 'real estate',
                'residential', 'commercial', 'industrial', 'plot', 'acre', 'square feet',
                'sq ft', 'sqft', 'bedroom', 'bathroom', 'kitchen', 'living room',
                'dining room', 'garage', 'parking', 'garden', 'balcony', 'terrace'
            ],
            'legal_terms': [
                'title', 'deed', 'ownership', 'mortgage', 'loan', 'lease', 'rent',
                'agreement', 'contract', 'sale', 'purchase', 'transfer', 'registration',
                'encumbrance', 'lien', 'easement', 'zoning', 'permit', 'license',
                'tax', 'assessment', 'valuation', 'appraisal', 'survey', 'boundary'
            ],
            'financial_terms': [
                'price', 'value', 'cost', 'amount', 'payment', 'installment',
                'down payment', 'interest', 'rate', 'principal', 'balance',
                'insurance', 'premium', 'deposit', 'advance', 'rental', 'security'
            ],
            'location_terms': [
                'address', 'location', 'street', 'road', 'avenue', 'lane',
                'city', 'state', 'country', 'postal', 'zip', 'pincode',
                'neighborhood', 'area', 'district', 'zone', 'sector', 'block'
            ]
        }
        text_lower = document_text.lower()
        # Count keyword matches for each category (plain substring matching,
        # so e.g. 'rent' also fires inside 'current').
        keyword_counts = {}
        found_keywords = {}
        for category, keywords in real_estate_keywords.items():
            matches = []
            for keyword in keywords:
                if keyword in text_lower:
                    matches.append(keyword)
            keyword_counts[category] = len(matches)
            found_keywords[category] = matches
        # Calculate overall confidence: saturates at 1.0 once 30% of all
        # known keywords have been seen.
        total_keywords = sum(len(keywords) for keywords in real_estate_keywords.values())
        total_matches = sum(keyword_counts.values())
        confidence = min(1.0, total_matches / (total_keywords * 0.3))  # 30% threshold
        # Determine document type with high accuracy
        document_type, document_confidence = classify_document_type(text_lower, found_keywords)
        # Generate comprehensive summary
        summary = generate_document_summary(document_text, document_type)
        # Extract key information
        key_info = extract_document_key_info(document_text)
        # Check for signatures and dates
        contains_signatures = detect_signatures(text_lower)
        contains_dates = detect_dates(document_text)
        # Assess authenticity
        authenticity_assessment, authenticity_confidence = assess_document_authenticity(
            document_text, contains_signatures, contains_dates, key_info
        )
        # Calculate verification score
        verification_score = calculate_verification_score(
            confidence, document_confidence, authenticity_confidence,
            contains_signatures, contains_dates, key_info
        )
        # Determine if it's real estate related: either enough keyword mass
        # or a recognized document type is sufficient.
        is_property_related = confidence > 0.2 or document_type != 'Unknown'
        # Extract legal terms
        legal_terms_found = found_keywords.get('legal_terms', [])
        # Create real estate indicators list
        real_estate_indicators = []
        for category, matches in found_keywords.items():
            if matches:
                real_estate_indicators.extend(matches[:3])  # Top 3 from each category
        return {
            'is_property_related': is_property_related,
            'confidence': confidence,
            'summary': summary,
            'key_info': key_info,
            'verification_score': verification_score,
            'document_type': document_type,
            'document_confidence': document_confidence,
            'authenticity_assessment': authenticity_assessment,
            'authenticity_confidence': authenticity_confidence,
            'contains_signatures': contains_signatures,
            'contains_dates': contains_dates,
            'real_estate_indicators': real_estate_indicators,
            'legal_terms_found': legal_terms_found,
            'keyword_analysis': keyword_counts,
            'model_used': 'static_fallback'
        }
    except Exception as e:
        # Any failure degrades to the same fixed key set plus an 'error' key,
        # so downstream consumers never crash on a missing field.
        logger.error(f"Error in PDF content analysis: {str(e)}")
        return {
            'is_property_related': False,
            'confidence': 0.0,
            'summary': f'Analysis error: {str(e)}',
            'key_info': {},
            'verification_score': 0.0,
            'document_type': 'Unknown',
            'document_confidence': 0.0,
            'authenticity_assessment': 'Unknown',
            'authenticity_confidence': 0.0,
            'contains_signatures': False,
            'contains_dates': False,
            'real_estate_indicators': [],
            'legal_terms_found': [],
            'model_used': 'static_fallback',
            'error': str(e)
        }
def classify_document_type(text_lower, found_keywords):
    """
    Classify the document into a known real-estate document type.

    Args:
        text_lower: Lowercased document text to scan.
        found_keywords: Per-category keyword hits (accepted for interface
            compatibility; not consulted here).

    Returns:
        tuple[str, float]: (document type label, confidence in [0, 1]);
        ('Unknown', 0.0) when no pattern keywords appear at all.
    """
    # Each candidate type carries its trigger keywords and a base confidence
    # that is scaled by the fraction of keywords actually present.
    document_patterns = {
        'Property Title Deed': {
            'keywords': ['title', 'deed', 'ownership', 'property', 'owner'],
            'confidence': 0.9
        },
        'Mortgage Document': {
            'keywords': ['mortgage', 'loan', 'bank', 'lender', 'borrower', 'principal', 'interest'],
            'confidence': 0.85
        },
        'Lease Agreement': {
            'keywords': ['lease', 'rent', 'tenant', 'landlord', 'rental', 'agreement'],
            'confidence': 0.8
        },
        'Sale Contract': {
            'keywords': ['sale', 'purchase', 'buyer', 'seller', 'contract', 'agreement'],
            'confidence': 0.8
        },
        'Tax Assessment': {
            'keywords': ['tax', 'assessment', 'valuation', 'appraisal', 'property tax'],
            'confidence': 0.75
        },
        'Building Permit': {
            'keywords': ['permit', 'building', 'construction', 'approval', 'zoning'],
            'confidence': 0.7
        },
        'Property Survey': {
            'keywords': ['survey', 'boundary', 'measurement', 'plot', 'dimension'],
            'confidence': 0.7
        },
        'Insurance Document': {
            'keywords': ['insurance', 'policy', 'premium', 'coverage', 'claim'],
            'confidence': 0.65
        }
    }
    # Score every type that has at least one keyword hit.
    scored = []
    for label, spec in document_patterns.items():
        hits = sum(1 for kw in spec['keywords'] if kw in text_lower)
        if hits:
            ratio = hits / len(spec['keywords'])
            scored.append((spec['confidence'] * ratio, label))
    if not scored:
        return 'Unknown', 0.0
    # max() returns the first maximum, matching the original strict-> update
    # rule (earlier dict entries win ties).
    best_score, best_label = max(scored, key=lambda pair: pair[0])
    return best_label, best_score
def generate_document_summary(document_text, document_type):
    """
    Produce a short summary of the document text.

    Prefers the shared abstractive summarization model; when that model is
    unavailable or flagged as a fallback, degrades to a simple extractive
    summary biased toward sentences mentioning the detected document type.

    Returns:
        str: Summary capped at 300 characters, or a fixed message when the
        text is too thin or summarization fails entirely.
    """
    try:
        # Preferred path: model-based summarization of the first 1000 chars.
        try:
            model = load_model("summarization")
            if hasattr(model, 'fallback_used') and not model.fallback_used:
                result = model(document_text[:1000], max_length=150, min_length=50)
                if isinstance(result, list) and len(result) > 0:
                    return result[0].get('summary_text', '')
        except Exception as e:
            logger.warning(f"Summarization model failed: {str(e)}")

        # Extractive fallback: keep only sentences longer than 20 chars.
        candidates = [piece.strip() for piece in document_text.split('.') if len(piece.strip()) > 20]
        if not candidates:
            return "Document contains insufficient text for summarization."

        selected = []
        if document_type != 'Unknown':
            # Prefer up to two sentences that mention the document type.
            type_words = document_type.lower().split()
            for candidate in candidates:
                if any(word in candidate.lower() for word in type_words):
                    selected.append(candidate)
                    if len(selected) >= 2:
                        break
        # Fall back to the first few meaningful sentences.
        if not selected:
            selected = candidates[:3]

        combined = '. '.join(selected) + '.'
        if len(combined) > 300:
            combined = combined[:297] + '...'
        return combined
    except Exception as e:
        logger.error(f"Error generating summary: {str(e)}")
        return "Summary generation failed."
def extract_document_key_info(document_text):
    """
    Extract key structured information from raw document text via regexes.

    Args:
        document_text: Plain text extracted from the document.

    Returns:
        dict: Any of the keys 'addresses', 'dates', 'amounts',
        'phone_numbers', 'bedrooms', 'bathrooms', 'square_feet', 'acres',
        'names' — each present only when at least one match was found.
        Never raises; failures are logged and a partial dict is returned.
    """
    key_info = {}
    try:
        # Addresses: first pattern that matches wins (US-style street
        # address, then "City, State ZIP").
        address_patterns = [
            r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Road|Rd|Avenue|Ave|Lane|Ln|Drive|Dr|Boulevard|Blvd)\b',
            r'\b[A-Za-z\s]+,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s*\d{5}\b'
        ]
        for pattern in address_patterns:
            matches = re.findall(pattern, document_text, re.IGNORECASE)
            if matches:
                key_info['addresses'] = matches[:3]  # Top 3 addresses
                break
        # Dates: numeric d/m/y and y/m/d plus written-month formats.
        date_patterns = [
            r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
            r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
            r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b'
        ]
        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, document_text, re.IGNORECASE))
        if dates:
            key_info['dates'] = dates[:5]  # Top 5 dates
        # Monetary amounts: $/₹ figures or numbers followed by a currency word.
        amount_patterns = [
            r'\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?',
            r'₹\d{1,3}(?:,\d{3})*(?:\.\d{2})?',
            r'\d{1,3}(?:,\d{3})*(?:\.\d{2})?\s*(?:dollars?|rupees?|rs?)',
        ]
        amounts = []
        for pattern in amount_patterns:
            amounts.extend(re.findall(pattern, document_text, re.IGNORECASE))
        if amounts:
            key_info['amounts'] = amounts[:5]  # Top 5 amounts
        # Phone numbers. NOTE(review): this matches any 10-digit run with
        # optional -/. separators, so false positives are possible.
        phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
        phones = re.findall(phone_pattern, document_text)
        if phones:
            key_info['phone_numbers'] = phones[:3]  # Top 3 phone numbers
        # Property details. The trailing 's?' accepts plural forms
        # ("3 bedrooms", "2 bathrooms"), which the previous patterns rejected
        # because \b cannot sit between "bedroom" and "s".
        property_patterns = {
            'bedrooms': r'\b(\d+)\s*(?:bedroom|bed|br)s?\b',
            'bathrooms': r'\b(\d+)\s*(?:bathroom|bath|ba)s?\b',
            'square_feet': r'\b(\d{1,3}(?:,\d{3})*)\s*(?:square\s*feet|sq\s*ft|sqft)\b',
            'acres': r'\b(\d+(?:\.\d+)?)\s*acres?\b'
        }
        for key, pattern in property_patterns.items():
            matches = re.findall(pattern, document_text, re.IGNORECASE)
            if matches:
                key_info[key] = matches[0]  # First match
        # Names. NOTE(review): naive Capitalized-pair heuristic — will also
        # catch phrases like "New York"; verify against callers' needs.
        name_pattern = r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b'
        names = re.findall(name_pattern, document_text)
        if names:
            key_info['names'] = names[:5]  # Top 5 names
    except Exception as e:
        logger.warning(f"Error extracting key info: {str(e)}")
    return key_info
def detect_signatures(text_lower):
    """
    Return True if the lowercased text mentions any signing-related term.

    Uses plain substring matching, so e.g. 'sign' also fires inside
    'design' — identical to the original heuristic.
    """
    for marker in ('signature', 'signed', 'sign', 'signatory', 'witness',
                   'notary', 'notarized', 'attorney', 'lawyer', 'agent'):
        if marker in text_lower:
            return True
    return False
def detect_dates(document_text):
    """
    Return True if the text contains a numeric or written-month date.

    Recognizes d/m/y and y/m/d numeric forms (with / or - separators) and
    "Month D, YYYY" style dates, case-insensitively.
    """
    patterns = (
        r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
        r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
        r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b'
    )
    return any(re.search(pattern, document_text, re.IGNORECASE) for pattern in patterns)
def assess_document_authenticity(document_text, has_signatures, has_dates, key_info):
    """
    Heuristically assess document authenticity.

    Each satisfied criterion contributes a fixed weight; the total maps to
    one of four labels.

    Returns:
        tuple[str, float]: (assessment label, score in [0, 1]).
    """
    # (criterion met, weight) pairs — summed in this order, same as the
    # original's sequential additions.
    criteria = (
        (has_signatures, 0.3),
        (has_dates, 0.2),
        (bool(key_info.get('addresses')), 0.2),
        (bool(key_info.get('amounts')), 0.1),
        (bool(key_info.get('names')), 0.1),
        (len(document_text) > 500, 0.1),
    )
    authenticity_score = sum(weight for met, weight in criteria if met)
    # Map the score onto a label, highest band first.
    if authenticity_score >= 0.7:
        label = 'Authentic'
    elif authenticity_score >= 0.4:
        label = 'Likely Authentic'
    elif authenticity_score >= 0.2:
        label = 'Suspicious'
    else:
        label = 'Potentially Fake'
    return label, authenticity_score
def calculate_verification_score(confidence, document_confidence, authenticity_confidence, has_signatures, has_dates, key_info):
    """
    Combine the individual confidences into one 0-100 verification score.

    Weights: keyword confidence 30%, document-type confidence 20%,
    authenticity 20%, plus fixed bonuses for signatures, dates, addresses
    and amounts. Capped at 100.

    Returns:
        float: Score in [0.0, 100.0].
    """
    # Components are summed left-to-right in the same order as the
    # original's sequential += additions.
    components = [
        confidence * 0.3,
        document_confidence * 0.2,
        authenticity_confidence * 0.2,
    ]
    if has_signatures:
        components.append(0.1)
    if has_dates:
        components.append(0.1)
    if key_info.get('addresses'):
        components.append(0.05)
    if key_info.get('amounts'):
        components.append(0.05)
    return min(100.0, sum(components) * 100)
def check_document_consistency(document_text, property_data):
    """
    Cross-check the document text against supplied property data.

    Starts from a neutral 0.5 score, adds credit for matching address,
    property type and size, and records an issue for each mismatch.

    Returns:
        dict: {'is_consistent', 'confidence', 'issues', 'model_used'};
        with no property data a neutral 0.5-confidence result is returned.
    """
    try:
        # Without reference data there is nothing to compare against.
        if not property_data:
            return {
                'is_consistent': True,
                'confidence': 0.5,
                'issues': [],
                'model_used': 'static_fallback'
            }
        score = 0.5  # neutral starting point
        problems = []
        # Address: any shared word between the expected address and a
        # street-address-looking span in the document counts as a match.
        # No candidates in the document also counts as a mismatch.
        expected_address = property_data.get('address')
        if expected_address:
            address_words = expected_address.lower().split()
            candidates = re.findall(r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Road|Rd|Avenue|Ave)\b', document_text, re.IGNORECASE)
            matched = False
            for candidate in candidates:
                if any(word in candidate.lower() for word in address_words):
                    matched = True
                    break
            if matched:
                score += 0.2
            else:
                problems.append("Address mismatch between document and property data")
        # Property type: simple substring presence check.
        expected_type = property_data.get('property_type')
        if expected_type:
            if expected_type.lower() in document_text.lower():
                score += 0.1
            else:
                problems.append("Property type mismatch")
        # Size: compare the first square-footage figure found, if any,
        # within a 100 sq ft tolerance. No figure found -> no opinion.
        expected_size = property_data.get('sq_ft')
        if expected_size:
            size_hits = re.findall(r'\b(\d{1,3}(?:,\d{3})*)\s*(?:square\s*feet|sq\s*ft|sqft)\b', document_text, re.IGNORECASE)
            if size_hits:
                document_size = size_hits[0].replace(',', '')
                if abs(int(document_size) - int(expected_size)) < 100:  # Within 100 sq ft
                    score += 0.1
                else:
                    problems.append("Property size mismatch")
        return {
            'is_consistent': score > 0.6,
            'confidence': min(1.0, score),
            'issues': problems,
            'model_used': 'static_fallback'
        }
    except Exception as e:
        logger.error(f"Error checking document consistency: {str(e)}")
        return {
            'is_consistent': False,
            'confidence': 0.0,
            'issues': [f"Consistency check error: {str(e)}"],
            'model_used': 'static_fallback'
        }