from __future__ import annotations

import importlib
import inspect
from typing import TYPE_CHECKING

from loguru import logger

from langflow.utils.concurrency import KeyedMemoryLockManager

if TYPE_CHECKING:
    from langflow.services.base import Service
    from langflow.services.factory import ServiceFactory
    from langflow.services.schema import ServiceType


class NoFactoryRegisteredError(Exception):
    """Raised when a service is requested but no factory is available for it."""


class ServiceManager:
    """Manages the creation of different services.

    Factories are stored in ``self.factories`` and created service instances
    are cached in ``self.services``; both are keyed by service name.
    """

    def __init__(self) -> None:
        self.services: dict[str, Service] = {}
        self.factories: dict[str, ServiceFactory] = {}
        self.register_factories()
        self.keyed_lock = KeyedMemoryLockManager()

    def register_factories(self) -> None:
        """Register every factory returned by :meth:`get_factories`.

        A failure to register one factory is logged and does not prevent the
        remaining factories from being registered.
        """
        for factory in self.get_factories():
            try:
                self.register_factory(factory)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error initializing {factory}")

    def register_factory(
        self,
        service_factory: ServiceFactory,
    ) -> None:
        """Registers a new factory with dependencies.

        The factory is stored under ``service_factory.service_class.name``,
        replacing any factory previously stored under that key.
        """
        service_name = service_factory.service_class.name
        self.factories[service_name] = service_factory

    def get(self, service_name: ServiceType, default: ServiceFactory | None = None) -> Service:
        """Get (or create) a service by its name.

        Args:
            service_name: Name of the service to fetch.
            default: Factory to register and use when no factory is
                registered for ``service_name``.

        Returns:
            The cached service instance, created on first access.

        Raises:
            NoFactoryRegisteredError: If no factory is registered for the
                service and ``default`` is ``None``.
        """
        # Creation is guarded by a lock keyed on the service name.
        with self.keyed_lock.lock(service_name):
            if service_name not in self.services:
                self._create_service(service_name, default)

        return self.services[service_name]

    def _create_service(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None:
        """Create a new service given its name, handling dependencies.

        Args:
            service_name: Name of the service to create.
            default: Factory to register and use when none is registered for
                ``service_name``.

        Raises:
            NoFactoryRegisteredError: If no factory can be resolved.
        """
        logger.debug(f"Create service {service_name}")
        self._validate_service_creation(service_name, default)

        # Resolve the factory, falling back to (and registering) the default.
        factory = self.factories.get(service_name)
        if factory is None and default is not None:
            self.register_factory(default)
            factory = default
        if factory is None:
            msg = f"No factory registered for {service_name}"
            raise NoFactoryRegisteredError(msg)

        # Create dependencies first. They are built by direct recursion rather
        # than through get(), so no additional lock is taken for them here.
        for dependency in factory.dependencies:
            if dependency not in self.services:
                self._create_service(dependency)

        # Collect the dependent services
        dependent_services = {dep.value: self.services[dep] for dep in factory.dependencies}

        # Create the actual service. Use the factory resolved above instead of
        # looking it up again by ``service_name``: a default factory is
        # registered under its own ``service_class.name``, so a second lookup
        # could raise KeyError when that key differs from ``service_name``.
        self.services[service_name] = factory.create(**dependent_services)
        self.services[service_name].set_ready()

    def _validate_service_creation(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None:
        """Validate whether the service can be created.

        Raises:
            NoFactoryRegisteredError: If no factory is registered for
                ``service_name`` and no ``default`` factory was supplied.
        """
        if service_name not in self.factories and default is None:
            msg = f"No factory registered for the service class '{service_name.name}'"
            raise NoFactoryRegisteredError(msg)

    def update(self, service_name: ServiceType) -> None:
        """Update a service by its name.

        If the service currently exists it is dropped from the cache and
        created again via :meth:`get`; otherwise nothing happens.
        """
        if service_name in self.services:
            logger.debug(f"Update service {service_name}")
            self.services.pop(service_name, None)
            self.get(service_name)

    async def teardown(self) -> None:
        """Teardown all the services.

        Each service's ``teardown`` coroutine is awaited in turn; exceptions
        are logged and do not stop the remaining services from being torn
        down. Afterwards both the service cache and the factory registry are
        reset to empty dicts.
        """
        for service in self.services.values():
            if service is None:
                continue
            logger.debug(f"Teardown service {service.name}")
            try:
                await service.teardown()
            except Exception as exc:  # noqa: BLE001
                logger.exception(exc)
        self.services = {}
        self.factories = {}

    @staticmethod
    def get_factories() -> list[ServiceFactory]:
        """Discover and instantiate one factory per ``ServiceType`` member.

        For every member, the ``"_service"`` substring is removed from its
        value and the module ``langflow.services.<name>.factory`` is imported.
        The first class found in that module that is a strict subclass of
        ``ServiceFactory`` is instantiated.

        Returns:
            The instantiated factories.

        Raises:
            RuntimeError: If importing a factory module or instantiating its
                factory fails; the original exception is chained as the cause.
        """
        # Imported locally; at module level these names are only available
        # under TYPE_CHECKING.
        from langflow.services.factory import ServiceFactory
        from langflow.services.schema import ServiceType

        service_names = [ServiceType(service_type).value.replace("_service", "") for service_type in ServiceType]
        base_module = "langflow.services"
        factories = []

        for name in service_names:
            try:
                module_name = f"{base_module}.{name}.factory"
                module = importlib.import_module(module_name)

                # Find all classes in the module that are subclasses of ServiceFactory
                for _, obj in inspect.getmembers(module, inspect.isclass):
                    if issubclass(obj, ServiceFactory) and obj is not ServiceFactory:
                        factories.append(obj())
                        # Only the first matching class per module is used.
                        break

            except Exception as exc:
                logger.exception(exc)
                msg = f"Could not initialize services. Please check your settings. Error in {name}."
                raise RuntimeError(msg) from exc

        return factories


# Module-level manager instance, created at import time.
service_manager = ServiceManager()


def initialize_settings_service() -> None:
    """Initialize the settings manager."""
    from langflow.services.settings import factory as settings_factory

    service_manager.register_factory(settings_factory.SettingsServiceFactory())


def initialize_session_service() -> None:
    """Initialize the session manager.

    Registers the settings factory first, then the cache and session
    factories, on the module-level ``service_manager``.
    """
    from langflow.services.cache import factory as cache_factory
    from langflow.services.session import factory as session_service_factory

    initialize_settings_service()

    service_manager.register_factory(cache_factory.CacheServiceFactory())

    service_manager.register_factory(session_service_factory.SessionServiceFactory())