"""Interactive tester for a saved BERT + numerical-feature fraud detection model.

The script loads a checkpoint (model weights plus the fitted scaler, label
encoder and isolation forest), preprocesses transactions, and prints an
ensemble fraud score for each one.
"""

import traceback
import warnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import LabelEncoder, StandardScaler
from transformers import BertModel, BertTokenizer

warnings.filterwarnings('ignore')


class BERTFraudDetector(nn.Module):
    """BERT text encoder combined with a numerical-feature branch.

    Attribute names and layer order define the state-dict keys, so they must
    stay in sync with the checkpoint that ``load_model`` reads.
    """

    def __init__(self, bert_model_name, numerical_features_dim, dropout_rate=0.3):
        super().__init__()

        # BERT for text processing
        self.bert = BertModel.from_pretrained(bert_model_name)

        # Freeze every BERT parameter, then re-enable gradients for the last
        # two encoder layers only.
        for param in self.bert.parameters():
            param.requires_grad = False
        for param in self.bert.encoder.layer[-2:].parameters():
            param.requires_grad = True

        # Project both modalities to 256 dimensions
        self.text_projection = nn.Linear(self.bert.config.hidden_size, 256)
        self.numerical_projection = nn.Linear(numerical_features_dim, 256)

        # Anomaly head: ends in a plain Linear layer, so its output is an
        # unbounded raw score (no sigmoid).
        self.anomaly_detector = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

        # Combined classifier: 256 (text) + 256 (numerical) + 1 (anomaly score)
        self.classifier = nn.Sequential(
            nn.Linear(512 + 1, 256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        )

    def forward(self, input_ids, attention_mask, numerical_features):
        """Return ``(fraud_probability, anomaly_score)``, both squeezed."""
        # Process text with BERT
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = self.text_projection(bert_output.pooler_output)

        # Process numerical features
        numerical_features = self.numerical_projection(numerical_features)

        # Anomaly detection on the projected numerical features
        anomaly_score = self.anomaly_detector(numerical_features)

        # Combine all features along the feature axis
        combined_features = torch.cat(
            [text_features, numerical_features, anomaly_score], dim=1
        )

        # Final classification
        fraud_probability = self.classifier(combined_features)
        return fraud_probability.squeeze(), anomaly_score.squeeze()


class FraudDetectionTester:
    """Load a saved fraud detection checkpoint and score transactions with it."""

    # Column order fed to the scaler and the model. Its length is the model's
    # numerical input width (the value previously hard-coded as 14).
    NUMERICAL_FEATURES = [
        'amount_log', 'hour', 'day_of_week', 'days_since_last_transaction',
        'transaction_count_1h', 'transaction_count_24h', 'avg_amount_1h',
        'location_risk_score', 'account_age_days', 'merchant_category_encoded',
        'is_weekend', 'is_night', 'high_frequency', 'amount_deviation',
    ]

    def __init__(self, model_path='fraud_detection_model.pth'):
        """Initialize the fraud detection tester and load the checkpoint.

        Args:
            model_path: Path of the checkpoint file passed to ``torch.load``.

        Raises:
            FileNotFoundError: If the checkpoint file does not exist.
            Exception: Any other error raised while loading is re-raised.
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model_path = model_path
        self.model = None
        self.scaler = None
        self.label_encoder = None
        self.isolation_forest = None

        # Load the model
        self.load_model()

    def create_bert_fraud_model(self, numerical_features_dim):
        """Recreate the BERT fraud detection model architecture.

        Args:
            numerical_features_dim: Input width of the numerical branch.

        Returns:
            A freshly constructed ``BERTFraudDetector`` on 'bert-base-uncased'.
        """
        return BERTFraudDetector('bert-base-uncased', numerical_features_dim)

    def load_model(self):
        """Load the pre-trained model and its preprocessing objects.

        Reads the 'scaler', 'label_encoder', 'isolation_forest' and
        'model_state_dict' entries of the checkpoint, moves the model to
        ``self.device`` and switches it to eval mode.

        Raises:
            FileNotFoundError: If ``self.model_path`` does not exist.
            Exception: Any other loading error, after printing a hint.
        """
        try:
            print(f"🔄 Loading model from {self.model_path}...")

            # Register the sklearn classes as safe globals where the API is
            # available; it is absent on older PyTorch releases, and calling
            # it unguarded would make loading fail there.
            if hasattr(torch.serialization, 'add_safe_globals'):
                torch.serialization.add_safe_globals([
                    StandardScaler,
                    LabelEncoder,
                    IsolationForest,
                ])

            # SECURITY: weights_only=False performs a full unpickle, which can
            # execute arbitrary code. Only load checkpoints from a trusted source.
            checkpoint = torch.load(
                self.model_path, map_location=self.device, weights_only=False
            )

            # Load preprocessing objects
            self.scaler = checkpoint['scaler']
            self.label_encoder = checkpoint['label_encoder']
            self.isolation_forest = checkpoint['isolation_forest']

            # Create and load model; the width is derived from the feature
            # list so the two cannot drift apart.
            numerical_features_dim = len(self.NUMERICAL_FEATURES)
            self.model = self.create_bert_fraud_model(numerical_features_dim)
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model.to(self.device)
            self.model.eval()

            print("✅ Model loaded successfully!")

        except FileNotFoundError:
            print(f"❌ Error: Model file '{self.model_path}' not found!")
            print("Make sure you have trained and saved the model first.")
            raise
        except Exception as e:
            print(f"❌ Error loading model: {str(e)}")
            print("If you're still getting errors, try updating PyTorch or ensure the model file is from a trusted source.")
            raise

    def tokenize_descriptions(self, descriptions, max_length=128):
        """Tokenize transaction descriptions for BERT.

        Args:
            descriptions: A single string, a list, an object exposing
                ``tolist()`` (e.g. a pandas Series), or any other iterable.
            max_length: Truncation length passed to the tokenizer.

        Returns:
            Tuple ``(input_ids, attention_mask)`` of PyTorch tensors.
        """
        # Normalize the input to a plain list
        if hasattr(descriptions, 'tolist'):
            descriptions = descriptions.tolist()
        elif isinstance(descriptions, str):
            descriptions = [descriptions]
        elif not isinstance(descriptions, list):
            descriptions = list(descriptions)

        # Ensure all descriptions are strings
        descriptions = [str(desc) for desc in descriptions]

        encoded = self.tokenizer(
            descriptions,
            truncation=True,
            padding=True,
            max_length=max_length,
            return_tensors='pt',
        )
        return encoded['input_ids'], encoded['attention_mask']

    def preprocess_single_transaction(self, transaction):
        """Preprocess a transaction for prediction.

        Args:
            transaction: A dict describing one transaction, or anything
                ``pd.DataFrame`` accepts.

        Returns:
            Tuple ``(df, X_numerical)``: the engineered DataFrame (including
            'processed_description') and the scaled numerical feature matrix.
        """
        # Create DataFrame from transaction
        if isinstance(transaction, dict):
            df = pd.DataFrame([transaction])
        else:
            df = pd.DataFrame(transaction)

        # Feature engineering
        df['amount_log'] = np.log1p(df['amount'])
        df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
        df['is_night'] = ((df['hour'] >= 22) | (df['hour'] <= 6)).astype(int)
        df['high_frequency'] = (df['transaction_count_1h'] > 3).astype(int)
        # The +1 in the denominator avoids division by zero
        df['amount_deviation'] = (
            abs(df['amount'] - df['avg_amount_1h']) / (df['avg_amount_1h'] + 1)
        )

        # Encode merchant_category, falling back to 0 for unknown categories
        categories = df['merchant_category']
        try:
            df['merchant_category_encoded'] = self.label_encoder.transform(categories)
        except ValueError:
            is_known = categories.isin(list(self.label_encoder.classes_))
            unknown = categories[~is_known]
            # Report the first category that is actually unknown; fall back to
            # row 0 if the encoder failed for some other reason.
            reported = unknown.iloc[0] if len(unknown) else categories.iloc[0]
            print(f"⚠️ Warning: Unknown merchant category '{reported}'. Using default value.")

            encoded = np.zeros(len(df), dtype=int)
            # Only unknown rows get the default; known rows keep their real code.
            if is_known.any() and not is_known.all():
                encoded[is_known.to_numpy()] = self.label_encoder.transform(
                    categories[is_known]
                )
            df['merchant_category_encoded'] = encoded

        # Prepare numerical features
        X_numerical = self.scaler.transform(df[self.NUMERICAL_FEATURES])

        # Process text: force to string, lowercase, strip punctuation
        df['processed_description'] = (
            df['description'].astype(str).str.lower().str.replace(r'[^\w\s]', '', regex=True)
        )

        return df, X_numerical

    def predict_fraud(self, transactions):
        """Predict fraud for one or more transactions.

        Args:
            transactions: A single transaction dict or an iterable of them.

        Returns:
            A list with one dict per transaction. Successful entries carry the
            ensemble score and its components; failed entries carry
            'transaction_id' and 'error' only.
        """
        print("🔍 Analyzing transactions for fraud...")

        # Handle single transaction
        if isinstance(transactions, dict):
            transactions = [transactions]

        results = []

        for i, transaction in enumerate(transactions):
            try:
                # Preprocess transaction
                df, X_numerical = self.preprocess_single_transaction(transaction)

                # Tokenize description
                processed_descriptions = df['processed_description'].tolist()
                input_ids, attention_masks = self.tokenize_descriptions(processed_descriptions)

                # Make prediction
                with torch.no_grad():
                    batch_num = torch.tensor(X_numerical).float().to(self.device)
                    batch_ids = input_ids.to(self.device)
                    batch_masks = attention_masks.to(self.device)

                    fraud_prob, anomaly_score = self.model(batch_ids, batch_masks, batch_num)

                # Get isolation forest prediction
                isolation_pred = self.isolation_forest.decision_function(X_numerical)

                # Take the first element whether the output is 0-dim, a batch,
                # or a plain number.
                fraud_prob_val = float(torch.as_tensor(fraud_prob).reshape(-1)[0])
                anomaly_score_val = float(torch.as_tensor(anomaly_score).reshape(-1)[0])

                # Combine predictions (ensemble approach)
                raw_score = (
                    0.6 * fraud_prob_val
                    + 0.3 * (1 - (isolation_pred[0] + 0.5))
                    + 0.1 * anomaly_score_val
                )
                # The isolation term and the raw anomaly head are unbounded, so
                # clamp the weighted sum to keep the reported probability in
                # [0, 1]. Clamping changes neither the 0.5 decision nor the
                # risk level.
                combined_score = float(np.clip(raw_score, 0.0, 1.0))

                # Create result
                result = {
                    'transaction_id': transaction.get('transaction_id', f'test_{i+1}'),
                    'amount': transaction['amount'],
                    'description': transaction['description'],
                    'fraud_probability': combined_score,
                    'is_fraud_predicted': bool(combined_score > 0.5),
                    'risk_level': self.get_risk_level(combined_score),
                    'anomaly_score': float(anomaly_score_val),
                    'bert_score': float(fraud_prob_val),
                    'isolation_score': float(isolation_pred[0]),
                }
                results.append(result)

            except Exception as e:
                # Best effort per transaction: record the failure and continue.
                print(f"❌ Error processing transaction {i+1}: {str(e)}")
                traceback.print_exc()  # Print full error traceback for debugging
                results.append({
                    'transaction_id': transaction.get('transaction_id', f'test_{i+1}'),
                    'error': str(e),
                })

        return results

    def get_risk_level(self, score):
        """Determine risk level based on fraud probability.

        Returns 'CRITICAL' above 0.8, 'HIGH' above 0.6, 'MEDIUM' above 0.4,
        'LOW' above 0.2 and 'MINIMAL' otherwise.
        """
        if score > 0.8:
            return 'CRITICAL'
        elif score > 0.6:
            return 'HIGH'
        elif score > 0.4:
            return 'MEDIUM'
        elif score > 0.2:
            return 'LOW'
        else:
            return 'MINIMAL'

    def display_results(self, results):
        """Display prediction results in a nice format.

        Args:
            results: The list returned by ``predict_fraud``.
        """
        print("\n" + "="*80)
        print("🚨 FRAUD DETECTION RESULTS")
        print("="*80)

        for i, result in enumerate(results):
            if 'error' in result:
                print(f"\n❌ Transaction {i+1}: ERROR - {result['error']}")
                continue

            print(f"\n📋 Transaction {i+1}:")
            print(f" ID: {result['transaction_id']}")
            print(f" Amount: ${result['amount']:.2f}")
            print(f" Description: {result['description']}")
            print(f" 🎯 Fraud Probability: {result['fraud_probability']:.4f} ({result['fraud_probability']*100:.2f}%)")

            # Color-coded prediction
            if result['is_fraud_predicted']:
                print(f" 🚨 Prediction: FRAUD DETECTED")
            else:
                print(f" ✅ Prediction: LEGITIMATE")

            print(f" 📊 Risk Level: {result['risk_level']}")
            print(f" 🔍 Anomaly Score: {result['anomaly_score']:.4f}")
            print(f" 🤖 BERT Score: {result['bert_score']:.4f}")
            print(f" 🏝️ Isolation Score: {result['isolation_score']:.4f}")

            # Risk indicator: a single-character glyph so the :<20 padding is
            # correct, with the fill clamped to the 20-cell meter.
            filled = max(0, min(20, int(result['fraud_probability'] * 20)))
            risk_bar = "█" * filled
            print(f" 📈 Risk Meter: [{risk_bar:<20}] {result['fraud_probability']*100:.1f}%")

        print("\n" + "="*80)


def create_sample_transactions():
    """Create sample transactions for testing.

    Returns:
        A list of four transaction dicts with the keys the tester expects.
    """
    return [
        {
            'transaction_id': 'TEST_001',
            'amount': 45.67,
            'merchant_category': 'grocery',
            'description': 'WALMART SUPERCENTER CA 1234',
            'hour': 14,
            'day_of_week': 2,
            'days_since_last_transaction': 1.0,
            'transaction_count_1h': 1,
            'transaction_count_24h': 3,
            'avg_amount_1h': 50.0,
            'location_risk_score': 0.1,
            'account_age_days': 730,
        },
        {
            'transaction_id': 'TEST_002',
            'amount': 2999.99,
            'merchant_category': 'online',
            'description': 'SUSPICIOUS ELECTRONICS STORE XX 9999',
            'hour': 3,
            'day_of_week': 6,
            'days_since_last_transaction': 60.0,
            'transaction_count_1h': 12,
            'transaction_count_24h': 25,
            'avg_amount_1h': 150.0,
            'location_risk_score': 0.95,
            'account_age_days': 15,
        },
        {
            'transaction_id': 'TEST_003',
            'amount': 89.50,
            'merchant_category': 'restaurant',
            'description': 'STARBUCKS COFFEE NY 5678',
            'hour': 8,
            'day_of_week': 1,
            'days_since_last_transaction': 0.5,
            'transaction_count_1h': 1,
            'transaction_count_24h': 4,
            'avg_amount_1h': 85.0,
            'location_risk_score': 0.2,
            'account_age_days': 1095,
        },
        {
            'transaction_id': 'TEST_004',
            'amount': 500.00,
            'merchant_category': 'atm',
            'description': 'ATM WITHDRAWAL FOREIGN COUNTRY 0000',
            'hour': 23,
            'day_of_week': 0,
            'days_since_last_transaction': 0.1,
            'transaction_count_1h': 5,
            'transaction_count_24h': 8,
            'avg_amount_1h': 200.0,
            'location_risk_score': 0.8,
            'account_age_days': 365,
        },
    ]


def create_custom_transaction():
    """Interactively build a custom transaction from console input.

    Returns:
        The transaction dict, or ``None`` if a numeric field could not be parsed.
    """
    print("\n🛠️ CREATE CUSTOM TRANSACTION")
    print("-" * 40)

    transaction = {}

    try:
        transaction['transaction_id'] = input("Transaction ID (optional): ") or 'CUSTOM_001'
        transaction['amount'] = float(input("Amount ($): "))

        print("Merchant categories: grocery, gas_station, restaurant, online, retail, atm")
        transaction['merchant_category'] = input("Merchant category: ") or 'online'

        transaction['description'] = input("Transaction description: ") or 'Unknown merchant'
        transaction['hour'] = int(input("Hour (0-23): "))
        transaction['day_of_week'] = int(input("Day of week (0=Monday, 6=Sunday): "))
        transaction['days_since_last_transaction'] = float(input("Days since last transaction: "))
        transaction['transaction_count_1h'] = int(input("Transactions in last hour: "))
        transaction['transaction_count_24h'] = int(input("Transactions in last 24 hours: "))
        transaction['avg_amount_1h'] = float(input("Average amount in last hour ($): "))
        transaction['location_risk_score'] = float(input("Location risk score (0-1): "))
        transaction['account_age_days'] = float(input("Account age in days: "))

        return transaction

    except ValueError as e:
        print(f"❌ Invalid input: {e}")
        return None


def _prompt_quick_transaction():
    """Prompt for the few fields of a quick test; the rest use fixed defaults.

    Raises:
        ValueError: If a numeric field cannot be parsed.
    """
    # Dict entries are evaluated top to bottom, which fixes the prompt order.
    return {
        'transaction_id': 'QUICK_TEST',
        'amount': float(input("Amount ($): ")),
        'merchant_category': 'online',
        'description': input("Description: ") or 'Unknown transaction',
        'hour': int(input("Hour (0-23): ")),
        'day_of_week': 2,
        'days_since_last_transaction': 1.0,
        'transaction_count_1h': int(input("Transactions in last hour: ")),
        'transaction_count_24h': 5,
        'avg_amount_1h': 100.0,
        'location_risk_score': float(input("Risk score (0-1): ")),
        'account_age_days': 365,
    }


def _run_menu(tester):
    """Run the interactive menu loop until the user chooses to exit."""
    while True:
        print("\n📋 TESTING OPTIONS:")
        print("1. Test with sample transactions")
        print("2. Create custom transaction")
        print("3. Test single transaction")
        print("4. Exit")

        choice = input("\nEnter your choice (1-4): ").strip()

        if choice == '1':
            # Test with sample transactions
            sample_transactions = create_sample_transactions()
            results = tester.predict_fraud(sample_transactions)
            tester.display_results(results)

        elif choice == '2':
            # Create custom transaction
            custom_transaction = create_custom_transaction()
            if custom_transaction:
                results = tester.predict_fraud([custom_transaction])
                tester.display_results(results)

        elif choice == '3':
            # Quick single transaction test
            print("\n⚔ QUICK TRANSACTION TEST")
            print("-" * 30)

            try:
                quick_transaction = _prompt_quick_transaction()
            except ValueError as e:
                print(f"❌ Invalid input: {e}")
            else:
                results = tester.predict_fraud([quick_transaction])
                tester.display_results(results)

        elif choice == '4':
            print("👋 Goodbye!")
            break

        else:
            print("❌ Invalid choice! Please enter 1-4.")


def main():
    """Main testing function: load the model, then run the interactive menu."""
    print("🚀 FRAUD DETECTION MODEL TESTER")
    print("="*50)

    # Initialize tester. load_model has already printed the specific error;
    # catch Exception rather than using a bare except so Ctrl-C and
    # SystemExit are not swallowed.
    try:
        tester = FraudDetectionTester('fraud_detection_model.pth')
    except Exception:
        print("Make sure you have the trained model file 'fraud_detection_model.pth' in the same directory!")
        return

    # Leave cleanly when stdin closes or the user presses Ctrl-C at a prompt.
    try:
        _run_menu(tester)
    except (EOFError, KeyboardInterrupt):
        print("\n👋 Goodbye!")


if __name__ == "__main__":
    main()