|
|
|
|
|
import re |
|
from .model_loader import load_model |
|
from .logging_config import logger |
|
from typing import Dict, Any, List, Tuple |
|
|
|
# Candidate labels handed to the zero-shot classifier, grouped by theme.
_LEGAL_CATEGORIES = (
    # Title and ownership
    "clear title documentation",
    "title verification documents",
    "ownership transfer documents",
    "inheritance documents",
    "gift deed documents",
    "power of attorney documents",
    # Registration and transfer
    "property registration documents",
    "sale deed documents",
    "conveyance deed documents",
    "development agreement documents",
    "joint development agreement documents",
    # Tax and financial
    "property tax records",
    "tax clearance certificates",
    "encumbrance certificates",
    "bank loan documents",
    "mortgage documents",
    # Approvals and permits
    "building permits",
    "construction approvals",
    "occupation certificates",
    "completion certificates",
    "environmental clearances",
    # Land and usage
    "land use certificates",
    "zoning certificates",
    "layout approvals",
    "master plan compliance",
    "land conversion documents",
    # Compliance
    "legal compliance certificates",
    "no objection certificates",
    "fire safety certificates",
    "structural stability certificates",
    "water and electricity compliance",
    # Disputes and litigation
    "property dispute records",
    "litigation history",
    "court orders",
    "settlement agreements",
    "pending legal cases",
)

# Labels whose scores are summed into the completeness score.
_POSITIVE_LEGAL_CATEGORIES = frozenset({
    "clear title documentation", "property registration documents", "sale deed documents",
    "property tax records", "building permits", "occupation certificates",
    "legal compliance certificates", "no objection certificates",
})

# Substrings counted by the keyword fallback used when the model cannot be loaded.
_LEGAL_TERMS = ('title', 'deed', 'registration', 'tax', 'permit', 'approval',
                'certificate', 'compliance', 'legal')


def _degraded_result(assessment, confidence, summary, completeness_score, legal_metrics, reasoning):
    """Build the result dict for paths where no classification was performed."""
    return {
        'assessment': assessment,
        'confidence': confidence,
        'summary': summary,
        'completeness_score': completeness_score,
        'potential_issues': False,
        'legal_metrics': legal_metrics,
        'reasoning': reasoning,
        'top_classifications': [],
        'document_verification': {},
        'compliance_status': {},
        'risk_assessment': {},
    }


def _grade_completeness(completeness_score):
    """Map a 0-100 completeness score to an (assessment, confidence) pair."""
    if completeness_score >= 80:
        return 'excellent', 0.9
    if completeness_score >= 60:
        return 'good', 0.7
    if completeness_score >= 40:
        return 'adequate', 0.5
    if completeness_score >= 20:
        return 'basic', 0.3
    return 'basic', 0.2


