# NOTE: "Spaces: Running" appears to be page-status text captured during extraction; it is not part of this module.
from __future__ import annotations | |
import json | |
from typing import TYPE_CHECKING, Any | |
from uuid import UUID | |
import httpx | |
from httpx import HTTPError, HTTPStatusError | |
from loguru import logger | |
from langflow.services.base import Service | |
from langflow.services.store.exceptions import APIKeyError, FilterError, ForbiddenError | |
from langflow.services.store.schema import ( | |
CreateComponentResponse, | |
DownloadComponentResponse, | |
ListComponentResponse, | |
ListComponentResponseModel, | |
StoreComponentCreate, | |
) | |
from langflow.services.store.utils import ( | |
process_component_data, | |
process_tags_for_post, | |
update_components_with_user_data, | |
) | |
if TYPE_CHECKING: | |
from langflow.services.settings.service import SettingsService | |
from contextlib import asynccontextmanager | |
from contextvars import ContextVar | |
# Holds the store user record for the duration of a `user_data_context` block.
user_data_var: ContextVar[dict[str, Any] | None] = ContextVar("user_data", default=None)


@asynccontextmanager
async def user_data_context(store_service: StoreService, api_key: str | None = None):
    """Expose the store user's data through ``user_data_var`` for the enclosed block.

    When ``api_key`` is given, ``{base_url}/users/me`` is queried once through
    ``store_service.get`` and the first returned record is stored in
    ``user_data_var``. On exit the variable is always reset to ``None``.

    The function must be wrapped by ``asynccontextmanager``: callers enter it
    with ``async with``, which a bare async generator does not support.

    Args:
        store_service: Object providing ``base_url`` and an awaitable ``get``.
        api_key: Store API key. When falsy, no request is made and the
            variable is left untouched until it is cleared on exit.

    Raises:
        ValueError: If the store answers the user lookup with 403 (invalid key).
    """
    if api_key:
        try:
            user_data, _ = await store_service.get(
                f"{store_service.base_url}/users/me", api_key, params={"fields": "id"}
            )
            user_data_var.set(user_data[0])
        except HTTPStatusError as exc:
            if exc.response.status_code == httpx.codes.FORBIDDEN:
                msg = "Invalid API key"
                raise ValueError(msg) from exc
            # Any other HTTP status is ignored here: the block then runs without
            # user data and later calls decide how to handle that.
    try:
        yield
    finally:
        # Clear the user data from the context variable
        user_data_var.set(None)
def get_id_from_search_string(search_string: str) -> str | None:
    """Extract a component ID from a search string.

    The search string may be a bare UUID or a store URL of the form
    ``...www.langflow.store/store/<uuid>``. For a URL, the last path segment is
    taken as the candidate ID (a trailing slash is ignored).

    Args:
        search_string: The search string to extract the ID from.

    Returns:
        The canonical string form of the UUID, or None if the candidate is not
        a valid UUID.
    """
    candidate = search_string
    if "www.langflow.store/store/" in search_string:
        candidate = search_string.rstrip("/").split("/")[-1]
    try:
        # Validate the extracted candidate, not the raw search string; otherwise
        # a store URL could never resolve to an ID.
        return str(UUID(candidate))
    except ValueError:
        return None
