Tai Truong
fix readme
d202ada
from __future__ import annotations
from typing import TYPE_CHECKING
from langflow.logging.logger import logger
from langflow.services.cache.disk import AsyncDiskCache
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
class CacheServiceFactory(ServiceFactory):
def __init__(self) -> None:
super().__init__(CacheService)
def create(self, settings_service: SettingsService):
# Here you would have logic to create and configure a CacheService
# based on the settings_service
if settings_service.settings.cache_type == "redis":
logger.debug("Creating Redis cache")
redis_cache: RedisCache = RedisCache(
host=settings_service.settings.redis_host,
port=settings_service.settings.redis_port,
db=settings_service.settings.redis_db,
url=settings_service.settings.redis_url,
expiration_time=settings_service.settings.redis_cache_expire,
)
if redis_cache.is_connected():
logger.debug("Redis cache is connected")
return redis_cache
# do not attempt to fallback to another cache type
msg = "Failed to connect to Redis cache"
raise ConnectionError(msg)
if settings_service.settings.cache_type == "memory":
return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
if settings_service.settings.cache_type == "async":
return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire)
if settings_service.settings.cache_type == "disk":
return AsyncDiskCache(
cache_dir=settings_service.settings.config_dir,
expiration_time=settings_service.settings.cache_expire,
)
return None