def analyze_legal_details(legal_text: str) -> Dict[str, Any]:
    """Analyze the legal details of a property listing.

    The text is classified against a fixed set of legal-document categories
    with the zero-shot model returned by ``load_model``. The scores of the
    "positive" categories are summed into a 0-100 completeness score, which
    is then graded into an assessment/confidence pair.

    Degraded paths (every path returns the same top-level keys):
      * Empty input, or fewer than 5 non-blank characters: an
        ``'insufficient'`` result, without loading any model.
      * Model loading fails: a keyword-count result capped at 50.
      * The classifier call fails: the keyword scores from
        ``simple_legal_analysis`` are used in place of the model output and
        the normal scoring continues.
      * Any other exception: a generic ``'basic'`` error result.

    Args:
        legal_text: Free-form legal description. Non-string values are
            converted with ``str()``.

    Returns:
        A dict with the keys ``assessment``, ``confidence``, ``summary``,
        ``completeness_score``, ``potential_issues``, ``legal_metrics``,
        ``reasoning``, ``top_classifications``, ``document_verification``,
        ``compliance_status`` and ``risk_assessment``.
    """
    try:
        if not legal_text or len(str(legal_text).strip()) < 5:
            return _degraded_result(
                'insufficient',
                0.1,
                'No legal details provided',
                5,
                {'text_length': 0, 'word_count': 0, 'legal_terms_found': 0},
                'No legal details provided for analysis',
            )

        # Normalise once so every later step works on a real string.
        legal_text_str = str(legal_text)

        try:
            classifier = load_model("zero-shot-classification")
        except Exception as e:
            logger.error(f"Error loading model in legal analysis: {str(e)}")

            lowered = legal_text_str.lower()
            legal_terms_found = sum(1 for term in _LEGAL_TERMS if term in lowered)
            return _degraded_result(
                'basic',
                0.3,
                f'Model loading error, using fallback analysis. Found {legal_terms_found} legal terms.',
                min(50, legal_terms_found * 10),
                {
                    'text_length': len(legal_text_str),
                    'word_count': len(legal_text_str.split()),
                    'legal_terms_found': legal_terms_found,
                },
                f'Model loading error: {str(e)}. Using fallback scoring based on legal terms found.',
            )

        # Fresh list per call so downstream code can never mutate the shared tuple.
        categories = list(_LEGAL_CATEGORIES)

        legal_context = (
            "\n"
            "Legal Documentation Analysis:\n"
            f"{legal_text_str}\n"
            "\n"
            "Please analyze the above legal documentation for:\n"
            "1. Completeness of legal information\n"
            "2. Presence of required documents\n"
            "3. Compliance with regulations\n"
            "4. Potential legal issues\n"
            "5. Risk assessment\n"
        )

        try:
            legal_result = classifier(legal_context[:1000], categories, multi_label=True)
        except Exception as e:
            logger.error(f"Error in legal classification: {str(e)}")
            # Use the keyword scores as the classification instead of returning
            # them directly: the raw {"labels", "scores"} dict does not match
            # this function's result schema.
            keyword_result = simple_legal_analysis(legal_text_str, categories)
            # Rank by score (stable for ties) so the "top 5" slice below really
            # holds the strongest labels rather than the first five categories.
            ranked = sorted(
                zip(keyword_result['labels'], keyword_result['scores']),
                key=lambda pair: pair[1],
                reverse=True,
            )
            legal_result = {
                'labels': [label for label, _ in ranked],
                'scores': [score for _, score in ranked],
            }

        legal_metrics = calculate_legal_metrics(legal_result, categories)

        top_classifications = [
            {'classification': label, 'confidence': float(score)}
            for label, score in zip(legal_result['labels'][:5], legal_result['scores'][:5])
            if score > 0.2
        ]

        positive_score = sum(
            score
            for label, score in zip(legal_result['labels'], legal_result['scores'])
            if label in _POSITIVE_LEGAL_CATEGORIES
        )
        completeness_score = min(100, int(positive_score * 100))

        # Any non-trivial text earns at least a minimal score.
        if completeness_score < 10 and len(legal_text_str) > 20:
            completeness_score = 10

        assessment, confidence = _grade_completeness(completeness_score)
        summary = summarize_text(legal_text_str)

        return {
            'assessment': assessment,
            'confidence': confidence,
            'summary': summary,
            'completeness_score': completeness_score,
            # NOTE(review): calculate_legal_metrics does not currently emit
            # 'potential_issues', 'missing_docs', 'risk_level' or 'risk_factors',
            # so these four fields always take their defaults.
            'potential_issues': legal_metrics.get('potential_issues', False),
            'legal_metrics': legal_metrics,
            'reasoning': f'Legal analysis completed with {completeness_score}% completeness score.',
            'top_classifications': top_classifications,
            # Prefer a dedicated key if one is ever provided; otherwise use the
            # grouped score that calculate_legal_metrics actually produces.
            'document_verification': {
                'title_docs': legal_metrics.get(
                    'title_docs', legal_metrics.get('title_and_ownership', 0)),
                'registration_docs': legal_metrics.get(
                    'registration_docs', legal_metrics.get('property_registration', 0)),
                'tax_docs': legal_metrics.get(
                    'tax_docs', legal_metrics.get('tax_and_financial', 0)),
                'approval_docs': legal_metrics.get(
                    'approval_docs', legal_metrics.get('approvals_and_permits', 0)),
            },
            'compliance_status': {
                'overall_compliance': legal_metrics.get(
                    'compliance_score', legal_metrics.get('compliance_and_legal', 0)),
                'missing_documents': legal_metrics.get('missing_docs', []),
            },
            'risk_assessment': {
                'risk_level': legal_metrics.get('risk_level', 'low'),
                'risk_factors': legal_metrics.get('risk_factors', []),
            },
        }

    except Exception as e:
        logger.error(f"Error in legal analysis: {str(e)}")

        return _degraded_result(
            'basic',
            0.2,
            'Legal analysis failed due to technical error',
            10,
            {
                'text_length': len(str(legal_text)) if legal_text else 0,
                'word_count': len(str(legal_text).split()) if legal_text else 0,
                'legal_terms_found': 0,
            },
            f'Legal analysis error: {str(e)}. Using fallback scoring.',
        )
