Spaces:
Running
Running
from __future__ import annotations | |
import importlib | |
import inspect | |
from typing import TYPE_CHECKING | |
from loguru import logger | |
from langflow.utils.concurrency import KeyedMemoryLockManager | |
if TYPE_CHECKING: | |
from langflow.services.base import Service | |
from langflow.services.factory import ServiceFactory | |
from langflow.services.schema import ServiceType | |
class NoFactoryRegisteredError(Exception):
    """Raised when a service is requested but no factory is available to build it."""
class ServiceManager:
    """Manages the creation of different services.

    Factories are kept in ``self.factories`` and the services they build are
    cached in ``self.services``; both mappings are keyed by service name.
    """

    def __init__(self) -> None:
        """Create empty registries and the keyed lock, then register the discovered factories."""
        self.services: dict[str, Service] = {}
        self.factories: dict[str, ServiceFactory] = {}
        # Create the lock before calling any other method so every attribute
        # exists by the time factories are being registered.
        self.keyed_lock = KeyedMemoryLockManager()
        self.register_factories()

    def register_factories(self) -> None:
        """Register every factory returned by :meth:`get_factories`.

        A failure while registering one factory is logged and does not stop
        the remaining factories from being registered.
        """
        for factory in self.get_factories():
            try:
                self.register_factory(factory)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error initializing {factory}")

    def register_factory(
        self,
        service_factory: ServiceFactory,
    ) -> None:
        """Register ``service_factory`` under the name of the service class it builds.

        An entry already stored under the same name is replaced.
        """
        service_name = service_factory.service_class.name
        self.factories[service_name] = service_factory

    def get(self, service_name: ServiceType, default: ServiceFactory | None = None) -> Service:
        """Get (or create) a service by its name.

        The lookup and the creation both happen while holding
        ``self.keyed_lock.lock(service_name)``.

        Args:
            service_name: Name of the requested service.
            default: Factory to fall back on when none is registered for
                ``service_name``.

        Returns:
            The cached service instance.

        Raises:
            NoFactoryRegisteredError: If no factory is registered for the
                service and no ``default`` is given.
        """
        with self.keyed_lock.lock(service_name):
            if service_name not in self.services:
                self._create_service(service_name, default)
            return self.services[service_name]

    def _create_service(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None:
        """Create a new service given its name, handling dependencies.

        Dependencies listed by the factory are created first (recursively)
        when they are not cached yet, then passed to ``factory.create`` as
        keyword arguments named after each dependency's ``value``.

        Raises:
            NoFactoryRegisteredError: If neither a registered factory nor a
                ``default`` factory is available.
        """
        logger.debug(f"Create service {service_name}")
        self._validate_service_creation(service_name, default)

        factory = self.factories.get(service_name)
        if factory is None and default is not None:
            self.register_factory(default)
            factory = default
        if factory is None:
            msg = f"No factory registered for {service_name}"
            raise NoFactoryRegisteredError(msg)

        # Create dependencies first
        for dependency in factory.dependencies:
            if dependency not in self.services:
                self._create_service(dependency)

        # Collect the dependent services
        dependent_services = {dep.value: self.services[dep] for dep in factory.dependencies}

        # Build with the factory resolved above rather than looking it up again
        # by ``service_name``: a default factory is registered under its own
        # service class name, which is not guaranteed to equal ``service_name``.
        service = factory.create(**dependent_services)
        self.services[service_name] = service
        service.set_ready()

    def _validate_service_creation(self, service_name: ServiceType, default: ServiceFactory | None = None) -> None:
        """Validate whether the service can be created.

        Raises:
            NoFactoryRegisteredError: If ``service_name`` has no registered
                factory and no ``default`` factory was supplied.
        """
        if service_name not in self.factories and default is None:
            msg = f"No factory registered for the service class '{service_name.name}'"
            raise NoFactoryRegisteredError(msg)

    def update(self, service_name: ServiceType) -> None:
        """Update a service by its name.

        When the service is cached, the cached instance is dropped and a new
        one is created through :meth:`get`. Otherwise nothing happens.
        """
        if service_name in self.services:
            logger.debug(f"Update service {service_name}")
            self.services.pop(service_name, None)
            self.get(service_name)

    async def teardown(self) -> None:
        """Teardown all the services.

        Errors raised by an individual service are logged and do not prevent
        the remaining services from being torn down. Both registries are
        emptied afterwards.
        """
        # Iterate over a snapshot: the dict could be modified while this
        # coroutine is suspended in ``await``, which would break iteration
        # over the live view.
        for service in list(self.services.values()):
            if service is None:
                continue
            logger.debug(f"Teardown service {service.name}")
            try:
                await service.teardown()
            except Exception as exc:  # noqa: BLE001
                logger.exception(exc)
        self.services = {}
        self.factories = {}

    @staticmethod
    def get_factories() -> list[ServiceFactory]:
        """Import and instantiate one factory per ``ServiceType`` member.

        For each service type the module ``langflow.services.<name>.factory``
        is imported, where ``<name>`` is the member value with ``"_service"``
        removed. The first ``ServiceFactory`` subclass found in that module is
        instantiated.

        Returns:
            The instantiated factories.

        Raises:
            RuntimeError: If importing a factory module or instantiating a
                factory fails; the original exception is chained.
        """
        # Imported here rather than at module level; the top-level imports of
        # these names are guarded by TYPE_CHECKING.
        from langflow.services.factory import ServiceFactory
        from langflow.services.schema import ServiceType

        service_names = [service_type.value.replace("_service", "") for service_type in ServiceType]
        base_module = "langflow.services"
        factories = []

        for name in service_names:
            try:
                module_name = f"{base_module}.{name}.factory"
                module = importlib.import_module(module_name)

                # Find all classes in the module that are subclasses of ServiceFactory
                for _, obj in inspect.getmembers(module, inspect.isclass):
                    if issubclass(obj, ServiceFactory) and obj is not ServiceFactory:
                        factories.append(obj())
                        # Only the first matching factory per module is used.
                        break

            except Exception as exc:
                logger.exception(exc)
                msg = f"Could not initialize services. Please check your settings. Error in {name}."
                raise RuntimeError(msg) from exc

        return factories
# Module-level manager instance; the initialize_* helpers in this module
# register their factories on it.
service_manager = ServiceManager()
def initialize_settings_service() -> None:
    """Register a settings service factory on the module-level ``service_manager``."""
    from langflow.services.settings import factory as settings_factory

    settings_service_factory = settings_factory.SettingsServiceFactory()
    service_manager.register_factory(settings_service_factory)
def initialize_session_service() -> None:
    """Register the settings, cache and session service factories, in that order."""
    from langflow.services.cache import factory as cache_factory
    from langflow.services.session import factory as session_service_factory

    # The settings factory is registered before the cache and session ones.
    initialize_settings_service()

    cache_service_factory = cache_factory.CacheServiceFactory()
    service_manager.register_factory(cache_service_factory)

    session_factory = session_service_factory.SessionServiceFactory()
    service_manager.register_factory(session_factory)