Spaces:
GIZ
/
Running on CPU Upgrade

Asistente_EUDR / utils / logger.py
Romulan12's picture
cleaning up local files after write
6b49e60
raw
history blame
6.14 kB
import json
import logging
from datetime import datetime
from uuid import uuid4
import requests
from pathlib import Path
from datasets import load_dataset, Dataset
import os
from huggingface_hub import CommitScheduler, HfApi
import random
class ChatLogger:
    """Log chat interactions (question, answer, retrieved context, feedback)
    as JSON lines in a local folder watched by a Hugging Face
    ``CommitScheduler``, which periodically pushes the folder to the Hub.
    """

    def __init__(self, scheduler):
        """Initialize the chat logger with paths and configurations.

        Args:
            scheduler: A ``huggingface_hub.CommitScheduler`` (or compatible
                object exposing ``folder_path`` and ``lock``). Required.

        Raises:
            ValueError: If ``scheduler`` is falsy.
        """
        if not scheduler:
            raise ValueError("Scheduler is required")
        self.scheduler = scheduler
        self.json_dataset_dir = scheduler.folder_path
        # One uniquely named JSONL file per logger instance so concurrent
        # instances never append to the same file.
        self.logs_path = self.json_dataset_dir / f"logs-{uuid4()}.jsonl"

    def get_client_ip(self, request=None):
        """Return the client IP address from the request context.

        Prefers the first entry of ``X-Forwarded-For`` (the original client
        when behind a proxy) over ``request.client.host``. Falls back to
        ``"127.0.0.1"`` when no request is given or anything goes wrong.
        """
        try:
            if request:
                ip = request.client.host
                # X-Forwarded-For can contain multiple IPs - first one is the client
                forwarded_for = request.headers.get('X-Forwarded-For')
                if forwarded_for:
                    ip = forwarded_for.split(',')[0].strip()
                logging.debug("Client IP detected: %s", ip)
                return ip
        except Exception as e:
            logging.error("Error getting client IP: %s", e)
        return "127.0.0.1"

    def get_client_location(self, ip_address):
        """Look up coarse geolocation for ``ip_address`` via ipapi.co.

        Coordinates are jittered by random noise of +/-0.01 degrees
        (roughly +/-1 km) to avoid storing a precise location.

        Returns:
            dict with city/region/country/latitude/longitude, or ``None``
            on rate limiting, HTTP errors, or network failure.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_address}/json/',
                headers=headers,
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                lat = data.get('latitude')
                lon = data.get('longitude')
                if lat is not None and lon is not None:
                    lat += random.uniform(-0.01, 0.01)
                    lon += random.uniform(-0.01, 0.01)
                return {
                    'city': data.get('city'),
                    'region': data.get('region'),
                    'country': data.get('country_name'),
                    'latitude': lat,
                    'longitude': lon
                }
            elif response.status_code == 429:
                logging.warning("Rate limit exceeded for IP lookup")
                return None
            else:
                logging.error("Error in IP lookup: Status code %s", response.status_code)
                return None
        except requests.exceptions.RequestException as e:
            logging.error("Request failed in IP lookup: %s", e)
            return None

    def create_log_entry(self, query, answer, retrieved_content, feedback=None, request=None):
        """Create a structured log entry dict with all required fields.

        Geolocation is only attempted when ``request`` is provided;
        otherwise ``client_location`` is ``None``.
        """
        timestamp = datetime.now().timestamp()
        ip = self.get_client_ip(request) if request else None
        location = self.get_client_location(ip) if ip else None
        log_entry = {
            "record_id": str(uuid4()),
            "session_id": str(uuid4()),  # In practice, this should be passed in from the session
            "time": str(timestamp),
            "client_location": location,
            "question": query,
            "answer": answer,
            # Normalize to a list so downstream consumers see a uniform shape.
            "retrieved_content": retrieved_content if isinstance(retrieved_content, list) else [retrieved_content],
            "feedback": feedback
        }
        return log_entry

    def cleanup_local_files(self):
        """Delete local JSON/JSONL files (and the folder, if emptied).

        NOTE(review): this is called right after every write, so it removes
        the log file before the CommitScheduler's next scheduled push —
        entries written since the last push appear to be lost. Confirm this
        is the intended trade-off.
        """
        try:
            for file in self.json_dataset_dir.glob("*.json*"):
                try:
                    file.unlink()  # Delete file
                    logging.info("Deleted local file: %s", file)
                except Exception as e:
                    logging.error("Error deleting file %s: %s", file, e)
            # Optionally remove the directory if empty
            if not any(self.json_dataset_dir.iterdir()):
                self.json_dataset_dir.rmdir()
                logging.info("Removed empty json_dataset directory")
        except Exception as e:
            logging.error("Error in cleanup: %s", e)

    def save_local(self, log_entry):
        """Append ``log_entry`` to the local JSONL file under the scheduler lock.

        Returns:
            bool: True on success, False on any failure (logged, not raised).
        """
        try:
            # Fixed field order keeps every JSONL row shaped identically.
            field_order = [
                "record_id",
                "session_id",
                "time",
                "client_location",
                "question",
                "answer",
                "retrieved_content",
                "feedback"
            ]
            ordered_logs = {k: log_entry.get(k) for k in field_order if k in log_entry}
            # The scheduler lock prevents a Hub push from reading a half-written line.
            with self.scheduler.lock:
                # Explicit UTF-8 so the log is portable across platforms.
                with open(self.logs_path, 'a', encoding='utf-8') as f:
                    json.dump(ordered_logs, f)
                    f.write('\n')
            logging.info("Log entry saved")
            # After successful write, trigger cleanup
            self.cleanup_local_files()
            return True
        except Exception as e:
            logging.error("Error saving to local file: %s", e)
            return False

    def log(self, query, answer, retrieved_content, feedback=None, request=None):
        """Main logging method: build the entry and persist it locally."""
        log_entry = self.create_log_entry(
            query=query,
            answer=answer,
            retrieved_content=retrieved_content,
            feedback=feedback,
            request=request
        )
        # Save locally with thread safety
        return self.save_local(log_entry)