|
import os
|
|
import base64
|
|
import json
|
|
import io
|
|
import datetime
|
|
from PIL import Image
|
|
import logging
|
|
from huggingface_hub import HfApi, CommitOperationAdd
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0"
|
|
LOCAL_LOG_DIR = "./hf_inference_logs"
|
|
|
|
|
|
class NumpyEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, np.float32):
|
|
return float(obj)
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
def _pil_to_base64(image: Image.Image) -> str:
|
|
"""Converts a PIL Image to a base64 string."""
|
|
|
|
if not isinstance(image, Image.Image):
|
|
raise TypeError(f"Expected a PIL Image, but received type: {type(image)}")
|
|
|
|
buffered = io.BytesIO()
|
|
|
|
if image.mode != 'RGB':
|
|
image = image.convert('RGB')
|
|
image.save(buffered, format="JPEG", quality=85)
|
|
return base64.b64encode(buffered.getvalue()).decode('utf-8')
|
|
|
|
|
|
|
|
def initialize_dataset_repo():
|
|
"""Initializes or ensures the Hugging Face dataset repository exists."""
|
|
api = HfApi(token=os.getenv("HF_TOKEN"))
|
|
try:
|
|
api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset")
|
|
logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}")
|
|
except Exception:
|
|
logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}")
|
|
api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True)
|
|
return api
|
|
|
|
def log_inference_data(
|
|
original_image: Image.Image,
|
|
inference_params: dict,
|
|
model_predictions: list[dict],
|
|
ensemble_output: dict,
|
|
forensic_images: list[Image.Image],
|
|
agent_monitoring_data: dict,
|
|
human_feedback: dict = None
|
|
):
|
|
"""Logs a single inference event by uploading a JSON file to the Hugging Face dataset repository."""
|
|
try:
|
|
api = initialize_dataset_repo()
|
|
|
|
original_image_b64 = _pil_to_base64(original_image)
|
|
|
|
forensic_images_b64 = []
|
|
for img_item in forensic_images:
|
|
if img_item is not None:
|
|
if not isinstance(img_item, Image.Image):
|
|
try:
|
|
img_item = Image.fromarray(img_item)
|
|
except Exception as e:
|
|
logger.error(f"Error converting forensic image to PIL for base64 encoding: {e}")
|
|
continue
|
|
forensic_images_b64.append(_pil_to_base64(img_item))
|
|
|
|
new_entry = {
|
|
"timestamp": datetime.datetime.now().isoformat(),
|
|
"image": original_image_b64,
|
|
"inference_request": inference_params,
|
|
"model_predictions": model_predictions,
|
|
"ensemble_output": ensemble_output,
|
|
"forensic_outputs": forensic_images_b64,
|
|
"agent_monitoring_data": agent_monitoring_data,
|
|
"human_feedback": human_feedback if human_feedback is not None else {}
|
|
}
|
|
|
|
|
|
os.makedirs(LOCAL_LOG_DIR, exist_ok=True)
|
|
timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
|
|
log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp_str}.json")
|
|
|
|
|
|
with open(log_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(new_entry, f, cls=NumpyEncoder, indent=2)
|
|
|
|
logger.info(f"Inference data logged successfully to local file: {log_file_path}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to log inference data to local file: {e}") |