|
|
|
def calculate_legal_metrics(legal_result, categories):
    """Collapse per-label classification scores into seven grouped metrics.

    Each metric is the sum of the scores of four fixed labels divided by
    four; labels missing from ``legal_result`` contribute 0.

    Args:
        legal_result: Expected to be a dict with parallel ``'labels'`` and
            ``'scores'`` sequences.
        categories: Accepted for interface compatibility; not used here.

    Returns:
        A dict keyed by ``title_and_ownership``, ``property_registration``,
        ``tax_and_financial``, ``approvals_and_permits``, ``land_and_usage``,
        ``compliance_and_legal`` and ``disputes_and_litigation``. If
        ``legal_result`` is not a dict containing ``'scores'``, or if any
        error occurs, neutral defaults are returned instead (0.5 for every
        metric, 0.1 for disputes).
    """
    neutral_metrics = {
        'title_and_ownership': 0.5,
        'property_registration': 0.5,
        'tax_and_financial': 0.5,
        'approvals_and_permits': 0.5,
        'land_and_usage': 0.5,
        'compliance_and_legal': 0.5,
        'disputes_and_litigation': 0.1,
    }

    # Metric name -> the four labels averaged into it.
    metric_groups = {
        'title_and_ownership': (
            'clear title documentation', 'title verification documents',
            'ownership transfer documents', 'inheritance documents',
        ),
        'property_registration': (
            'property registration documents', 'sale deed documents',
            'conveyance deed documents', 'development agreement documents',
        ),
        'tax_and_financial': (
            'property tax records', 'tax clearance certificates',
            'encumbrance certificates', 'bank loan documents',
        ),
        'approvals_and_permits': (
            'building permits', 'construction approvals',
            'occupation certificates', 'completion certificates',
        ),
        'land_and_usage': (
            'land use certificates', 'zoning certificates',
            'layout approvals', 'master plan compliance',
        ),
        'compliance_and_legal': (
            'legal compliance certificates', 'no objection certificates',
            'fire safety certificates', 'structural stability certificates',
        ),
        'disputes_and_litigation': (
            'property dispute records', 'litigation history',
            'court orders', 'pending legal cases',
        ),
    }

    try:
        usable = isinstance(legal_result, dict) and 'scores' in legal_result
        if not usable:
            return neutral_metrics

        raw_scores = legal_result.get('scores', [])
        raw_labels = legal_result.get('labels', [])
        score_by_label = dict(zip(raw_labels, raw_scores))

        grouped = {}
        for metric_name, group_labels in metric_groups.items():
            group_total = sum(score_by_label.get(label, 0) for label in group_labels)
            grouped[metric_name] = group_total / len(group_labels)
        return grouped
    except Exception as e:
        logger.error(f"Error calculating legal metrics: {str(e)}")
        return neutral_metrics
|
|
|
def simple_legal_analysis(legal_text, categories):
    """Score categories by keyword presence, as a model-free fallback.

    For each category the score is the fraction of its keywords that occur
    as substrings of the lower-cased text. Categories without a curated
    keyword list are matched on their first word only. A category with no
    usable keyword at all (for example an empty string) scores 0.1.

    Args:
        legal_text: Text to scan. ``None`` is treated as empty text; other
            non-string values are converted with ``str()``.
        categories: Iterable of category labels to score.

    Returns:
        ``{"labels": categories, "scores": [...]}`` with one score in
        ``[0.0, 1.0]`` per category, in the order of ``categories`` (not
        sorted by score).
    """
    # Normalise up front: calling .lower() on None or a number would raise.
    text_lower = "" if legal_text is None else str(legal_text).lower()

    category_keywords = {
        "clear title documentation": ["title", "clear", "documentation", "ownership"],
        "property registration documents": ["registration", "property", "documents", "registered"],
        "property tax records": ["tax", "property", "records", "assessment"],
        "building permits": ["permit", "building", "construction", "approval"],
        "legal compliance certificates": ["compliance", "legal", "certificate", "approved"],
        "property dispute records": ["dispute", "litigation", "court", "case"],
        "legitimate listing": ["real", "genuine", "authentic", "verified"],
    }

    scores = []
    for category in categories:
        # [:1] instead of [0]: a blank category yields an empty keyword list,
        # which takes the 0.1 floor below rather than raising IndexError.
        keywords = category_keywords.get(category) or category.split()[:1]
        if keywords:
            hits = sum(1 for keyword in keywords if keyword in text_lower)
            score = hits / len(keywords)
        else:
            score = 0.1
        scores.append(min(1.0, score))

    return {
        "labels": categories,
        "scores": scores,
    }
|
|
|
def _static_summary(text):
    """Build a model-free summary: the first two sentences, or a 200-character excerpt."""
    sentences = text.split('.')
    if len(sentences) > 3:
        # Strip each piece: splitting on '.' leaves the leading space of the
        # following sentence, which would otherwise produce "A.  B.".
        return '. '.join(part.strip() for part in sentences[:2]) + '.'
    return text[:200] + '...' if len(text) > 200 else text


def summarize_text(text):
    """Summarize ``text`` with the summarization model, or fall back to a static excerpt.

    The model is obtained from ``load_model("summarization")``. If loading or
    running it raises, or it returns an empty result, the summary is built
    without a model: the first two sentences when the text splits into more
    than three '.'-separated parts, otherwise the text itself, truncated to
    200 characters plus an ellipsis when longer.

    Args:
        text: Text to summarize. ``None`` is treated as empty text; other
            non-string values are converted with ``str()``.

    Returns:
        The summary string.
    """
    # Normalise first so the static fallback cannot fail on non-string input.
    text = "" if text is None else str(text)

    try:
        summarizer = load_model("summarization")
        if hasattr(summarizer, 'task_type') and summarizer.task_type == "summarization":
            # Presumably a project-provided summarizer that takes only the
            # text -- confirm against model_loader.
            result = summarizer(text)
        else:
            result = summarizer(text, max_length=130, min_length=30, do_sample=False)
        if result:
            return result[0]['summary_text']
    except Exception as e:
        logger.warning(f"Model generation failed, using static summary: {str(e)}")

    return _static_summary(text)
|
|