from __future__ import annotations

import asyncio
import os
import platform
import sys
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import httpx
from loguru import logger

from langflow.services.base import Service
from langflow.services.telemetry.opentelemetry import OpenTelemetry
from langflow.services.telemetry.schema import (
    ComponentPayload,
    PlaygroundPayload,
    RunPayload,
    ShutdownPayload,
    VersionPayload,
)
from langflow.utils.version import get_version_info

if TYPE_CHECKING:
    from pydantic import BaseModel

    from langflow.services.settings.service import SettingsService


class TelemetryService(Service):
    """Queue telemetry events and send them over HTTP from a background task.

    Events are placed on an ``asyncio.Queue`` as ``(func, payload, path)``
    tuples and consumed by ``telemetry_worker``. Nothing is queued or sent
    when ``do_not_track`` is set (via the ``DO_NOT_TRACK`` environment
    variable or the settings object).
    """

    name = "telemetry_service"

    def __init__(self, settings_service: SettingsService):
        """Set up the queue, HTTP client and do-not-track flag.

        No task is created here; call ``start()`` from within a running
        event loop to begin processing events.
        """
        super().__init__()
        self.settings_service = settings_service
        self.base_url = settings_service.settings.telemetry_base_url
        self.telemetry_queue: asyncio.Queue = asyncio.Queue()
        self.client = httpx.AsyncClient(timeout=10.0)  # Set a reasonable timeout
        self.running = False
        self._stopping = False

        self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)
        self.architecture: str | None = None
        self.worker_task: asyncio.Task | None = None
        # Initialised here so stop() and log_package_shutdown() are safe to
        # call even when start() never ran (e.g. tracking disabled).
        self.log_package_version_task: asyncio.Task | None = None
        self._start_time: datetime | None = None
        # Check for do-not-track settings
        self.do_not_track = (
            os.getenv("DO_NOT_TRACK", "False").lower() == "true" or settings_service.settings.do_not_track
        )

    async def telemetry_worker(self) -> None:
        """Consume queued events until ``self.running`` becomes false.

        Each item is expected to be a ``(func, payload, path)`` tuple. Any
        ``Exception`` raised while unpacking or sending is logged, and
        ``task_done()`` is always called so ``flush()`` cannot stall on a
        bad item.
        """
        while self.running:
            item = await self.telemetry_queue.get()
            try:
                # Unpack inside the try: a malformed item must not kill the
                # worker or leave the queue's unfinished-task count stuck.
                func, payload, path = item
                await func(payload, path)
            except Exception:  # noqa: BLE001
                logger.exception("Error sending telemetry data")
            finally:
                self.telemetry_queue.task_done()

    async def send_telemetry_data(self, payload: BaseModel, path: str | None = None) -> None:
        """Send ``payload`` as query parameters of a GET request.

        Args:
            payload: Model dumped with aliases, omitting ``None`` and unset fields.
            path: Optional suffix appended to ``base_url`` as ``/{path}``.

        All errors are logged and swallowed; this method never raises for
        request failures.
        """
        if self.do_not_track:
            logger.debug("Telemetry tracking is disabled.")
            return

        url = f"{self.base_url}"
        if path:
            url = f"{url}/{path}"
        try:
            payload_dict = payload.model_dump(by_alias=True, exclude_none=True, exclude_unset=True)
            response = await self.client.get(url, params=payload_dict)
            if response.status_code != httpx.codes.OK:
                logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
            else:
                logger.debug("Telemetry data sent successfully.")
        except httpx.HTTPStatusError:
            logger.exception("HTTP error occurred")
        except httpx.RequestError:
            logger.exception("Request error occurred")
        except Exception:  # noqa: BLE001
            logger.exception("Unexpected error occurred")

    async def log_package_run(self, payload: RunPayload) -> None:
        """Queue a run event for the ``run`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "run"))

    async def log_package_shutdown(self) -> None:
        """Queue a shutdown event carrying the whole seconds since ``start()``.

        Does nothing when tracking is disabled or the service was never
        started, since there is no start time to measure from.
        """
        if self.do_not_track or self._start_time is None:
            return
        elapsed = datetime.now(timezone.utc) - self._start_time
        # total_seconds() covers the full duration; timedelta.seconds is only
        # the 0-86399 component and wraps after a day of uptime.
        payload = ShutdownPayload(time_running=int(elapsed.total_seconds()))
        # The worker unpacks (func, payload, path), so enqueue the same tuple
        # shape as every other event.
        # NOTE(review): "shutdown" mirrors the other event paths - confirm
        # against the telemetry backend.
        await self._queue_event((self.send_telemetry_data, payload, "shutdown"))

    async def _queue_event(self, payload) -> None:
        """Put an event on the queue unless tracking is off or the service is stopping.

        ``payload`` is the item handed to the worker, i.e. a
        ``(func, payload, path)`` tuple.
        """
        if self.do_not_track or self._stopping:
            return
        await self.telemetry_queue.put(payload)

    async def log_package_version(self) -> None:
        """Queue a version event describing the package and runtime environment."""
        python_version = ".".join(platform.python_version().split(".")[:2])
        version_info = get_version_info()
        if self.architecture is None:
            # Resolved in a thread and cached on the instance.
            self.architecture = (await asyncio.to_thread(platform.architecture))[0]
        payload = VersionPayload(
            package=version_info["package"].lower(),
            version=version_info["version"],
            platform=platform.platform(),
            python=python_version,
            cache_type=self.settings_service.settings.cache_type,
            backend_only=self.settings_service.settings.backend_only,
            arch=self.architecture,
            auto_login=self.settings_service.auth_settings.AUTO_LOGIN,
        )
        await self._queue_event((self.send_telemetry_data, payload, None))

    async def log_package_playground(self, payload: PlaygroundPayload) -> None:
        """Queue a playground event for the ``playground`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "playground"))

    async def log_package_component(self, payload: ComponentPayload) -> None:
        """Queue a component event for the ``component`` path."""
        await self._queue_event((self.send_telemetry_data, payload, "component"))

    def start(self) -> None:
        """Start the worker task and schedule the initial version event.

        A no-op when already running or when tracking is disabled. Uses
        ``asyncio.create_task``, so it must be called with a running event
        loop; failures are logged rather than raised.
        """
        if self.running or self.do_not_track:
            return
        try:
            self.running = True
            self._start_time = datetime.now(timezone.utc)
            self.worker_task = asyncio.create_task(self.telemetry_worker())
            self.log_package_version_task = asyncio.create_task(self.log_package_version())
        except Exception:  # noqa: BLE001
            logger.exception("Error starting telemetry service")

    async def flush(self) -> None:
        """Wait until every queued event has been processed by the worker.

        Returns immediately when tracking is disabled or when no worker task
        is alive, because ``Queue.join()`` would otherwise block forever with
        nothing draining the queue.
        """
        if self.do_not_track:
            return
        if self.worker_task is None or self.worker_task.done():
            logger.debug("Telemetry worker is not running; skipping flush.")
            return
        try:
            await self.telemetry_queue.join()
        except Exception:  # noqa: BLE001
            logger.exception("Error flushing logs")

    async def _cancel_task(self, task: asyncio.Task, cancel_msg: str) -> None:
        """Cancel ``task`` and wait for it to finish.

        The resulting ``CancelledError`` is swallowed unless the *current*
        task is itself being cancelled, in which case it is re-raised so the
        outer cancellation is not lost.
        """
        task.cancel(cancel_msg)
        try:
            await task
        except asyncio.CancelledError:
            current_task = asyncio.current_task()
            if sys.version_info >= (3, 11):
                if current_task and current_task.cancelling() > 0:
                    raise
            # Task.cancelling() does not exist before 3.11; fall back to the
            # private flag when it is present.
            elif current_task and hasattr(current_task, "_must_cancel") and current_task._must_cancel:
                raise

    async def stop(self) -> None:
        """Flush pending events, cancel background tasks and close the HTTP client.

        A no-op when tracking is disabled or a stop is already in progress.
        Errors are logged rather than raised, and the client is closed even
        if flushing or cancelling fails.
        """
        if self.do_not_track or self._stopping:
            return
        self._stopping = True
        try:
            try:
                # flush all the remaining events and then stop
                await self.flush()
                self.running = False
                if self.worker_task:
                    await self._cancel_task(self.worker_task, "Cancel telemetry worker task")
                if self.log_package_version_task:
                    await self._cancel_task(
                        self.log_package_version_task, "Cancel telemetry log package version task"
                    )
            finally:
                # Always release the HTTP client, even on a failed shutdown.
                await self.client.aclose()
        except Exception:  # noqa: BLE001
            logger.exception("Error stopping telemetry service")

    async def teardown(self) -> None:
        """Service teardown hook; delegates to ``stop()``."""
        await self.stop()