from __future__ import annotations import json from typing import TYPE_CHECKING, Any from uuid import UUID import httpx from httpx import HTTPError, HTTPStatusError from loguru import logger from langflow.services.base import Service from langflow.services.store.exceptions import APIKeyError, FilterError, ForbiddenError from langflow.services.store.schema import ( CreateComponentResponse, DownloadComponentResponse, ListComponentResponse, ListComponentResponseModel, StoreComponentCreate, ) from langflow.services.store.utils import ( process_component_data, process_tags_for_post, update_components_with_user_data, ) if TYPE_CHECKING: from langflow.services.settings.service import SettingsService from contextlib import asynccontextmanager from contextvars import ContextVar user_data_var: ContextVar[dict[str, Any] | None] = ContextVar("user_data", default=None) @asynccontextmanager async def user_data_context(store_service: StoreService, api_key: str | None = None): # Fetch and set user data to the context variable if api_key: try: user_data, _ = await store_service.get( f"{store_service.base_url}/users/me", api_key, params={"fields": "id"} ) user_data_var.set(user_data[0]) except HTTPStatusError as exc: if exc.response.status_code == httpx.codes.FORBIDDEN: msg = "Invalid API key" raise ValueError(msg) from exc try: yield finally: # Clear the user data from the context variable user_data_var.set(None) def get_id_from_search_string(search_string: str) -> str | None: """Extracts the ID from a search string. Args: search_string (str): The search string to extract the ID from. Returns: Optional[str]: The extracted ID, or None if no ID is found. """ possible_id: str | None = search_string if "www.langflow.store/store/" in search_string: possible_id = search_string.split("/")[-1] try: possible_id = str(UUID(search_string)) except ValueError: possible_id = None return possible_id class StoreService(Service): """This is a service that integrates langflow with the store which is a Directus instance. It allows to search, get and post components to the store. """ name = "store_service" def __init__(self, settings_service: SettingsService): self.settings_service = settings_service self.base_url = self.settings_service.settings.store_url self.download_webhook_url = self.settings_service.settings.download_webhook_url self.like_webhook_url = self.settings_service.settings.like_webhook_url self.components_url = f"{self.base_url}/items/components" self.default_fields = [ "id", "name", "description", "user_created.username", "is_component", "tags.tags_id.name", "tags.tags_id.id", "count(liked_by)", "count(downloads)", "metadata", "last_tested_version", "private", ] self.timeout = 30 # Create a context manager that will use the api key to # get the user data and all requests inside the context manager # will make a property return that data # Without making the request multiple times async def check_api_key(self, api_key: str): # Check if the api key is valid # If it is, return True # If it is not, return False try: user_data, _ = await self.get(f"{self.base_url}/users/me", api_key, params={"fields": "id"}) return "id" in user_data[0] except HTTPStatusError as exc: if exc.response.status_code in {403, 401}: return False msg = f"Unexpected status code: {exc.response.status_code}" raise ValueError(msg) from exc except Exception as exc: msg = f"Unexpected error: {exc}" raise ValueError(msg) from exc async def get( self, url: str, api_key: str | None = None, params: dict[str, Any] | None = None ) -> tuple[list[dict[str, Any]], dict[str, Any]]: """Utility method to perform GET requests.""" headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} async with httpx.AsyncClient() as client: try: response = await client.get(url, headers=headers, params=params, timeout=self.timeout) response.raise_for_status() except HTTPError: raise except Exception as exc: msg = f"GET failed: {exc}" raise ValueError(msg) from exc json_response = response.json() result = json_response["data"] metadata = {} if "meta" in json_response: metadata = json_response["meta"] if isinstance(result, dict): return [result], metadata return result, metadata async def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> None: # The webhook is a POST request with the data in the body # For now we are calling it just for testing try: headers = {"Authorization": f"Bearer {api_key}"} async with httpx.AsyncClient() as client: response = await client.post( webhook_url, headers=headers, json={"component_id": str(component_id)}, timeout=self.timeout ) response.raise_for_status() return response.json() except HTTPError: raise except Exception: # noqa: BLE001 logger.opt(exception=True).debug("Webhook failed") def build_tags_filter(self, tags: list[str]): tags_filter: dict[str, Any] = {"tags": {"_and": []}} for tag in tags: tags_filter["tags"]["_and"].append({"_some": {"tags_id": {"name": {"_eq": tag}}}}) return tags_filter async def count_components( self, filter_conditions: list[dict[str, Any]], *, api_key: str | None = None, use_api_key: bool | None = False, ) -> int: params = {"aggregate": json.dumps({"count": "*"})} if filter_conditions: params["filter"] = json.dumps({"_and": filter_conditions}) api_key = api_key if use_api_key else None results, _ = await self.get(self.components_url, api_key, params) return int(results[0].get("count", 0)) @staticmethod def build_search_filter_conditions(query: str): # instead of build the param ?search=query, we will build the filter # that will use _icontains (case insensitive) conditions: dict[str, Any] = {"_or": []} conditions["_or"].append({"name": {"_icontains": query}}) conditions["_or"].append({"description": {"_icontains": query}}) conditions["_or"].append({"tags": {"tags_id": {"name": {"_icontains": query}}}}) conditions["_or"].append({"user_created": {"username": {"_icontains": query}}}) return conditions def build_filter_conditions( self, *, component_id: str | None = None, search: str | None = None, private: bool | None = None, tags: list[str] | None = None, is_component: bool | None = None, filter_by_user: bool | None = False, liked: bool | None = False, store_api_key: str | None = None, ): filter_conditions = [] if component_id is None: component_id = get_id_from_search_string(search) if search else None if search is not None and component_id is None: search_conditions = self.build_search_filter_conditions(search) filter_conditions.append(search_conditions) if private is not None: filter_conditions.append({"private": {"_eq": private}}) if tags: tags_filter = self.build_tags_filter(tags) filter_conditions.append(tags_filter) if component_id is not None: filter_conditions.append({"id": {"_eq": component_id}}) if is_component is not None: filter_conditions.append({"is_component": {"_eq": is_component}}) if liked and store_api_key: liked_filter = self.build_liked_filter() filter_conditions.append(liked_filter) elif liked and not store_api_key: msg = "You must provide an API key to filter by likes" raise APIKeyError(msg) if filter_by_user and store_api_key: user_data = user_data_var.get() if not user_data: msg = "No user data" raise ValueError(msg) filter_conditions.append({"user_created": {"_eq": user_data["id"]}}) elif filter_by_user and not store_api_key: msg = "You must provide an API key to filter your components" raise APIKeyError(msg) else: filter_conditions.append({"private": {"_eq": False}}) return filter_conditions def build_liked_filter(self): user_data = user_data_var.get() # params["filter"] = json.dumps({"user_created": {"_eq": user_data["id"]}}) if not user_data: msg = "No user data" raise ValueError(msg) return {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}} async def query_components( self, *, api_key: str | None = None, sort: list[str] | None = None, page: int = 1, limit: int = 15, fields: list[str] | None = None, filter_conditions: list[dict[str, Any]] | None = None, use_api_key: bool | None = False, ) -> tuple[list[ListComponentResponse], dict[str, Any]]: params: dict[str, Any] = { "page": page, "limit": limit, "fields": ",".join(fields) if fields is not None else ",".join(self.default_fields), "meta": "filter_count", # !This is DEPRECATED so we should remove it ASAP } # ?aggregate[count]=likes if sort: params["sort"] = ",".join(sort) # Only public components or the ones created by the user # check for "public" or "Public" if filter_conditions: params["filter"] = json.dumps({"_and": filter_conditions}) # If not liked, this means we are getting public components # so we don't need to risk passing an invalid api_key # and getting 401 api_key = api_key if use_api_key else None results, metadata = await self.get(self.components_url, api_key, params) if isinstance(results, dict): results = [results] results_objects = [ListComponentResponse(**result) for result in results] return results_objects, metadata async def get_liked_by_user_components(self, component_ids: list[str], api_key: str) -> list[str]: # Get fields id # filter should be "id is in component_ids AND liked_by directus_users_id token is api_key" # return the ids user_data = user_data_var.get() if not user_data: msg = "No user data" raise ValueError(msg) params = { "fields": "id", "filter": json.dumps( { "_and": [ {"id": {"_in": component_ids}}, {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}}, ] } ), } results, _ = await self.get(self.components_url, api_key, params) return [result["id"] for result in results] # Which of the components is parent of the user's components async def get_components_in_users_collection(self, component_ids: list[str], api_key: str): user_data = user_data_var.get() if not user_data: msg = "No user data" raise ValueError(msg) params = { "fields": "id", "filter": json.dumps( { "_and": [ {"user_created": {"_eq": user_data["id"]}}, {"parent": {"_in": component_ids}}, ] } ), } results, _ = await self.get(self.components_url, api_key, params) return [result["id"] for result in results] async def download(self, api_key: str, component_id: UUID) -> DownloadComponentResponse: url = f"{self.components_url}/{component_id}" params = {"fields": "id,name,description,data,is_component,metadata"} if not self.download_webhook_url: msg = "DOWNLOAD_WEBHOOK_URL is not set" raise ValueError(msg) component, _ = await self.get(url, api_key, params) await self.call_webhook(api_key, self.download_webhook_url, component_id) if len(component) > 1: msg = "Something went wrong while downloading the component" raise ValueError(msg) component_dict = component[0] download_component = DownloadComponentResponse(**component_dict) # Check if metadata is an empty dict if download_component.metadata in [None, {}] and download_component.data is not None: # If it is, we need to build the metadata try: download_component.metadata = process_component_data(download_component.data.get("nodes", [])) except KeyError as e: msg = "Invalid component data. No nodes found" raise ValueError(msg) from e return download_component async def upload(self, api_key: str, component_data: StoreComponentCreate) -> CreateComponentResponse: headers = {"Authorization": f"Bearer {api_key}"} component_dict = component_data.model_dump(exclude_unset=True) # Parent is a UUID, but the store expects a string response = None if component_dict.get("parent"): component_dict["parent"] = str(component_dict["parent"]) component_dict = process_tags_for_post(component_dict) try: # response = httpx.post(self.components_url, headers=headers, json=component_dict) # response.raise_for_status() async with httpx.AsyncClient() as client: response = await client.post( self.components_url, headers=headers, json=component_dict, timeout=self.timeout ) response.raise_for_status() component = response.json()["data"] return CreateComponentResponse(**component) except HTTPError as exc: if response: try: errors = response.json() message = errors["errors"][0]["message"] if message == "An unexpected error occurred.": # This is a bug in Directus that returns this error # when an error was thrown in the flow message = "You already have a component with this name. Please choose a different name." raise FilterError(message) except UnboundLocalError: pass msg = f"Upload failed: {exc}" raise ValueError(msg) from exc async def update( self, api_key: str, component_id: UUID, component_data: StoreComponentCreate ) -> CreateComponentResponse: # Patch is the same as post, but we need to add the id to the url headers = {"Authorization": f"Bearer {api_key}"} component_dict = component_data.model_dump(exclude_unset=True) # Parent is a UUID, but the store expects a string response = None if component_dict.get("parent"): component_dict["parent"] = str(component_dict["parent"]) component_dict = process_tags_for_post(component_dict) try: # response = httpx.post(self.components_url, headers=headers, json=component_dict) # response.raise_for_status() async with httpx.AsyncClient() as client: response = await client.patch( self.components_url + f"/{component_id}", headers=headers, json=component_dict, timeout=self.timeout ) response.raise_for_status() component = response.json()["data"] return CreateComponentResponse(**component) except HTTPError as exc: if response: try: errors = response.json() message = errors["errors"][0]["message"] if message == "An unexpected error occurred.": # This is a bug in Directus that returns this error # when an error was thrown in the flow message = "You already have a component with this name. Please choose a different name." raise FilterError(message) except UnboundLocalError: pass msg = f"Upload failed: {exc}" raise ValueError(msg) from exc async def get_tags(self) -> list[dict[str, Any]]: url = f"{self.base_url}/items/tags" params = {"fields": "id,name"} tags, _ = await self.get(url, api_key=None, params=params) return tags async def get_user_likes(self, api_key: str) -> list[dict[str, Any]]: url = f"{self.base_url}/users/me" params = { "fields": "id,likes", } likes, _ = await self.get(url, api_key, params) return likes async def get_component_likes_count(self, component_id: str, api_key: str | None = None) -> int: url = f"{self.components_url}/{component_id}" params = { "fields": "id,count(liked_by)", } result, _ = await self.get(url, api_key=api_key, params=params) if len(result) == 0: msg = "Component not found" raise ValueError(msg) likes = result[0]["liked_by_count"] # likes_by_count is a string # try to convert it to int try: likes = int(likes) except ValueError as e: msg = f"Unexpected value for likes count: {likes}" raise ValueError(msg) from e return likes async def like_component(self, api_key: str, component_id: str) -> bool: # if it returns a list with one id, it means the like was successful # if it returns an int, it means the like was removed if not self.like_webhook_url: msg = "LIKE_WEBHOOK_URL is not set" raise ValueError(msg) headers = {"Authorization": f"Bearer {api_key}"} # response = httpx.post( # self.like_webhook_url, # json={"component_id": str(component_id)}, # headers=headers, # ) # response.raise_for_status() async with httpx.AsyncClient() as client: response = await client.post( self.like_webhook_url, json={"component_id": str(component_id)}, headers=headers, timeout=self.timeout, ) response.raise_for_status() if response.status_code == httpx.codes.OK: result = response.json() if isinstance(result, list): return True if isinstance(result, int): return False msg = f"Unexpected result: {result}" raise ValueError(msg) msg = f"Unexpected status code: {response.status_code}" raise ValueError(msg) async def get_list_component_response_model( self, *, component_id: str | None = None, search: str | None = None, private: bool | None = None, tags: list[str] | None = None, is_component: bool | None = None, fields: list[str] | None = None, filter_by_user: bool = False, liked: bool = False, store_api_key: str | None = None, sort: list[str] | None = None, page: int = 1, limit: int = 15, ): async with user_data_context(api_key=store_api_key, store_service=self): filter_conditions: list[dict[str, Any]] = self.build_filter_conditions( component_id=component_id, search=search, private=private, tags=tags, is_component=is_component, filter_by_user=filter_by_user, liked=liked, store_api_key=store_api_key, ) result: list[ListComponentResponse] = [] authorized = False metadata: dict = {} comp_count = 0 try: result, metadata = await self.query_components( api_key=store_api_key, page=page, limit=limit, sort=sort, fields=fields, filter_conditions=filter_conditions, use_api_key=liked or filter_by_user, ) if metadata: comp_count = metadata.get("filter_count", 0) except HTTPStatusError as exc: if exc.response.status_code == httpx.codes.FORBIDDEN: msg = "You are not authorized to access this public resource" raise ForbiddenError(msg) from exc if exc.response.status_code == httpx.codes.UNAUTHORIZED: msg = "You are not authorized to access this resource. Please check your API key." raise APIKeyError(msg) from exc except Exception as exc: msg = f"Unexpected error: {exc}" raise ValueError(msg) from exc try: if result and not metadata: if len(result) >= limit: comp_count = await self.count_components( api_key=store_api_key, filter_conditions=filter_conditions, use_api_key=liked or filter_by_user, ) else: comp_count = len(result) elif not metadata: comp_count = 0 except HTTPStatusError as exc: if exc.response.status_code == httpx.codes.FORBIDDEN: msg = "You are not authorized to access this public resource" raise ForbiddenError(msg) from exc if exc.response.status_code == httpx.codes.UNAUTHORIZED: msg = "You are not authorized to access this resource. Please check your API key." raise APIKeyError(msg) from exc if store_api_key: # Now, from the result, we need to get the components # the user likes and set the liked_by_user to True # if any of the components does not have an id, it means # we should not update the components if not result or any(component.id is None for component in result): authorized = await self.check_api_key(store_api_key) else: try: updated_result = await update_components_with_user_data( result, self, store_api_key, liked=liked ) authorized = True result = updated_result except Exception: # noqa: BLE001 logger.opt(exception=True).debug("Error updating components with user data") # If we get an error here, it means the user is not authorized authorized = False return ListComponentResponseModel(results=result, authorized=authorized, count=comp_count)