import json
import logging
import os
import random
from datetime import datetime
from pathlib import Path
from uuid import uuid4

import requests
from datasets import load_dataset, Dataset
from huggingface_hub import CommitScheduler, HfApi


class ChatLogger:
    """Log chat interactions (question, answer, retrieved context, feedback)
    as JSONL records in a folder managed by a huggingface_hub CommitScheduler,
    which periodically pushes the folder's contents to the Hub.
    """

    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations.

        Args:
            scheduler: A CommitScheduler instance exposing ``folder_path``
                (a Path to the synced directory) and ``lock`` (a context
                manager guarding writes during background commits).

        Raises:
            ValueError: If ``scheduler`` is None.
        """
        # `is None` rather than `not scheduler`: a scheduler object that is
        # falsy for unrelated reasons must not be rejected here.
        if scheduler is None:
            raise ValueError("Scheduler is required")
        self.scheduler = scheduler
        self.json_dataset_dir = scheduler.folder_path
        # One uniquely named file per logger instance so concurrent
        # processes/instances never append to the same file.
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"

    def get_client_ip(self, request=None):
        """Return the best-effort client IP address from the request context.

        Prefers the first entry of ``X-Forwarded-For`` (the original client
        when behind a proxy), otherwise the direct peer address. Falls back
        to ``"127.0.0.1"`` when ``request`` is None or any lookup fails.
        """
        try:
            if request:
                # Direct peer address; may be a proxy rather than the client.
                ip = request.client.host
                # X-Forwarded-For can contain multiple IPs — the first one
                # is the originating client.
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug(f"Client IP detected: {ip}")
                return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
        # Explicit fallback for both "no request" and "lookup failed".
        return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Look up coarse geolocation for ``ip_address`` via ipapi.co.

        Returns a dict with city/region/country and jittered coordinates,
        or None on any failure (HTTP error, rate limit, network error,
        malformed response body).

        Coordinates are perturbed by ±0.01° (roughly ±1 km) so precise
        client locations are never stored.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5,
            )
            if response.status_code == 200:
                data = response.json()
                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    # Add random noise (~±1 km) for privacy.
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)
                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon,
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: Status code {response.status_code}")
                return None
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers response.json() on a malformed body
            # (json.JSONDecodeError subclasses ValueError).
            logging.error(f"Request failed in IP lookup: {str(e)}")
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry with all required fields.

        Args:
            query: The user's question.
            answer: The generated answer.
            retrieved_content: Retrieved context; a non-list value is
                wrapped in a single-element list.
            feedback: Optional user feedback payload.
            request: Optional request object used for IP/geolocation.

        Returns:
            dict: A record with ids, timestamp, optional location, and the
            question/answer/context/feedback fields.
        """
        timestamp = datetime.now().timestamp()
        # Only attempt geolocation when a request (and thus an IP) exists.
        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None
        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),  # In practice, this should be passed in from the session
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback,
        }
        return log_entry

    def cleanup_local_files(self):
        """Delete local JSON files after successful upload.

        NOTE(review): this is invoked right after each local write (see
        save_local), not after a confirmed Hub upload — it can delete a
        .jsonl file before the CommitScheduler pushes it. Confirm intended
        ordering with the scheduler's commit cycle.
        """
        try:
            # Remove every JSON/JSONL artifact in the synced directory.
            for file in self.json_dataset_dir.glob("*.json*"):
                try:
                    file.unlink()
                    logging.info(f"Deleted local file: {file}")
                except Exception as e:
                    logging.error(f"Error deleting file {file}: {e}")
            # Optionally remove the directory if empty.
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Append ``log_entry`` to the local JSONL file under the scheduler lock.

        Returns:
            bool: True on success, False if the write failed.
        """
        try:
            # Fixed field order keeps every JSONL record's schema consistent.
            field_order = [
                "record_id", "session_id", "time", "client_location",
                "question", "answer", "retrieved_content", "feedback",
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}
            # The scheduler lock prevents writing while a background commit
            # is reading the folder.
            with self.scheduler.lock:
                with open(self.logs_path, 'a', encoding='utf-8') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info("Log entry saved")
            # After successful write, trigger cleanup.
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {str(e)}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method that handles both local and HF storage.

        Builds a structured entry and persists it locally with thread
        safety; the CommitScheduler handles the eventual Hub upload.

        Returns:
            bool: True if the entry was written locally, False otherwise.
        """
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request,
        )
        return self.save_local(log_entry)