Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| from typing import TYPE_CHECKING, Any | |
| from uuid import UUID | |
| import httpx | |
| from httpx import HTTPError, HTTPStatusError | |
| from loguru import logger | |
| from langflow.services.base import Service | |
| from langflow.services.store.exceptions import APIKeyError, FilterError, ForbiddenError | |
| from langflow.services.store.schema import ( | |
| CreateComponentResponse, | |
| DownloadComponentResponse, | |
| ListComponentResponse, | |
| ListComponentResponseModel, | |
| StoreComponentCreate, | |
| ) | |
| from langflow.services.store.utils import ( | |
| process_component_data, | |
| process_tags_for_post, | |
| update_components_with_user_data, | |
| ) | |
| if TYPE_CHECKING: | |
| from langflow.services.settings.service import SettingsService | |
| from contextlib import asynccontextmanager | |
| from contextvars import ContextVar | |
# Context-local holder for the store user record fetched by ``user_data_context``.
# It is ``None`` whenever no context is active or no API key was supplied.
user_data_var: ContextVar[dict[str, Any] | None] = ContextVar("user_data", default=None)


@asynccontextmanager
async def user_data_context(store_service: StoreService, api_key: str | None = None):
    """Expose the store user's data through ``user_data_var`` for the duration of the context.

    When ``api_key`` is truthy, ``/users/me`` is queried once (requesting only the
    ``id`` field) and the first returned record is stored in ``user_data_var`` so
    code running inside the ``async with`` block can read it without issuing the
    request again. On exit the variable is always set back to ``None``.

    Args:
        store_service: Service used to perform the GET request; its ``base_url``
            and ``get`` members are used.
        api_key: Store API key. When falsy, no request is made and the variable
            keeps its current value while the context is active.

    Raises:
        ValueError: If the store answers the user lookup with HTTP 403.
    """
    if api_key:
        try:
            user_data, _ = await store_service.get(
                f"{store_service.base_url}/users/me", api_key, params={"fields": "id"}
            )
            user_data_var.set(user_data[0])
        except HTTPStatusError as exc:
            # Only a 403 is reported as an invalid key. Any other HTTP status is
            # ignored here and the context is entered without user data.
            if exc.response.status_code == httpx.codes.FORBIDDEN:
                msg = "Invalid API key"
                raise ValueError(msg) from exc
    try:
        yield
    finally:
        # Clear the user data so it cannot leak past the end of the context.
        user_data_var.set(None)
def get_id_from_search_string(search_string: str) -> str | None:
    """Extract a component ID from a search string.

    The search string may be a bare UUID or a store URL containing
    ``www.langflow.store/store/``; for a URL, the last path segment is taken
    as the candidate ID.

    Args:
        search_string (str): The search string to extract the ID from.

    Returns:
        Optional[str]: The ID in canonical UUID string form, or None if the
        candidate is not a valid UUID.
    """
    candidate = search_string
    if "www.langflow.store/store/" in search_string:
        # Ignore a trailing slash so ".../store/<uuid>/" still yields the UUID.
        candidate = search_string.rstrip("/").rsplit("/", 1)[-1]
    try:
        # Validate the extracted candidate, not the whole search string:
        # a full URL is never a valid UUID.
        return str(UUID(candidate))
    except ValueError:
        return None
