from .model_loader import load_model
from .logging_config import logger
import re
|
|
def classify_fraud(property_details, description):
    """
    Classify the fraud risk of a property listing using AI.

    Runs cheap heuristic checks for obviously fake data first, then falls
    through to a zero-shot classification model. Returns a dict with the
    alert level, alert score, per-label confidence scores, risk lists, and
    a human-readable reasoning string.
    """
    try:
        text_to_analyze = f"{property_details} {description}"

        # Heuristic 1: the combined text is nothing but a short numeric
        # string (e.g. "123"), which cannot describe a real property.
        fake_detected = False
        stripped_text = text_to_analyze.strip()
        if re.fullmatch(r'\d{1,3}', stripped_text):
            fake_detected = True

        # Heuristic 2: five or more numbers that are all identical is a
        # strong sign of auto-generated filler data.
        numbers = re.findall(r'\b\d+\b', text_to_analyze.lower())
        if len(numbers) >= 5 and len(set(numbers)) <= 1:
            fake_detected = True

        # Heuristic 3: absurdly low prices or sizes ("₹1", "2 sq ft").
        # Word-boundary regexes are used so that legitimate values such as
        # "₹10,00,000" or "1100 sq ft" are not flagged by accident.
        if re.search(r'₹\s*[12](?![\d,.])', text_to_analyze):
            fake_detected = True
        if re.search(r'\b[12]\s*sq\.?\s*ft\b', text_to_analyze.lower()):
            fake_detected = True

        # Short-circuit: obviously fake data gets a fixed medium alert
        # without spending a model call.
        if fake_detected:
            return {
                'alert_level': 'medium',
                'alert_score': 0.6,
                'confidence_scores': {
                    'high risk listing': 0.6,
                    'potential fraud': 0.5,
                    'suspicious listing': 0.4,
                    'legitimate listing': 0.2
                },
                'high_risk': ['Fake data patterns detected'],
                'medium_risk': [],
                'low_risk': [],
                'reasoning': 'This property was classified as medium risk due to detected fake data patterns.'
            }

        # Zero-shot classification: score the listing text against each risk
        # category without task-specific training.
        classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")

        risk_categories = [
            "legitimate listing",
            "suspicious listing",
            "potential fraud",
            "high risk listing"
        ]

        # Truncate to keep the input bounded and inference fast.
        result = classifier(text_to_analyze[:1000], risk_categories, multi_label=False)
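
        # For reference, the Hugging Face zero-shot pipeline returns a dict
        # shaped like the following (values are illustrative, not real
        # output; labels come back sorted by descending score):
        # {'sequence': '2BHK apartment ...',
        #  'labels': ['legitimate listing', 'suspicious listing',
        #             'potential fraud', 'high risk listing'],
        #  'scores': [0.71, 0.15, 0.09, 0.05]}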

        fraud_classification = {
            'alert_level': 'minimal',
            'alert_score': 0.0,
            'confidence_scores': {},
            'high_risk': [],
            'medium_risk': [],
            'low_risk': [],
            'reasoning': ''
        }

        # Accumulate a weighted fraud score from the non-legitimate labels.
        # Harsher labels carry larger weights, and the weights also damp the
        # raw softmax scores so a single noisy label cannot dominate.
        fraud_score = 0.0
        if isinstance(result, dict) and 'scores' in result:
            for label, score in zip(result.get('labels', []), result.get('scores', [])):
                if label == "legitimate listing":
                    continue
                try:
                    score_val = float(score)
                    if label == "suspicious listing":
                        score_val *= 0.3
                    elif label == "potential fraud":
                        score_val *= 0.5
                    elif label == "high risk listing":
                        score_val *= 0.6
                except Exception:
                    score_val = 0.0
                fraud_score += score_val
                fraud_classification['confidence_scores'][label] = score_val
        else:
            # Unexpected pipeline output: fall back to a near-zero score.
            fraud_score = 0.02

        # The softmax scores sum to 1 and the per-label weights already cap
        # the weighted sum at 0.6, so only a defensive clamp is needed here.
        fraud_score = min(1.0, fraud_score)
        fraud_classification['alert_score'] = fraud_score

        # Map the numeric score onto a coarse alert level.
        if fraud_score >= 0.8:
            fraud_classification['alert_level'] = 'high'
        elif fraud_score >= 0.5:
            fraud_classification['alert_level'] = 'medium'
        elif fraud_score >= 0.3:
            fraud_classification['alert_level'] = 'low'
        else:
            fraud_classification['alert_level'] = 'minimal'

        # Build a human-readable explanation that mirrors the alert level.
        level_phrases = {
            'minimal': 'legitimate',
            'low': 'low risk',
            'medium': 'medium risk',
            'high': 'high risk',
        }
        level_text = level_phrases[fraud_classification['alert_level']]
        reasoning_parts = [
            f"This property was classified as {level_text} based on AI analysis of the listing details."
        ]

        # Mention the strongest risk label when it is confident enough.
        if fraud_classification['confidence_scores']:
            highest_risk = max(fraud_classification['confidence_scores'].items(), key=lambda x: x[1])
            if highest_risk[1] > 0.4:
                reasoning_parts.append(f"Primary concern: {highest_risk[0]} (confidence: {highest_risk[1]:.0%})")

        fraud_classification['reasoning'] = " ".join(reasoning_parts)

        return fraud_classification

    except Exception as e:
        logger.error(f"Error in fraud classification: {str(e)}")
        # Fail safe: report a minimal alert rather than raising to callers.
        return {
            'alert_level': 'minimal',
            'alert_score': 0.02,
            'confidence_scores': {},
            'high_risk': [],
            'medium_risk': [],
            'low_risk': [],
            'reasoning': f'Fraud analysis failed: {str(e)}'
        }
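

# A sketch of how the keyword fallback below could be wired in. This helper
# is hypothetical and illustrative only: classify_fraud above does not call
# it, and nothing else in the pipeline depends on it.
def _classify_with_fallback(text, categories):
    """Hypothetical helper: try the zero-shot model, else keyword scoring."""
    try:
        classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
        return classifier(text[:1000], categories, multi_label=False)
    except Exception:
        # simple_fraud_classification mimics the pipeline's
        # {'labels': [...], 'scores': [...]} return shape.
        return simple_fraud_classification(text, categories)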


def simple_fraud_classification(text, categories):
    """
    Simple keyword-based fraud classification fallback.

    Returns the same {'labels': ..., 'scores': ...} shape as the zero-shot
    pipeline. Categories with no keyword entry get a small default score.
    """
    text_lower = text.lower()

    category_keywords = {
        "fraudulent listing": ["fake", "scam", "fraud", "illegal", "unauthorized"],
        "misleading information": ["misleading", "false", "wrong", "incorrect", "fake"],
        "fake property": ["fake", "non-existent", "virtual", "photoshopped"],
        "scam attempt": ["scam", "fraud", "cheat", "trick", "deceive"],
        "legitimate listing": ["real", "genuine", "authentic", "verified", "legitimate"]
    }

    scores = []
    for category in categories:
        keywords = category_keywords.get(category, [])
        # Score = fraction of this category's keywords present in the text;
        # categories without a keyword list get a small default.
        score = sum(1 for keyword in keywords if keyword in text_lower) / len(keywords) if keywords else 0.1
        scores.append(min(1.0, score))

    return {
        "labels": categories,
        "scores": scores
    }
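

# Minimal usage sketch, assuming this module sits in a package next to
# model_loader and logging_config and that the BART-MNLI weights are
# available locally. Because of the relative imports, run it as a module
# (python -m <package>.<module>), not as a script. The listing text is
# made up for illustration.
if __name__ == "__main__":
    report = classify_fraud(
        "2BHK apartment, 1100 sq ft, ₹75,00,000, Andheri West",
        "Well-lit flat near the metro station, ready to move in.",
    )
    print(report['alert_level'], report['alert_score'])
    print(report['reasoning'])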