import os
import base64
import json
import io
import datetime
from PIL import Image
import logging
from huggingface_hub import HfApi  # CommitOperationAdd removed: logs are written locally, not committed directly
from huggingface_hub.utils import RepositoryNotFoundError
import numpy as np

logger = logging.getLogger(__name__)

HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0"
LOCAL_LOG_DIR = "./hf_inference_logs" # Define a local directory to store logs

# Custom JSON encoder so numpy scalars and arrays in predictions serialize cleanly
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
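
# Illustrative usage (an assumption about how this encoder is exercised):
#   json.dumps({"score": np.float32(0.91), "probs": np.array([0.1, 0.9])}, cls=NumpyEncoder)
# The stock JSONEncoder would raise TypeError on both values.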

def _pil_to_base64(image: Image.Image) -> str:
    """Converts a PIL Image to a base64 string."""
    # Explicitly check if the input is a PIL Image
    if not isinstance(image, Image.Image):
        raise TypeError(f"Expected a PIL Image, but received type: {type(image)}")

    buffered = io.BytesIO()
    # Ensure image is in RGB mode before saving as JPEG
    if image.mode != 'RGB':
        image = image.convert('RGB')
    image.save(buffered, format="JPEG", quality=85)
    return base64.b64encode(buffered.getvalue()).decode('utf-8')
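
# Illustrative inverse helper (an assumption, not used by the logging path itself):
# consumers reading the JSON logs back can decode the stored images with it.
def _base64_to_pil(data: str) -> Image.Image:
    """Decodes a base64 JPEG string (as produced by _pil_to_base64) into a PIL Image."""
    return Image.open(io.BytesIO(base64.b64decode(data)))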

# Logs are buffered locally rather than appended to an in-memory datasets.Dataset,
# so this function only ensures the target repository exists on the Hub.
def initialize_dataset_repo():
    """Initializes or ensures the Hugging Face dataset repository exists."""
    api = HfApi(token=os.getenv("HF_TOKEN"))
    try:
        api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset")
        logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}")
    except RepositoryNotFoundError:
        logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}")
        api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True)
    return api # Return the API object for subsequent operations

def log_inference_data(
    original_image: Image.Image,
    inference_params: dict,
    model_predictions: list[dict],
    ensemble_output: dict,
    forensic_images: list[Image.Image],
    agent_monitoring_data: dict,
    human_feedback: dict | None = None,
):
    """Logs a single inference event by uploading a JSON file to the Hugging Face dataset repository."""
    try:
        initialize_dataset_repo()  # Ensure the dataset repository exists before buffering logs

        original_image_b64 = _pil_to_base64(original_image)

        forensic_images_b64 = []
        for img_item in forensic_images:
            if img_item is not None:
                if not isinstance(img_item, Image.Image):
                    try:
                        img_item = Image.fromarray(img_item)
                    except Exception as e:
                        logger.error(f"Error converting forensic image to PIL for base64 encoding: {e}")
                        continue
                forensic_images_b64.append(_pil_to_base64(img_item))

        new_entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "image": original_image_b64,
            "inference_request": inference_params,
            "model_predictions": model_predictions,
            "ensemble_output": ensemble_output,
            "forensic_outputs": forensic_images_b64,
            "agent_monitoring_data": agent_monitoring_data,
            "human_feedback": human_feedback if human_feedback is not None else {}
        }
        
        # Define a unique path for the new log file within the local directory
        os.makedirs(LOCAL_LOG_DIR, exist_ok=True) # Ensure the local directory exists
        timestamp_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
        log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp_str}.json")
        
        # Serialize the new entry to a JSON file using the custom encoder
        with open(log_file_path, 'w', encoding='utf-8') as f:
            json.dump(new_entry, f, cls=NumpyEncoder, indent=2)
        
        logger.info(f"Inference data logged successfully to local file: {log_file_path}")

    except Exception as e:
        logger.error(f"Failed to log inference data to local file: {e}")