class StoreService(Service):
    """Service that integrates langflow with the store, which is a Directus instance.

    It allows searching, fetching, and posting components to the store.
    """

    name = "store_service"

    def __init__(self, settings_service: SettingsService):
        """Read the store endpoints from the settings and set the request defaults.

        Args:
            settings_service: Settings provider; its ``settings`` object supplies
                ``store_url``, ``download_webhook_url`` and ``like_webhook_url``.
        """
        self.settings_service = settings_service
        self.base_url = self.settings_service.settings.store_url
        self.download_webhook_url = self.settings_service.settings.download_webhook_url
        self.like_webhook_url = self.settings_service.settings.like_webhook_url
        self.components_url = f"{self.base_url}/items/components"
        # Fields requested from the store when a query does not specify its own.
        self.default_fields = [
            "id",
            "name",
            "description",
            "user_created.username",
            "is_component",
            "tags.tags_id.name",
            "tags.tags_id.id",
            "count(liked_by)",
            "count(downloads)",
            "metadata",
            "last_tested_version",
            "private",
        ]
        # Timeout handed to every httpx request issued by this service.
        self.timeout = 30

    # NOTE: several methods below read the current user from ``user_data_var``.
    # That variable is populated by the ``user_data_context`` context manager,
    # which fetches the user once so the request is not repeated per method.

    async def check_api_key(self, api_key: str):
        """Check whether ``api_key`` is accepted by the store.

        Args:
            api_key: Store API key to validate.

        Returns:
            True if ``/users/me`` returns a record containing ``id``; False if the
            store answers with HTTP 401 or 403.

        Raises:
            ValueError: On any other HTTP status or any other unexpected error.
        """
        try:
            user_data, _ = await self.get(f"{self.base_url}/users/me", api_key, params={"fields": "id"})
            return "id" in user_data[0]
        except HTTPStatusError as exc:
            if exc.response.status_code in {403, 401}:
                return False
            msg = f"Unexpected status code: {exc.response.status_code}"
            raise ValueError(msg) from exc
        except Exception as exc:
            msg = f"Unexpected error: {exc}"
            raise ValueError(msg) from exc

    async def get(
        self, url: str, api_key: str | None = None, params: dict[str, Any] | None = None
    ) -> tuple[list[dict[str, Any]], dict[str, Any]]:
        """Perform a GET request against the store and unwrap the response.

        Args:
            url: Full URL to request.
            api_key: Optional key; when truthy it is sent as a Bearer token.
            params: Optional query parameters.

        Returns:
            A ``(data, meta)`` tuple. ``data`` is always a list (a single object
            is wrapped in a one-element list); ``meta`` is the response's
            ``"meta"`` entry, or an empty dict when it is absent.

        Raises:
            HTTPError: Propagated unchanged from httpx (including error statuses).
            ValueError: If the request fails with any other exception.
        """
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url, headers=headers, params=params, timeout=self.timeout)
                response.raise_for_status()
            except HTTPError:
                raise
            except Exception as exc:
                msg = f"GET failed: {exc}"
                raise ValueError(msg) from exc
        json_response = response.json()
        result = json_response["data"]
        metadata = json_response["meta"] if "meta" in json_response else {}
        if isinstance(result, dict):
            return [result], metadata
        return result, metadata

    async def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> Any:
        """POST ``{"component_id": ...}`` to a store webhook.

        Args:
            api_key: Store API key sent as a Bearer token.
            webhook_url: URL of the webhook to call.
            component_id: Component the webhook call refers to.

        Returns:
            The decoded JSON body of the webhook response, or None when a
            non-HTTP error occurred (that error is logged at debug level).

        Raises:
            HTTPError: Propagated unchanged from httpx.
        """
        try:
            headers = {"Authorization": f"Bearer {api_key}"}
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    webhook_url, headers=headers, json={"component_id": str(component_id)}, timeout=self.timeout
                )
                response.raise_for_status()
            return response.json()
        except HTTPError:
            raise
        except Exception:  # noqa: BLE001
            # Best effort: anything that is not an HTTP error is only logged.
            logger.opt(exception=True).debug("Webhook failed")
            return None

    def build_tags_filter(self, tags: list[str]):
        """Build a filter that requires every tag in ``tags`` to be present.

        Args:
            tags: Tag names that must all match.

        Returns:
            A dict of the form ``{"tags": {"_and": [...]}}`` with one ``_some``
            clause per tag.
        """
        clauses = [{"_some": {"tags_id": {"name": {"_eq": tag}}}} for tag in tags]
        return {"tags": {"_and": clauses}}

    async def count_components(
        self,
        filter_conditions: list[dict[str, Any]],
        *,
        api_key: str | None = None,
        use_api_key: bool | None = False,
    ) -> int:
        """Ask the store how many components match ``filter_conditions``.

        Args:
            filter_conditions: Conditions combined with ``_and``; may be empty.
            api_key: Store API key, only sent when ``use_api_key`` is truthy.
            use_api_key: Whether the request should be authenticated.

        Returns:
            The ``count`` value of the first aggregate row (0 if it has none).
        """
        params = {"aggregate": json.dumps({"count": "*"})}
        if filter_conditions:
            params["filter"] = json.dumps({"_and": filter_conditions})
        api_key = api_key if use_api_key else None
        results, _ = await self.get(self.components_url, api_key, params)
        return int(results[0].get("count", 0))

    @staticmethod
    def build_search_filter_conditions(query: str):
        """Build a case-insensitive search filter for ``query``.

        A filter is built instead of using the ``?search=`` parameter so that
        ``_icontains`` can be applied to the name, description, tag names and
        author username.

        Args:
            query: Text to search for.

        Returns:
            A dict of the form ``{"_or": [...]}`` with one clause per searched field.
        """
        return {
            "_or": [
                {"name": {"_icontains": query}},
                {"description": {"_icontains": query}},
                {"tags": {"tags_id": {"name": {"_icontains": query}}}},
                {"user_created": {"username": {"_icontains": query}}},
            ]
        }

    def build_filter_conditions(
        self,
        *,
        component_id: str | None = None,
        search: str | None = None,
        private: bool | None = None,
        tags: list[str] | None = None,
        is_component: bool | None = None,
        filter_by_user: bool | None = False,
        liked: bool | None = False,
        store_api_key: str | None = None,
    ) -> list[dict[str, Any]]:
        """Translate query options into a list of store filter conditions.

        Args:
            component_id: Exact component ID to match. When omitted, an ID is
                extracted from ``search`` if ``search`` contains one.
            search: Free-text search; only used when no component ID is known.
            private: When not None, match components with this ``private`` value.
            tags: Tag names that must all be present.
            is_component: When not None, match this ``is_component`` value.
            filter_by_user: Restrict to components created by the current user.
            liked: Restrict to components liked by the current user.
            store_api_key: Store API key; required for ``liked`` and
                ``filter_by_user``.

        Returns:
            The list of conditions, to be combined with ``_and`` by the caller.

        Raises:
            APIKeyError: If ``liked`` or ``filter_by_user`` is set without an API key.
            ValueError: If user data is needed but ``user_data_var`` is empty.
        """
        filter_conditions: list[dict[str, Any]] = []

        if component_id is None:
            component_id = get_id_from_search_string(search) if search else None

        if search is not None and component_id is None:
            filter_conditions.append(self.build_search_filter_conditions(search))

        if private is not None:
            filter_conditions.append({"private": {"_eq": private}})

        if tags:
            filter_conditions.append(self.build_tags_filter(tags))

        if component_id is not None:
            filter_conditions.append({"id": {"_eq": component_id}})

        if is_component is not None:
            filter_conditions.append({"is_component": {"_eq": is_component}})

        if liked:
            if not store_api_key:
                msg = "You must provide an API key to filter by likes"
                raise APIKeyError(msg)
            filter_conditions.append(self.build_liked_filter())

        if filter_by_user:
            if not store_api_key:
                msg = "You must provide an API key to filter your components"
                raise APIKeyError(msg)
            user_data = user_data_var.get()
            if not user_data:
                msg = "No user data"
                raise ValueError(msg)
            filter_conditions.append({"user_created": {"_eq": user_data["id"]}})
        else:
            # Not restricted to the caller's own components: only public ones.
            filter_conditions.append({"private": {"_eq": False}})

        return filter_conditions

    def build_liked_filter(self):
        """Build a filter matching components liked by the current user.

        Returns:
            A ``liked_by`` condition using the ``id`` stored in ``user_data_var``.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = user_data_var.get()
        if not user_data:
            msg = "No user data"
            raise ValueError(msg)
        return {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}}

    async def query_components(
        self,
        *,
        api_key: str | None = None,
        sort: list[str] | None = None,
        page: int = 1,
        limit: int = 15,
        fields: list[str] | None = None,
        filter_conditions: list[dict[str, Any]] | None = None,
        use_api_key: bool | None = False,
    ) -> tuple[list[ListComponentResponse], dict[str, Any]]:
        """Fetch one page of components from the store.

        Args:
            api_key: Store API key, only sent when ``use_api_key`` is truthy.
            sort: Sort expressions, joined with commas.
            page: Page number to request.
            limit: Page size.
            fields: Fields to request; defaults to ``self.default_fields``.
            filter_conditions: Conditions combined with ``_and``.
            use_api_key: Whether the request should be authenticated.

        Returns:
            A tuple of the parsed components and the response metadata.
        """
        requested_fields = fields if fields is not None else self.default_fields
        params: dict[str, Any] = {
            "page": page,
            "limit": limit,
            "fields": ",".join(requested_fields),
            "meta": "filter_count",  # !This is DEPRECATED so we should remove it ASAP
        }
        if sort:
            params["sort"] = ",".join(sort)
        if filter_conditions:
            params["filter"] = json.dumps({"_and": filter_conditions})

        # For public queries the key is dropped on purpose, so that an invalid
        # key cannot turn a public request into a 401.
        api_key = api_key if use_api_key else None
        results, metadata = await self.get(self.components_url, api_key, params)
        return [ListComponentResponse(**result) for result in results], metadata

    async def get_liked_by_user_components(self, component_ids: list[str], api_key: str) -> list[str]:
        """Return which of ``component_ids`` the current user has liked.

        Args:
            component_ids: Candidate component IDs.
            api_key: Store API key used for the request.

        Returns:
            The IDs among ``component_ids`` that are liked by the user stored in
            ``user_data_var``.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = user_data_var.get()
        if not user_data:
            msg = "No user data"
            raise ValueError(msg)
        params = {
            "fields": "id",
            "filter": json.dumps(
                {
                    "_and": [
                        {"id": {"_in": component_ids}},
                        {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}},
                    ]
                }
            ),
        }
        results, _ = await self.get(self.components_url, api_key, params)
        return [result["id"] for result in results]

    async def get_components_in_users_collection(self, component_ids: list[str], api_key: str):
        """Return the user's components whose parent is one of ``component_ids``.

        Args:
            component_ids: IDs to look for in the ``parent`` field.
            api_key: Store API key used for the request.

        Returns:
            IDs of components created by the user stored in ``user_data_var``
            whose ``parent`` is in ``component_ids``.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = user_data_var.get()
        if not user_data:
            msg = "No user data"
            raise ValueError(msg)
        params = {
            "fields": "id",
            "filter": json.dumps(
                {
                    "_and": [
                        {"user_created": {"_eq": user_data["id"]}},
                        {"parent": {"_in": component_ids}},
                    ]
                }
            ),
        }
        results, _ = await self.get(self.components_url, api_key, params)
        return [result["id"] for result in results]

    async def download(self, api_key: str, component_id: UUID) -> DownloadComponentResponse:
        """Download a component and notify the download webhook.

        Args:
            api_key: Store API key used for both the fetch and the webhook.
            component_id: ID of the component to download.

        Returns:
            The downloaded component. When it has no metadata but has data, the
            metadata is built from the data's ``"nodes"`` entry.

        Raises:
            ValueError: If the download webhook URL is not configured, if the
                store does not return exactly one component, or if the metadata
                cannot be built from the component data.
        """
        if not self.download_webhook_url:
            msg = "DOWNLOAD_WEBHOOK_URL is not set"
            raise ValueError(msg)

        url = f"{self.components_url}/{component_id}"
        params = {"fields": "id,name,description,data,is_component,metadata"}
        component, _ = await self.get(url, api_key, params)
        await self.call_webhook(api_key, self.download_webhook_url, component_id)

        # An empty result is rejected too, instead of failing with IndexError below.
        if len(component) != 1:
            msg = "Something went wrong while downloading the component"
            raise ValueError(msg)

        download_component = DownloadComponentResponse(**component[0])
        if download_component.metadata in [None, {}] and download_component.data is not None:
            # No metadata was stored with the component, so build it from the nodes.
            try:
                download_component.metadata = process_component_data(download_component.data.get("nodes", []))
            except KeyError as e:
                msg = "Invalid component data. No nodes found"
                raise ValueError(msg) from e
        return download_component

    @staticmethod
    def _build_component_payload(component_data: StoreComponentCreate) -> dict[str, Any]:
        """Convert ``component_data`` into the JSON payload sent to the store."""
        component_dict = component_data.model_dump(exclude_unset=True)
        # Parent is a UUID, but the store expects a string
        if component_dict.get("parent"):
            component_dict["parent"] = str(component_dict["parent"])
        return process_tags_for_post(component_dict)

    @staticmethod
    def _translate_write_error(exc: HTTPError, response: httpx.Response | None) -> Exception:
        """Map a failed write request to the exception the caller should raise.

        Returns a ``FilterError`` carrying the store's first error message when
        the response body provides one, otherwise a generic ``ValueError``.
        """
        message = None
        if response is not None:
            try:
                message = response.json()["errors"][0]["message"]
            except (ValueError, KeyError, IndexError, TypeError):
                # The body is not the expected error document; fall back to the
                # generic error so the original HTTP failure is not masked.
                message = None
        if message is None:
            return ValueError(f"Upload failed: {exc}")
        if message == "An unexpected error occurred.":
            # This is a bug in Directus that returns this error
            # when an error was thrown in the flow
            message = "You already have a component with this name. Please choose a different name."
        return FilterError(message)

    async def upload(self, api_key: str, component_data: StoreComponentCreate) -> CreateComponentResponse:
        """Create a new component in the store.

        Args:
            api_key: Store API key sent as a Bearer token.
            component_data: Component to create.

        Returns:
            The created component as returned in the response's ``"data"`` entry.

        Raises:
            FilterError: If the store rejects the request with an error message.
            ValueError: If the request fails without a usable error message.
        """
        headers = {"Authorization": f"Bearer {api_key}"}
        payload = self._build_component_payload(component_data)
        response = None
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.components_url, headers=headers, json=payload, timeout=self.timeout
                )
                response.raise_for_status()
            component = response.json()["data"]
            return CreateComponentResponse(**component)
        except HTTPError as exc:
            raise self._translate_write_error(exc, response) from exc

    async def update(
        self, api_key: str, component_id: UUID, component_data: StoreComponentCreate
    ) -> CreateComponentResponse:
        """Update an existing component in the store.

        This is the same as ``upload`` except that a PATCH request is sent to the
        component's own URL.

        Args:
            api_key: Store API key sent as a Bearer token.
            component_id: ID of the component to update.
            component_data: New component values.

        Returns:
            The updated component as returned in the response's ``"data"`` entry.

        Raises:
            FilterError: If the store rejects the request with an error message.
            ValueError: If the request fails without a usable error message.
        """
        headers = {"Authorization": f"Bearer {api_key}"}
        payload = self._build_component_payload(component_data)
        response = None
        try:
            async with httpx.AsyncClient() as client:
                response = await client.patch(
                    self.components_url + f"/{component_id}", headers=headers, json=payload, timeout=self.timeout
                )
                response.raise_for_status()
            component = response.json()["data"]
            return CreateComponentResponse(**component)
        except HTTPError as exc:
            raise self._translate_write_error(exc, response) from exc

    async def get_tags(self) -> list[dict[str, Any]]:
        """Return the ``id`` and ``name`` of every tag in the store (unauthenticated)."""
        url = f"{self.base_url}/items/tags"
        params = {"fields": "id,name"}
        tags, _ = await self.get(url, api_key=None, params=params)
        return tags

    async def get_user_likes(self, api_key: str) -> list[dict[str, Any]]:
        """Return the ``id`` and ``likes`` fields of the user owning ``api_key``."""
        url = f"{self.base_url}/users/me"
        params = {
            "fields": "id,likes",
        }
        likes, _ = await self.get(url, api_key, params)
        return likes

    async def get_component_likes_count(self, component_id: str, api_key: str | None = None) -> int:
        """Return how many likes a component has.

        Args:
            component_id: ID of the component.
            api_key: Optional store API key for the request.

        Returns:
            The component's ``liked_by_count`` converted to int.

        Raises:
            ValueError: If the component is not found or the count is not numeric.
        """
        url = f"{self.components_url}/{component_id}"
        params = {
            "fields": "id,count(liked_by)",
        }
        result, _ = await self.get(url, api_key=api_key, params=params)
        if not result:
            msg = "Component not found"
            raise ValueError(msg)
        likes = result[0]["liked_by_count"]
        # The count may arrive as a string, so convert it explicitly.
        try:
            return int(likes)
        except (TypeError, ValueError) as e:
            msg = f"Unexpected value for likes count: {likes}"
            raise ValueError(msg) from e

    async def like_component(self, api_key: str, component_id: str) -> bool:
        """Toggle the current user's like on a component through the like webhook.

        Args:
            api_key: Store API key sent as a Bearer token.
            component_id: ID of the component to like or unlike.

        Returns:
            True when the webhook answers with a list (the like was added),
            False when it answers with an int (the like was removed).

        Raises:
            ValueError: If the like webhook URL is not configured, or if the
                response has an unexpected status code or body.
        """
        if not self.like_webhook_url:
            msg = "LIKE_WEBHOOK_URL is not set"
            raise ValueError(msg)
        headers = {"Authorization": f"Bearer {api_key}"}
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.like_webhook_url,
                json={"component_id": str(component_id)},
                headers=headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
        if response.status_code != httpx.codes.OK:
            msg = f"Unexpected status code: {response.status_code}"
            raise ValueError(msg)
        result = response.json()
        if isinstance(result, list):
            return True
        if isinstance(result, int):
            return False
        msg = f"Unexpected result: {result}"
        raise ValueError(msg)

    @staticmethod
    def _raise_for_auth_error(exc: HTTPStatusError) -> None:
        """Raise the store-specific error for a 403 or 401 response.

        Any other status is tolerated (as before) and only logged at debug level.
        """
        status_code = exc.response.status_code
        if status_code == httpx.codes.FORBIDDEN:
            msg = "You are not authorized to access this public resource"
            raise ForbiddenError(msg) from exc
        if status_code == httpx.codes.UNAUTHORIZED:
            msg = "You are not authorized to access this resource. Please check your API key."
            raise APIKeyError(msg) from exc
        logger.opt(exception=exc).debug("Store request failed with status {}", status_code)

    async def get_list_component_response_model(
        self,
        *,
        component_id: str | None = None,
        search: str | None = None,
        private: bool | None = None,
        tags: list[str] | None = None,
        is_component: bool | None = None,
        fields: list[str] | None = None,
        filter_by_user: bool = False,
        liked: bool = False,
        store_api_key: str | None = None,
        sort: list[str] | None = None,
        page: int = 1,
        limit: int = 15,
    ) -> ListComponentResponseModel:
        """Query the store and assemble the list response.

        The filters are built from the arguments, one page of components is
        fetched, the total count is determined, and, when an API key is given,
        the results are enriched with the user's data.

        Args:
            component_id: Exact component ID to match.
            search: Free-text search (or a string containing a component ID).
            private: When not None, match this ``private`` value.
            tags: Tag names that must all be present.
            is_component: When not None, match this ``is_component`` value.
            fields: Fields to request; defaults to the service's default fields.
            filter_by_user: Restrict to the current user's components.
            liked: Restrict to components liked by the current user.
            store_api_key: Store API key of the current user, if any.
            sort: Sort expressions.
            page: Page number to request.
            limit: Page size.

        Returns:
            The components, whether the API key was found to be authorized, and
            the number of matching components.

        Raises:
            ForbiddenError: If the store answers with HTTP 403.
            APIKeyError: If the store answers with HTTP 401, or a user-specific
                filter is requested without an API key.
            ValueError: On any other unexpected error while querying.
        """
        async with user_data_context(api_key=store_api_key, store_service=self):
            filter_conditions: list[dict[str, Any]] = self.build_filter_conditions(
                component_id=component_id,
                search=search,
                private=private,
                tags=tags,
                is_component=is_component,
                filter_by_user=filter_by_user,
                liked=liked,
                store_api_key=store_api_key,
            )
            # The key is only sent for user-specific queries.
            use_api_key = liked or filter_by_user

            result: list[ListComponentResponse] = []
            authorized = False
            metadata: dict = {}
            comp_count = 0
            try:
                result, metadata = await self.query_components(
                    api_key=store_api_key,
                    page=page,
                    limit=limit,
                    sort=sort,
                    fields=fields,
                    filter_conditions=filter_conditions,
                    use_api_key=use_api_key,
                )
                if metadata:
                    comp_count = metadata.get("filter_count", 0)
            except HTTPStatusError as exc:
                self._raise_for_auth_error(exc)
            except Exception as exc:
                msg = f"Unexpected error: {exc}"
                raise ValueError(msg) from exc

            # Without metadata the count has to be derived from the results.
            if result and not metadata:
                if len(result) >= limit:
                    # A full page may have more matches behind it, so ask the store.
                    try:
                        comp_count = await self.count_components(
                            api_key=store_api_key,
                            filter_conditions=filter_conditions,
                            use_api_key=use_api_key,
                        )
                    except HTTPStatusError as exc:
                        self._raise_for_auth_error(exc)
                else:
                    comp_count = len(result)
            elif not metadata:
                comp_count = 0

            if store_api_key:
                # The results can only be enriched with the user's data when
                # every component has an id; otherwise just validate the key.
                if not result or any(component.id is None for component in result):
                    authorized = await self.check_api_key(store_api_key)
                else:
                    try:
                        result = await update_components_with_user_data(result, self, store_api_key, liked=liked)
                        authorized = True
                    except Exception:  # noqa: BLE001
                        logger.opt(exception=True).debug("Error updating components with user data")
                        # If we get an error here, it means the user is not authorized
                        authorized = False
        return ListComponentResponseModel(results=result, authorized=authorized, count=comp_count)