class StoreService(Service):
    """Service that integrates langflow with the store, which is a Directus instance.

    It allows searching, fetching and posting components to the store.
    """

    name = "store_service"

    def __init__(self, settings_service: SettingsService):
        """Read the store endpoints from the settings and set request defaults."""
        self.settings_service = settings_service
        self.base_url = self.settings_service.settings.store_url
        self.download_webhook_url = self.settings_service.settings.download_webhook_url
        self.like_webhook_url = self.settings_service.settings.like_webhook_url
        self.components_url = f"{self.base_url}/items/components"
        self.default_fields = [
            "id",
            "name",
            "description",
            "user_created.username",
            "is_component",
            "tags.tags_id.name",
            "tags.tags_id.id",
            "count(liked_by)",
            "count(downloads)",
            "metadata",
            "last_tested_version",
            "private",
        ]
        # Timeout passed to every httpx request made by this service.
        self.timeout = 30

    # NOTE: methods that need the current user's data read it from `user_data_var`.
    # `user_data_context` fetches it once with the API key, so requests made inside
    # that context do not have to repeat the `/users/me` call.

    @staticmethod
    def _require_user_data() -> dict[str, Any]:
        """Return the user data held in ``user_data_var`` or raise if there is none."""
        user_data = user_data_var.get()
        if not user_data:
            msg = "No user data"
            raise ValueError(msg)
        return user_data

    @staticmethod
    def _raise_for_auth_status(exc: HTTPStatusError) -> None:
        """Translate a 403/401 store response into the matching store exception.

        Any other status code is left unhandled: the function returns normally
        and the caller carries on with whatever data it already has.
        """
        status_code = exc.response.status_code
        if status_code == httpx.codes.FORBIDDEN:
            msg = "You are not authorized to access this public resource"
            raise ForbiddenError(msg) from exc
        if status_code == httpx.codes.UNAUTHORIZED:
            msg = "You are not authorized to access this resource. Please check your API key."
            raise APIKeyError(msg) from exc

    @staticmethod
    def _extract_store_error(response: httpx.Response | None) -> str | None:
        """Return the first error message of a store error response, if it has one.

        Returns None when there is no response or when its body does not have
        the ``{"errors": [{"message": ...}]}`` layout.
        """
        if response is None:
            return None
        try:
            message = response.json()["errors"][0]["message"]
        except (KeyError, IndexError, TypeError, ValueError):
            # Not JSON, or not the expected error layout.
            return None
        if message == "An unexpected error occurred.":
            # This is a bug in Directus that returns this error
            # when an error was thrown in the flow
            return "You already have a component with this name. Please choose a different name."
        return message

    async def check_api_key(self, api_key: str):
        """Check whether ``api_key`` is accepted by the store.

        Returns:
            True if ``/users/me`` returns a record with an ``id``; False if the
            store answers with 401 or 403.

        Raises:
            ValueError: On any other HTTP status or unexpected failure.
        """
        try:
            user_data, _ = await self.get(f"{self.base_url}/users/me", api_key, params={"fields": "id"})
            return "id" in user_data[0]
        except HTTPStatusError as exc:
            if exc.response.status_code in {httpx.codes.FORBIDDEN, httpx.codes.UNAUTHORIZED}:
                return False
            msg = f"Unexpected status code: {exc.response.status_code}"
            raise ValueError(msg) from exc
        except Exception as exc:
            msg = f"Unexpected error: {exc}"
            raise ValueError(msg) from exc

    async def get(
        self, url: str, api_key: str | None = None, params: dict[str, Any] | None = None
    ) -> tuple[list[dict[str, Any]], dict[str, Any]]:
        """Utility method to perform GET requests.

        Returns:
            A ``(data, meta)`` tuple. ``data`` is the ``"data"`` entry of the JSON
            response, wrapped in a list when it is a single dict. ``meta`` is the
            ``"meta"`` entry, or an empty dict when the response has none.

        Raises:
            httpx.HTTPError: Re-raised unchanged (includes non-2xx statuses).
            ValueError: For any other failure while performing the request.
        """
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url, headers=headers, params=params, timeout=self.timeout)
                response.raise_for_status()
            except HTTPError:
                raise
            except Exception as exc:
                msg = f"GET failed: {exc}"
                raise ValueError(msg) from exc
        json_response = response.json()
        result = json_response["data"]
        metadata = json_response.get("meta", {})
        if isinstance(result, dict):
            return [result], metadata
        return result, metadata

    async def call_webhook(self, api_key: str, webhook_url: str, component_id: UUID) -> Any:
        """POST ``{"component_id": ...}`` to ``webhook_url`` and return the JSON reply.

        HTTP errors are re-raised. Any other failure is logged at debug level
        and swallowed, in which case None is returned.
        """
        # The webhook is a POST request with the data in the body
        try:
            headers = {"Authorization": f"Bearer {api_key}"}
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    webhook_url, headers=headers, json={"component_id": str(component_id)}, timeout=self.timeout
                )
                response.raise_for_status()
            return response.json()
        except HTTPError:
            raise
        except Exception:  # noqa: BLE001
            logger.opt(exception=True).debug("Webhook failed")
            return None

    def build_tags_filter(self, tags: list[str]):
        """Build a filter that requires every tag name in ``tags`` to be present."""
        return {"tags": {"_and": [{"_some": {"tags_id": {"name": {"_eq": tag}}}} for tag in tags]}}

    async def count_components(
        self,
        filter_conditions: list[dict[str, Any]],
        *,
        api_key: str | None = None,
        use_api_key: bool | None = False,
    ) -> int:
        """Return the number of components matching ``filter_conditions``.

        The API key is only sent when ``use_api_key`` is truthy. Returns 0 when
        the store sends back no aggregate row.
        """
        params = {"aggregate": json.dumps({"count": "*"})}
        if filter_conditions:
            params["filter"] = json.dumps({"_and": filter_conditions})

        api_key = api_key if use_api_key else None

        results, _ = await self.get(self.components_url, api_key, params)
        if not results:
            return 0
        return int(results[0].get("count", 0))

    @staticmethod
    def build_search_filter_conditions(query: str):
        """Build a case-insensitive search filter over name, description, tags and author.

        Declared static because it takes no ``self`` yet is invoked through the
        instance in `build_filter_conditions`.
        """
        # instead of build the param ?search=query, we will build the filter
        # that will use _icontains (case insensitive)
        return {
            "_or": [
                {"name": {"_icontains": query}},
                {"description": {"_icontains": query}},
                {"tags": {"tags_id": {"name": {"_icontains": query}}}},
                {"user_created": {"username": {"_icontains": query}}},
            ]
        }

    def build_filter_conditions(
        self,
        *,
        component_id: str | None = None,
        search: str | None = None,
        private: bool | None = None,
        tags: list[str] | None = None,
        is_component: bool | None = None,
        filter_by_user: bool | None = False,
        liked: bool | None = False,
        store_api_key: str | None = None,
    ):
        """Translate the query options into a list of store filter conditions.

        If ``component_id`` is not given, ``search`` is first tried as a
        component ID (or store URL); only when that fails is it used as a text
        search. Unless ``filter_by_user`` is set, a ``private == False``
        condition is always appended.

        Raises:
            APIKeyError: If ``liked`` or ``filter_by_user`` is requested without
                ``store_api_key``.
            ValueError: If user data is needed but ``user_data_var`` is empty.
        """
        filter_conditions: list[dict[str, Any]] = []

        if component_id is None:
            component_id = get_id_from_search_string(search) if search else None

        if search is not None and component_id is None:
            filter_conditions.append(self.build_search_filter_conditions(search))

        if private is not None:
            filter_conditions.append({"private": {"_eq": private}})

        if tags:
            filter_conditions.append(self.build_tags_filter(tags))

        if component_id is not None:
            filter_conditions.append({"id": {"_eq": component_id}})

        if is_component is not None:
            filter_conditions.append({"is_component": {"_eq": is_component}})

        if liked:
            if not store_api_key:
                msg = "You must provide an API key to filter by likes"
                raise APIKeyError(msg)
            filter_conditions.append(self.build_liked_filter())

        if filter_by_user:
            if not store_api_key:
                msg = "You must provide an API key to filter your components"
                raise APIKeyError(msg)
            user_data = self._require_user_data()
            filter_conditions.append({"user_created": {"_eq": user_data["id"]}})
        else:
            filter_conditions.append({"private": {"_eq": False}})

        return filter_conditions

    def build_liked_filter(self):
        """Build a filter matching components liked by the user in ``user_data_var``.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = self._require_user_data()
        return {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}}

    async def query_components(
        self,
        *,
        api_key: str | None = None,
        sort: list[str] | None = None,
        page: int = 1,
        limit: int = 15,
        fields: list[str] | None = None,
        filter_conditions: list[dict[str, Any]] | None = None,
        use_api_key: bool | None = False,
    ) -> tuple[list[ListComponentResponse], dict[str, Any]]:
        """Fetch one page of components and the response metadata.

        ``fields`` defaults to ``self.default_fields``. The API key is only sent
        when ``use_api_key`` is truthy.
        """
        params: dict[str, Any] = {
            "page": page,
            "limit": limit,
            "fields": ",".join(fields) if fields is not None else ",".join(self.default_fields),
            "meta": "filter_count",  # !This is DEPRECATED so we should remove it ASAP
        }
        if sort:
            params["sort"] = ",".join(sort)

        if filter_conditions:
            params["filter"] = json.dumps({"_and": filter_conditions})

        # If not liked, this means we are getting public components
        # so we don't need to risk passing an invalid api_key
        # and getting 401
        api_key = api_key if use_api_key else None
        # `get` already normalises a single record into a one-element list.
        results, metadata = await self.get(self.components_url, api_key, params)

        results_objects = [ListComponentResponse(**result) for result in results]
        return results_objects, metadata

    async def get_liked_by_user_components(self, component_ids: list[str], api_key: str) -> list[str]:
        """Return the IDs among ``component_ids`` that the current user has liked.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = self._require_user_data()
        params = {
            "fields": "id",
            "filter": json.dumps(
                {
                    "_and": [
                        {"id": {"_in": component_ids}},
                        {"liked_by": {"directus_users_id": {"_eq": user_data["id"]}}},
                    ]
                }
            ),
        }
        results, _ = await self.get(self.components_url, api_key, params)
        return [result["id"] for result in results]

    # Which of the components is parent of the user's components
    async def get_components_in_users_collection(self, component_ids: list[str], api_key: str):
        """Return the IDs of the current user's components whose parent is in ``component_ids``.

        Raises:
            ValueError: If ``user_data_var`` holds no user data.
        """
        user_data = self._require_user_data()
        params = {
            "fields": "id",
            "filter": json.dumps(
                {
                    "_and": [
                        {"user_created": {"_eq": user_data["id"]}},
                        {"parent": {"_in": component_ids}},
                    ]
                }
            ),
        }
        results, _ = await self.get(self.components_url, api_key, params)
        return [result["id"] for result in results]

    async def download(self, api_key: str, component_id: UUID) -> DownloadComponentResponse:
        """Fetch a component, notify the download webhook and return the component.

        When the stored metadata is empty and the component has data, metadata
        is rebuilt from the component's nodes.

        Raises:
            ValueError: If the download webhook URL is not configured, if the
                store does not return exactly one component, or if the
                component data is invalid.
        """
        url = f"{self.components_url}/{component_id}"
        params = {"fields": "id,name,description,data,is_component,metadata"}
        if not self.download_webhook_url:
            msg = "DOWNLOAD_WEBHOOK_URL is not set"
            raise ValueError(msg)
        component, _ = await self.get(url, api_key, params)
        await self.call_webhook(api_key, self.download_webhook_url, component_id)
        # Exactly one record is expected; an empty list is rejected as well so
        # that indexing below cannot fail with an IndexError.
        if len(component) != 1:
            msg = "Something went wrong while downloading the component"
            raise ValueError(msg)

        download_component = DownloadComponentResponse(**component[0])
        # Check if metadata is an empty dict
        if download_component.metadata in (None, {}) and download_component.data is not None:
            # If it is, we need to build the metadata
            try:
                download_component.metadata = process_component_data(download_component.data.get("nodes", []))
            except KeyError as e:
                msg = "Invalid component data. No nodes found"
                raise ValueError(msg) from e
        return download_component

    async def _send_component(
        self, method: str, url: str, api_key: str, component_data: StoreComponentCreate
    ) -> CreateComponentResponse:
        """Send ``component_data`` to ``url`` with ``method`` and parse the created record.

        Shared implementation of `upload` (POST) and `update` (PATCH).

        Raises:
            FilterError: If the store rejects the request with an error message.
            ValueError: If the request fails without a usable error message.
        """
        headers = {"Authorization": f"Bearer {api_key}"}
        component_dict = component_data.model_dump(exclude_unset=True)
        # Parent is a UUID, but the store expects a string
        if component_dict.get("parent"):
            component_dict["parent"] = str(component_dict["parent"])
        component_dict = process_tags_for_post(component_dict)

        response: httpx.Response | None = None
        try:
            async with httpx.AsyncClient() as client:
                response = await client.request(
                    method, url, headers=headers, json=component_dict, timeout=self.timeout
                )
                response.raise_for_status()
            component = response.json()["data"]
            return CreateComponentResponse(**component)
        except HTTPError as exc:
            message = self._extract_store_error(response)
            if message is not None:
                raise FilterError(message) from exc
            msg = f"Upload failed: {exc}"
            raise ValueError(msg) from exc

    async def upload(self, api_key: str, component_data: StoreComponentCreate) -> CreateComponentResponse:
        """Create a new component in the store.

        Raises:
            FilterError: If the store rejects the component with an error message.
            ValueError: If the request fails for any other HTTP-level reason.
        """
        return await self._send_component("POST", self.components_url, api_key, component_data)

    async def update(
        self, api_key: str, component_id: UUID, component_data: StoreComponentCreate
    ) -> CreateComponentResponse:
        """Update an existing component in the store.

        Raises:
            FilterError: If the store rejects the component with an error message.
            ValueError: If the request fails for any other HTTP-level reason.
        """
        # Patch is the same as post, but we need to add the id to the url
        return await self._send_component("PATCH", f"{self.components_url}/{component_id}", api_key, component_data)

    async def get_tags(self) -> list[dict[str, Any]]:
        """Return the ``id`` and ``name`` of every tag in the store (no API key is sent)."""
        url = f"{self.base_url}/items/tags"
        params = {"fields": "id,name"}
        tags, _ = await self.get(url, api_key=None, params=params)
        return tags

    async def get_user_likes(self, api_key: str) -> list[dict[str, Any]]:
        """Return the ``id`` and ``likes`` fields of the user owning ``api_key``."""
        url = f"{self.base_url}/users/me"
        params = {
            "fields": "id,likes",
        }
        likes, _ = await self.get(url, api_key, params)
        return likes

    async def get_component_likes_count(self, component_id: str, api_key: str | None = None) -> int:
        """Return how many users liked the component.

        Raises:
            ValueError: If the component is not found or the count is not an integer.
        """
        url = f"{self.components_url}/{component_id}"
        params = {
            "fields": "id,count(liked_by)",
        }
        result, _ = await self.get(url, api_key=api_key, params=params)
        if not result:
            msg = "Component not found"
            raise ValueError(msg)
        likes = result[0]["liked_by_count"]
        # liked_by_count is a string; try to convert it to int
        try:
            return int(likes)
        except (TypeError, ValueError) as e:
            msg = f"Unexpected value for likes count: {likes}"
            raise ValueError(msg) from e

    async def like_component(self, api_key: str, component_id: str) -> bool:
        """Toggle the like of a component through the like webhook.

        Returns:
            True if the webhook answers with a list (like added), False if it
            answers with an int (like removed).

        Raises:
            ValueError: If the like webhook URL is not configured, or the
                webhook answers with an unexpected status or payload.
            httpx.HTTPError: If the request fails or returns an error status.
        """
        # if it returns a list with one id, it means the like was successful
        # if it returns an int, it means the like was removed
        if not self.like_webhook_url:
            msg = "LIKE_WEBHOOK_URL is not set"
            raise ValueError(msg)
        headers = {"Authorization": f"Bearer {api_key}"}
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.like_webhook_url,
                json={"component_id": str(component_id)},
                headers=headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
        if response.status_code == httpx.codes.OK:
            result = response.json()

            if isinstance(result, list):
                return True
            if isinstance(result, int):
                return False
            msg = f"Unexpected result: {result}"
            raise ValueError(msg)
        msg = f"Unexpected status code: {response.status_code}"
        raise ValueError(msg)

    async def get_list_component_response_model(
        self,
        *,
        component_id: str | None = None,
        search: str | None = None,
        private: bool | None = None,
        tags: list[str] | None = None,
        is_component: bool | None = None,
        fields: list[str] | None = None,
        filter_by_user: bool = False,
        liked: bool = False,
        store_api_key: str | None = None,
        sort: list[str] | None = None,
        page: int = 1,
        limit: int = 15,
    ) -> ListComponentResponseModel:
        """Query the store and return the page of components with count and auth status.

        The whole operation runs inside `user_data_context`, so user-specific
        filters and the liked/owned enrichment share a single user lookup.

        Raises:
            ForbiddenError: If the store answers with 403.
            APIKeyError: If the store answers with 401, or an API key is
                required by the filters but missing.
            ValueError: On any other unexpected failure of the main query.
        """
        async with user_data_context(api_key=store_api_key, store_service=self):
            filter_conditions: list[dict[str, Any]] = self.build_filter_conditions(
                component_id=component_id,
                search=search,
                private=private,
                tags=tags,
                is_component=is_component,
                filter_by_user=filter_by_user,
                liked=liked,
                store_api_key=store_api_key,
            )
            # The key is only sent for user-specific queries; public listings
            # are fetched anonymously.
            use_api_key = liked or filter_by_user

            result: list[ListComponentResponse] = []
            authorized = False
            metadata: dict = {}
            comp_count = 0
            try:
                result, metadata = await self.query_components(
                    api_key=store_api_key,
                    page=page,
                    limit=limit,
                    sort=sort,
                    fields=fields,
                    filter_conditions=filter_conditions,
                    use_api_key=use_api_key,
                )
                if metadata:
                    comp_count = metadata.get("filter_count", 0)
            except HTTPStatusError as exc:
                # 403/401 are re-raised as store errors; other statuses leave
                # the empty defaults in place.
                self._raise_for_auth_status(exc)
            except Exception as exc:
                msg = f"Unexpected error: {exc}"
                raise ValueError(msg) from exc

            if not metadata:
                # No filter_count available: a full page may have more results
                # behind it, so ask the store; otherwise the page is the total.
                try:
                    if result and len(result) >= limit:
                        comp_count = await self.count_components(
                            api_key=store_api_key,
                            filter_conditions=filter_conditions,
                            use_api_key=use_api_key,
                        )
                    else:
                        comp_count = len(result)
                except HTTPStatusError as exc:
                    self._raise_for_auth_status(exc)

            if store_api_key:
                # Now, from the result, we need to get the components
                # the user likes and set the liked_by_user to True
                # if any of the components does not have an id, it means
                # we should not update the components
                if not result or any(component.id is None for component in result):
                    authorized = await self.check_api_key(store_api_key)
                else:
                    try:
                        result = await update_components_with_user_data(result, self, store_api_key, liked=liked)
                        authorized = True
                    except Exception:  # noqa: BLE001
                        logger.opt(exception=True).debug("Error updating components with user data")
                        # If we get an error here, it means the user is not authorized
                        authorized = False
            return ListComponentResponseModel(results=result, authorized=authorized, count=comp_count)