File size: 5,739 Bytes
e3278e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import os
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)


# from here: https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#structuring-your-records
class LLMResponse(BaseModel):
    latency_ms: int
    status_code: int
    input_text: str
    output_text: str
    node_type: str
    model: str
    num_input_tokens: int
    num_output_tokens: int
    output_logprobs: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Optional. When available, logprobs are used to compute Uncertainty.",
    )
    created_at: str = Field(
        ..., description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format'
    )
    tags: Optional[List[str]] = None
    user_metadata: Optional[Dict[str, Any]] = None


class GalileoObserve(CustomLogger):
    def __init__(self) -> None:
        self.in_memory_records: List[dict] = []
        self.batch_size = 1
        self.base_url = os.getenv("GALILEO_BASE_URL", None)
        self.project_id = os.getenv("GALILEO_PROJECT_ID", None)
        self.headers: Optional[Dict[str, str]] = None
        self.async_httpx_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        pass

    def set_galileo_headers(self):
        # following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records

        headers = {
            "accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        galileo_login_response = litellm.module_level_client.post(
            url=f"{self.base_url}/login",
            headers=headers,
            data={
                "username": os.getenv("GALILEO_USERNAME"),
                "password": os.getenv("GALILEO_PASSWORD"),
            },
        )

        access_token = galileo_login_response.json()["access_token"]

        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

    def get_output_str_from_response(self, response_obj, kwargs):
        output = None
        if response_obj is not None and (
            kwargs.get("call_type", None) == "embedding"
            or isinstance(response_obj, litellm.EmbeddingResponse)
        ):
            output = None
        elif response_obj is not None and isinstance(
            response_obj, litellm.ModelResponse
        ):
            output = response_obj["choices"][0]["message"].json()
        elif response_obj is not None and isinstance(
            response_obj, litellm.TextCompletionResponse
        ):
            output = response_obj.choices[0].text
        elif response_obj is not None and isinstance(
            response_obj, litellm.ImageResponse
        ):
            output = response_obj["data"]

        return output

    async def async_log_success_event(
        self, kwargs: Any, response_obj: Any, start_time: Any, end_time: Any
    ):
        verbose_logger.debug("On Async Success")

        _latency_ms = int((end_time - start_time).total_seconds() * 1000)
        _call_type = kwargs.get("call_type", "litellm")
        input_text = litellm.utils.get_formatted_prompt(
            data=kwargs, call_type=_call_type
        )

        _usage = response_obj.get("usage", {}) or {}
        num_input_tokens = _usage.get("prompt_tokens", 0)
        num_output_tokens = _usage.get("completion_tokens", 0)

        output_text = self.get_output_str_from_response(
            response_obj=response_obj, kwargs=kwargs
        )

        if output_text is not None:
            request_record = LLMResponse(
                latency_ms=_latency_ms,
                status_code=200,
                input_text=input_text,
                output_text=output_text,
                node_type=_call_type,
                model=kwargs.get("model", "-"),
                num_input_tokens=num_input_tokens,
                num_output_tokens=num_output_tokens,
                created_at=start_time.strftime(
                    "%Y-%m-%dT%H:%M:%S"
                ),  # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format
            )

            # dump to dict
            request_dict = request_record.model_dump()
            self.in_memory_records.append(request_dict)

            if len(self.in_memory_records) >= self.batch_size:
                await self.flush_in_memory_records()

    async def flush_in_memory_records(self):
        verbose_logger.debug("flushing in memory records")
        response = await self.async_httpx_handler.post(
            url=f"{self.base_url}/projects/{self.project_id}/observe/ingest",
            headers=self.headers,
            json={"records": self.in_memory_records},
        )

        if response.status_code == 200:
            verbose_logger.debug(
                "Galileo Logger:successfully flushed in memory records"
            )
            self.in_memory_records = []
        else:
            verbose_logger.debug("Galileo Logger: failed to flush in memory records")
            verbose_logger.debug(
                "Galileo Logger error=%s, status code=%s",
                response.text,
                response.status_code,
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        verbose_logger.debug("On Async Failure")