Spaces:
Running
Running
from __future__ import annotations | |
import asyncio | |
import os | |
import platform | |
import sys | |
from datetime import datetime, timezone | |
from typing import TYPE_CHECKING | |
import httpx | |
from loguru import logger | |
from langflow.services.base import Service | |
from langflow.services.telemetry.opentelemetry import OpenTelemetry | |
from langflow.services.telemetry.schema import ( | |
ComponentPayload, | |
PlaygroundPayload, | |
RunPayload, | |
ShutdownPayload, | |
VersionPayload, | |
) | |
from langflow.utils.version import get_version_info | |
if TYPE_CHECKING: | |
from pydantic import BaseModel | |
from langflow.services.settings.service import SettingsService | |
class TelemetryService(Service):
    """Queue usage events and send them to the telemetry endpoint in the background.

    Events are placed on an ``asyncio.Queue`` as ``(sender, payload, path)``
    tuples and drained by a single worker task created in :meth:`start`.
    Nothing is queued or sent when do-not-track is enabled (``DO_NOT_TRACK``
    environment variable set to ``"true"`` or the ``do_not_track`` setting).
    """

    name = "telemetry_service"

    def __init__(self, settings_service: SettingsService):
        """Set up the queue, HTTP client and do-not-track flag; no task is started here."""
        super().__init__()
        self.settings_service = settings_service
        self.base_url = settings_service.settings.telemetry_base_url
        self.telemetry_queue: asyncio.Queue = asyncio.Queue()
        self.client = httpx.AsyncClient(timeout=10.0)  # Set a reasonable timeout
        self.running = False
        self._stopping = False
        self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)
        self.architecture: str | None = None
        self.worker_task: asyncio.Task | None = None
        # Defined up front so stop() and log_package_shutdown() never hit an
        # AttributeError when start() was not called; start() overwrites both.
        self.log_package_version_task: asyncio.Task | None = None
        self._start_time = datetime.now(timezone.utc)
        # Check for do-not-track settings
        self.do_not_track = (
            os.getenv("DO_NOT_TRACK", "False").lower() == "true" or settings_service.settings.do_not_track
        )

    async def telemetry_worker(self) -> None:
        """Drain the queue while ``self.running`` is true, invoking each queued sender.

        Failures are logged and never propagate, so one bad event cannot stop
        the worker.
        """
        while self.running:
            item = await self.telemetry_queue.get()
            try:
                # Unpack inside the try block: a malformed item must still be
                # marked done, otherwise queue.join() in flush() never returns.
                func, payload, path = item
                await func(payload, path)
            except Exception:  # noqa: BLE001
                logger.exception("Error sending telemetry data")
            finally:
                self.telemetry_queue.task_done()

    async def send_telemetry_data(self, payload: BaseModel, path: str | None = None) -> None:
        """Send ``payload`` as query parameters of a GET request to the telemetry URL.

        Args:
            payload: Model dumped with aliases, omitting ``None`` and unset fields.
            path: Optional suffix appended to ``base_url`` as ``{base_url}/{path}``.

        All errors are logged and swallowed.
        """
        if self.do_not_track:
            logger.debug("Telemetry tracking is disabled.")
            return

        url = f"{self.base_url}"
        if path:
            url = f"{url}/{path}"

        try:
            payload_dict = payload.model_dump(by_alias=True, exclude_none=True, exclude_unset=True)
            response = await self.client.get(url, params=payload_dict)
            if response.status_code != httpx.codes.OK:
                logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
            else:
                logger.debug("Telemetry data sent successfully.")
        except httpx.HTTPStatusError:
            logger.exception("HTTP error occurred")
        except httpx.RequestError:
            logger.exception("Request error occurred")
        except Exception:  # noqa: BLE001
            logger.exception("Unexpected error occurred")

    async def log_package_run(self, payload: RunPayload) -> None:
        """Queue a run event for the ``run`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "run"))

    async def log_package_shutdown(self) -> None:
        """Queue a shutdown event carrying the whole seconds elapsed since ``_start_time``."""
        uptime = datetime.now(timezone.utc) - self._start_time
        # total_seconds() rather than .seconds: the latter wraps around every 24 hours.
        payload = ShutdownPayload(time_running=int(uptime.total_seconds()))
        # The worker unpacks (sender, payload, path); queue the same shape as the
        # sibling log_package_* methods.
        # NOTE(review): the "shutdown" path follows the run/playground/component
        # naming convention -- confirm against the telemetry endpoint.
        await self._queue_event((self.send_telemetry_data, payload, "shutdown"))

    async def _queue_event(self, payload) -> None:
        """Put a ``(sender, payload, path)`` tuple on the queue.

        The item is dropped when tracking is disabled or the service is stopping.
        """
        if self.do_not_track or self._stopping:
            return
        await self.telemetry_queue.put(payload)

    async def log_package_version(self) -> None:
        """Queue a version event describing the package, interpreter, platform and settings."""
        python_version = ".".join(platform.python_version().split(".")[:2])
        version_info = get_version_info()
        if self.architecture is None:
            # Resolved once and cached; executed through asyncio.to_thread so the
            # lookup does not run on the event loop thread.
            self.architecture = (await asyncio.to_thread(platform.architecture))[0]
        payload = VersionPayload(
            package=version_info["package"].lower(),
            version=version_info["version"],
            platform=platform.platform(),
            python=python_version,
            cache_type=self.settings_service.settings.cache_type,
            backend_only=self.settings_service.settings.backend_only,
            arch=self.architecture,
            auto_login=self.settings_service.auth_settings.AUTO_LOGIN,
        )
        await self._queue_event((self.send_telemetry_data, payload, None))

    async def log_package_playground(self, payload: PlaygroundPayload) -> None:
        """Queue a playground event for the ``playground`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "playground"))

    async def log_package_component(self, payload: ComponentPayload) -> None:
        """Queue a component event for the ``component`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "component"))

    def start(self) -> None:
        """Create the worker task and queue the initial version event.

        Does nothing when already running or when tracking is disabled.
        Errors are logged rather than raised.
        """
        if self.running or self.do_not_track:
            return
        try:
            self.running = True
            self._start_time = datetime.now(timezone.utc)
            self.worker_task = asyncio.create_task(self.telemetry_worker())
            self.log_package_version_task = asyncio.create_task(self.log_package_version())
        except Exception:  # noqa: BLE001
            # Roll the flag back so a later start() can retry instead of
            # returning early with no worker task behind it.
            self.running = False
            logger.exception("Error starting telemetry service")

    async def flush(self) -> None:
        """Wait until every queued event has been processed by the worker."""
        if self.do_not_track:
            return
        worker = self.worker_task
        if not self.running or (worker is not None and worker.done()):
            # Nothing is draining the queue, so join() could block forever.
            return
        try:
            await self.telemetry_queue.join()
        except Exception:  # noqa: BLE001
            logger.exception("Error flushing logs")

    async def _cancel_task(self, task: asyncio.Task, cancel_msg: str) -> None:
        """Cancel ``task`` and wait for it to finish.

        The ``CancelledError`` raised by awaiting the cancelled task is
        swallowed, unless the current task is itself being cancelled, in which
        case it is re-raised.
        """
        task.cancel(cancel_msg)
        try:
            await task
        except asyncio.CancelledError:
            current_task = asyncio.current_task()
            if sys.version_info >= (3, 11):
                if current_task and current_task.cancelling() > 0:
                    raise
            elif current_task and hasattr(current_task, "_must_cancel") and current_task._must_cancel:
                # Task.cancelling() is only used on 3.11+ above; older
                # interpreters fall back to the private flag.
                raise

    async def stop(self) -> None:
        """Flush pending events, cancel the background tasks and close the HTTP client.

        Does nothing when tracking is disabled or a stop is already in progress.
        Errors are logged rather than raised.
        """
        if self.do_not_track or self._stopping:
            return
        self._stopping = True
        try:
            try:
                # flush all the remaining events and then stop
                await self.flush()
                self.running = False
                if self.worker_task:
                    await self._cancel_task(self.worker_task, "Cancel telemetry worker task")
                if self.log_package_version_task:
                    await self._cancel_task(
                        self.log_package_version_task, "Cancel telemetry log package version task"
                    )
            finally:
                # Release the client even when flushing or cancelling failed.
                self.running = False
                await self.client.aclose()
        except Exception:  # noqa: BLE001
            logger.exception("Error stopping telemetry service")

    async def teardown(self) -> None:
        """Service teardown hook; delegates to :meth:`stop`."""
        await self.stop()