import json
import logging
import os
import random
from datetime import datetime
from pathlib import Path
from uuid import uuid4

import requests
from datasets import Dataset, load_dataset
from huggingface_hub import CommitScheduler, HfApi


class ChatLogger:
    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations."""
        if not scheduler:
            raise ValueError("Scheduler is required")
        self.scheduler = scheduler
        self.json_dataset_dir = scheduler.folder_path
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"
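
    # Construction sketch (the repo id and folder below are illustrative, not
    # taken from this Space's config). CommitScheduler watches a local folder
    # and periodically commits it to a Hub dataset repo, which is what makes
    # the append-only JSONL pattern above work:
    #
    #     scheduler = CommitScheduler(
    #         repo_id="my-org/chat-logs",   # hypothetical dataset repo
    #         repo_type="dataset",
    #         folder_path="json_dataset",
    #         every=5,                      # minutes between background pushes
    #     )
    #     logger = ChatLogger(scheduler)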

    def get_client_ip(self, request=None):
        """Get the client IP address from the request context."""
        try:
            if request:
                # Start with the direct socket address
                ip = request.client.host
                # Prefer proxy headers when present
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    # X-Forwarded-For can list multiple IPs; the first is the client
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug(f"Client IP detected: {ip}")
                return ip
        except Exception as e:
            logging.error(f"Error getting client IP: {e}")
        # Fall back to localhost when no request is available or detection fails
        return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Get geolocation info using ipapi.co."""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                # Add random noise of ±0.01 degrees (roughly ±1 km) so exact
                # coordinates are never stored
                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)
                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error(f"Error in IP lookup: Status code {response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed in IP lookup: {e}")
            return None
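
    # A successful lookup returns a dict shaped like the following (values
    # illustrative, coordinates already jittered):
    #     {"city": "Paris", "region": "Île-de-France", "country": "France",
    #      "latitude": 48.8617, "longitude": 2.3428}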

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry with all required fields."""
        timestamp = datetime.now().timestamp()
        # Get client location only if a request is provided
        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None
        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),  # In practice, this should be passed in from the session
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }
        return log_entry
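
    # Each call therefore yields one self-contained JSONL record, e.g.
    # (values illustrative):
    #     {"record_id": "...", "session_id": "...", "time": "1700000000.0",
    #      "client_location": null, "question": "...", "answer": "...",
    #      "retrieved_content": ["..."], "feedback": null}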

    def cleanup_local_files(self):
        """Delete local JSON files once the scheduler has uploaded them."""
        try:
            # Hold the scheduler lock so files are not deleted mid-upload
            with self.scheduler.lock:
                # List all files in the json_dataset directory
                for file in self.json_dataset_dir.glob("*.json*"):
                    try:
                        file.unlink()  # Delete file
                        logging.info(f"Deleted local file: {file}")
                    except Exception as e:
                        logging.error(f"Error deleting file {file}: {e}")
                # Optionally remove the directory if empty
                if not any(self.json_dataset_dir.iterdir()):
                    self.json_dataset_dir.rmdir()
                    logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error(f"Error in cleanup: {e}")

    def save_local(self, log_entry):
        """Save log entry to local JSONL file."""
        try:
            # Reorder fields for consistency
            field_order = [
                "record_id",
                "session_id",
                "time",
                "client_location",
                "question",
                "answer",
                "retrieved_content",
                "feedback"
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}
            with self.scheduler.lock:
                with open(self.logs_path, 'a') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info("Log entry saved")
            # Note: cleanup_local_files() must NOT run here. Deleting the file
            # right after writing would discard entries before CommitScheduler
            # pushes them; call it only after a confirmed upload.
            return True
        except Exception as e:
            logging.error(f"Error saving to local file: {e}")
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method that handles both local and HF storage."""
        # Create the structured log entry
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        # Save locally with thread safety; CommitScheduler pushes to the Hub
        return self.save_local(log_entry)
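

# Minimal end-to-end sketch (hedged: the repo id is hypothetical, and no HTTP
# request object is passed, so the record is logged without an IP lookup).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    scheduler = CommitScheduler(
        repo_id="my-org/chat-logs",  # hypothetical dataset repo
        repo_type="dataset",
        folder_path="json_dataset",
        every=5,
    )
    logger = ChatLogger(scheduler)
    logger.log(
        query="What is a CommitScheduler?",
        answer="A huggingface_hub helper that periodically commits a folder to the Hub.",
        retrieved_content=["huggingface_hub docs excerpt"],
    )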