# models/fraud_classification.py
from .model_loader import load_model
from .logging_config import logger
import re


def classify_fraud(property_details, description):
    """
    Classify the fraud risk of a property listing using AI.

    Returns a dict with an alert level, a 0-1 alert score, per-label
    confidence scores, risk indicator lists, and a short reasoning string.
    """
    try:
        # Combine property details and description for analysis
        text_to_analyze = f"{property_details} {description}"

        # Screen for obvious fake-data patterns first. The checks are
        # deliberately lenient so real listings are not penalized.
        fake_patterns = [
            r'^\d+$',           # Input consists of digits only
            r'price.*\d{1,2}',  # Suspiciously low price
            r'size.*\d{1,2}',   # Suspiciously small size
        ]
        fake_detected = False
        for pattern in fake_patterns:
            if re.search(pattern, text_to_analyze.lower()):
                # Digits-only input counts as fake only when extremely short
                if pattern == r'^\d+$' and len(text_to_analyze.strip()) <= 3:
                    fake_detected = True
                    break
                # A lone low price/size match is not conclusive, so these
                # patterns never set the flag by themselves
                elif pattern in [r'price.*\d{1,2}', r'size.*\d{1,2}']:
                    continue

        # Check for repeated numbers (e.g. "2, 2, 2, 2, 2")
        numbers = re.findall(r'\b\d+\b', text_to_analyze.lower())
        if len(numbers) >= 5:
            unique_numbers = set(numbers)
            if len(unique_numbers) <= 1:  # Flag only if ALL numbers are identical
                fake_detected = True

        # Flag only extremely low prices such as ₹1 or ₹2; the lookahead
        # prevents matching the start of larger figures like ₹1,50,000
        if re.search(r'₹\s*[12](?![\d,.])', text_to_analyze):
            fake_detected = True

        # Flag only implausibly small sizes; the word boundary prevents
        # matching the tail of larger sizes like 1451 sq ft
        if re.search(r'\b[12] sq ft\b', text_to_analyze.lower()):
            fake_detected = True

        # If fake data is detected, return a moderate (rather than maximal)
        # fraud score so that false positives remain recoverable
        if fake_detected:
            return {
                'alert_level': 'medium',
                'alert_score': 0.6,
                'confidence_scores': {
                    'high risk listing': 0.6,
                    'potential fraud': 0.5,
                    'suspicious listing': 0.4,
                    'legitimate listing': 0.2
                },
                'high_risk': ['Fake data patterns detected'],
                'medium_risk': [],
                'low_risk': [],
                'reasoning': 'This property was classified as medium risk due to detected fake data patterns.'
            }

        # Zero-shot classification for listings that pass the fake-data screen
        classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
        risk_categories = [
            "legitimate listing",
            "suspicious listing",
            "potential fraud",
            "high risk listing"
        ]
        # Classify the first 1000 characters of the combined text
        result = classifier(text_to_analyze[:1000], risk_categories, multi_label=False)

        fraud_classification = {
            'alert_level': 'minimal',
            'alert_score': 0.0,
            'confidence_scores': {},
            'high_risk': [],
            'medium_risk': [],
            'low_risk': [],
            'reasoning': ''
        }

        # Process classification results, down-weighting every risk label so
        # that legitimate-looking data is scored leniently
        fraud_score = 0.0
        if isinstance(result, dict) and 'scores' in result:
            for label, score in zip(result.get('labels', []), result.get('scores', [])):
                if label != "legitimate listing":
                    try:
                        score_val = float(score)
                        if label == "suspicious listing":
                            score_val *= 0.3
                        elif label == "potential fraud":
                            score_val *= 0.5
                        elif label == "high risk listing":
                            score_val *= 0.6
                    except Exception:
                        score_val = 0.0
                    fraud_score += score_val
                    fraud_classification['confidence_scores'][label] = score_val
        else:
            # The classifier returned an unexpected shape; fall back to a
            # near-zero score rather than failing
            fraud_score = 0.02

        # Normalize the accumulated score to the 0-1 range, halved to keep
        # the overall scale lenient
        try:
            fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1) * 0.5)
        except Exception:
            fraud_score = 0.0
        fraud_classification['alert_score'] = fraud_score

        # Map the score onto an alert level
        if fraud_score >= 0.8:
            fraud_classification['alert_level'] = 'high'
        elif fraud_score >= 0.5:
            fraud_classification['alert_level'] = 'medium'
        elif fraud_score >= 0.3:
            fraud_classification['alert_level'] = 'low'
        else:
            fraud_classification['alert_level'] = 'minimal'

        # Generate reasoning text matching the alert-level thresholds
        reasoning_parts = []
        if fraud_score < 0.3:
            reasoning_parts.append("This property was classified as legitimate based on AI analysis of the listing details.")
        elif fraud_score < 0.5:
            reasoning_parts.append("This property was classified as low risk based on AI analysis of the listing details.")
        elif fraud_score < 0.8:
            reasoning_parts.append("This property was classified as medium risk based on AI analysis of the listing details.")
        else:
            reasoning_parts.append("This property was classified as high risk based on AI analysis of the listing details.")

        # Name the strongest specific risk indicator when it stands out
        if fraud_classification['confidence_scores']:
            highest_risk = max(fraud_classification['confidence_scores'].items(), key=lambda x: x[1])
            if highest_risk[1] > 0.4:
                reasoning_parts.append(f"Primary concern: {highest_risk[0]} (confidence: {highest_risk[1]:.0%})")

        fraud_classification['reasoning'] = " ".join(reasoning_parts)
        return fraud_classification
    except Exception as e:
        logger.error(f"Error in fraud classification: {str(e)}")
        return {
            'alert_level': 'minimal',
            'alert_score': 0.02,
            'confidence_scores': {},
            'high_risk': [],
            'medium_risk': [],
            'low_risk': [],
            'reasoning': f'Fraud analysis failed: {str(e)}'
        }


def simple_fraud_classification(text, categories):
    """
    Simple keyword-based fraud classification fallback.

    Mirrors the output shape of the zero-shot pipeline ({'labels', 'scores'})
    so callers can use it interchangeably when no model is available.
    """
    text_lower = text.lower()

    # Keyword lists per known category; categories missing from this map
    # (such as the risk_categories used above) fall back to a small default
    category_keywords = {
        "fraudulent listing": ["fake", "scam", "fraud", "illegal", "unauthorized"],
        "misleading information": ["misleading", "false", "wrong", "incorrect", "fake"],
        "fake property": ["fake", "non-existent", "virtual", "photoshopped"],
        "scam attempt": ["scam", "fraud", "cheat", "trick", "deceive"],
        "legitimate listing": ["real", "genuine", "authentic", "verified", "legitimate"]
    }

    scores = []
    for category in categories:
        keywords = category_keywords.get(category, [])
        # Fraction of the category's keywords present in the text, or a
        # 0.1 default for categories with no keyword list
        score = sum(1 for keyword in keywords if keyword in text_lower) / len(keywords) if keywords else 0.1
        scores.append(min(1.0, score))

    return {
        "labels": categories,
        "scores": scores
    }
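

# Minimal usage sketch (not part of the original module): the sample listing
# below is made up, and running classify_fraud assumes the transformers-backed
# load_model dependency can fetch facebook/bart-large-mnli.
if __name__ == "__main__":
    details = "3 BHK apartment, 1450 sq ft, Gachibowli, Hyderabad"
    description = "Well-maintained flat with covered parking, priced at ₹85 lakh."

    # The keyword fallback needs no model, so it can always run
    fallback = simple_fraud_classification(
        f"{details} {description}",
        ["legitimate listing", "suspicious listing", "potential fraud", "high risk listing"],
    )
    print("Fallback scores:", dict(zip(fallback["labels"], fallback["scores"])))

    # Full AI classification (downloads the model on first use)
    result = classify_fraud(details, description)
    print(f"Alert level: {result['alert_level']} (score {result['alert_score']:.2f})")
    print("Reasoning:", result["reasoning"])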