"""Langflow component that exposes Composio actions as LangChain tools."""
# Standard library imports | |
from collections.abc import Sequence | |
from typing import Any | |
# Third-party imports | |
from composio.client.collections import AppAuthScheme | |
from composio.client.exceptions import NoItemsFound | |
from composio_langchain import Action, App, ComposioToolSet | |
from langchain_core.tools import Tool | |
from loguru import logger | |
from typing_extensions import override | |
# Local imports | |
from langflow.base.langchain_utilities.model import LCToolComponent | |
from langflow.inputs import DropdownInput, LinkInput, MessageTextInput, MultiselectInput, SecretStrInput, StrInput | |
from langflow.io import Output | |
class ComposioAPIComponent(LCToolComponent):
    """Langflow tool component backed by a Composio toolset.

    The component takes a Composio API key, an entity id and an app name.
    ``update_build_config`` reveals the authentication inputs that match the
    selected app's auth mode (API key, basic auth or an OAuth2 link) and, once
    the entity has a connection for the app, the list of selectable actions.
    ``build_tool`` turns the selected actions into LangChain tools.
    """

    display_name: str = "Composio Tools"
    description: str = "Use Composio toolset to run actions with your agent"
    name = "ComposioAPI"
    icon = "Composio"
    documentation: str = "https://docs.composio.dev"

    inputs = [
        # Basic configuration inputs
        MessageTextInput(name="entity_id", display_name="Entity ID", value="default", advanced=True),
        SecretStrInput(
            name="api_key",
            display_name="Composio API Key",
            required=True,
            info="Refer to https://docs.composio.dev/faq/api_key/api_key",
        ),
        DropdownInput(
            name="app_names",
            display_name="App Name",
            options=list(App.__annotations__),
            value="",
            info="The app name to use. Please refresh after selecting app name",
            refresh_button=True,
        ),
        # Authentication-related inputs (initially hidden)
        SecretStrInput(
            name="app_credentials",
            display_name="App Credentials",
            required=False,
            dynamic=True,
            show=False,
            info="Credentials for app authentication (API Key, Password, etc)",
        ),
        MessageTextInput(
            name="username",
            display_name="Username",
            required=False,
            dynamic=True,
            show=False,
            info="Username for Basic authentication",
        ),
        LinkInput(
            name="auth_link",
            display_name="Authentication Link",
            value="",
            info="Click to authenticate with OAuth2",
            dynamic=True,
            show=False,
            placeholder="Click to authenticate",
        ),
        StrInput(
            name="auth_status",
            display_name="Auth Status",
            value="Not Connected",
            info="Current authentication status",
            dynamic=True,
            show=False,
        ),
        MultiselectInput(
            name="action_names",
            display_name="Actions to use",
            required=True,
            options=[],
            value=[],
            info="The actions to pass to agent to execute",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(name="tools", display_name="Tools", method="build_tool"),
    ]

    def _check_for_authorization(self, app: str) -> str:
        """Check whether the configured entity is connected to ``app``.

        Args:
            app (str): The app name to check authorization for.

        Returns:
            str: ``"<app> CONNECTED"`` when a connection exists, otherwise the
                status string or redirect URL produced by
                ``_handle_auth_by_scheme``, or ``"Error checking authorization"``
                when the connection lookup fails unexpectedly.

        Raises:
            ValueError: Propagated from ``_build_wrapper`` when the toolset
                cannot be built.
        """
        toolset = self._build_wrapper()
        entity = toolset.client.get_entity(id=self.entity_id)
        try:
            # Check if user is already connected
            entity.get_connection(app=app)
        except NoItemsFound:
            # No connection yet: start one according to the app's auth scheme.
            # ``_handle_auth_by_scheme`` copes with a missing (None) scheme.
            auth_scheme = self._get_auth_scheme(app)
            return self._handle_auth_by_scheme(entity, app, auth_scheme)
        except Exception:  # noqa: BLE001
            logger.exception("Authorization error")
            return "Error checking authorization"
        else:
            return f"{app} CONNECTED"

    def _get_auth_scheme(self, app_name: str) -> AppAuthScheme | None:
        """Get the primary auth scheme for an app.

        Args:
            app_name (str): The name of the app to get auth scheme for. It is
                lower-cased before being passed to the toolset.

        Returns:
            AppAuthScheme | None: The auth scheme details, or ``None`` when the
                lookup fails (the failure is logged). Callers must handle
                ``None``.

        Raises:
            ValueError: Propagated from ``_build_wrapper`` when the toolset
                cannot be built.
        """
        toolset = self._build_wrapper()
        try:
            return toolset.get_auth_scheme_for_app(app=app_name.lower())
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting auth scheme for {app_name}")
            return None

    def _connect_with_credentials(
        self,
        entity: Any,
        app: str,
        auth_mode: str,
        auth_config: dict,
        *,
        label: str,
        failure_status: str,
    ) -> str:
        """Initiate a credential-based connection and report the outcome.

        Args:
            entity (Any): The entity instance.
            app (str): The app name.
            auth_mode (str): Auth mode forwarded to ``initiate_connection``.
            auth_config (dict): Credentials forwarded to ``initiate_connection``.
            label (str): Human-readable auth kind used in the error log line.
            failure_status (str): Status returned when the connection attempt
                raises.

        Returns:
            str: ``"<app> CONNECTED"`` on success, ``failure_status`` otherwise.
        """
        try:
            entity.initiate_connection(
                app_name=app,
                auth_mode=auth_mode,
                auth_config=auth_config,
                use_composio_auth=False,
                force_new_integration=True,
            )
        except Exception as e:  # noqa: BLE001
            logger.error(f"Error connecting with {label}: {e}")
            return failure_status
        return f"{app} CONNECTED"

    def _handle_auth_by_scheme(self, entity: Any, app: str, auth_scheme: AppAuthScheme | None) -> str:
        """Handle authentication based on the auth scheme.

        Args:
            entity (Any): The entity instance.
            app (str): The app name.
            auth_scheme (AppAuthScheme | None): The auth scheme details, or
                ``None`` when they could not be retrieved.

        Returns:
            str: The authentication status, or the redirect URL for OAuth2.
        """
        try:
            # First check if already connected
            entity.get_connection(app=app)
        except NoItemsFound:
            # Expected when there is no connection yet; a new connection is
            # attempted below, outside the exception handler.
            pass
        except Exception as e:  # noqa: BLE001
            logger.error(f"Error checking connection status: {e}")
            return f"Error: {e!s}"
        else:
            return f"{app} CONNECTED"

        # ``_get_auth_scheme`` yields None when the lookup failed; report that
        # instead of failing on the attribute access below.
        if auth_scheme is None:
            return "Error getting auth scheme"

        auth_mode = auth_scheme.auth_mode
        credentials = getattr(self, "app_credentials", None)

        if auth_mode == "API_KEY":
            if not credentials:
                return "Enter API Key"
            return self._connect_with_credentials(
                entity,
                app,
                "API_KEY",
                {"api_key": credentials},
                label="API Key",
                failure_status="Invalid API Key",
            )

        if auth_mode == "BASIC":
            username = getattr(self, "username", None)
            if not (username and credentials):
                return "Enter Username and Password"
            return self._connect_with_credentials(
                entity,
                app,
                "BASIC",
                {"username": username, "password": credentials},
                label="Basic Auth",
                failure_status="Invalid credentials",
            )

        if auth_mode == "OAUTH2":
            try:
                return self._initiate_default_connection(entity, app)
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error initiating OAuth2: {e}")
                return "OAuth2 initialization failed"

        return "Unsupported auth mode"

    def _initiate_default_connection(self, entity: Any, app: str) -> str:
        """Start a new connection with ``use_composio_auth=True``.

        Args:
            entity (Any): The entity instance.
            app (str): The app name.

        Returns:
            str: The ``redirectUrl`` of the initiated connection.
        """
        connection = entity.initiate_connection(app_name=app, use_composio_auth=True, force_new_integration=True)
        return connection.redirectUrl

    def _get_connected_app_names_for_entity(self) -> list[str]:
        """Return the distinct ``appUniqueId`` values of the entity's connections.

        Returns:
            list[str]: De-duplicated ids; the order is unspecified because the
                values pass through a set.
        """
        toolset = self._build_wrapper()
        connections = toolset.client.get_entity(id=self.entity_id).get_connections()
        return list({connection.appUniqueId for connection in connections})

    def _get_normalized_app_name(self) -> str:
        """Get app name without connection status suffix.

        Returns:
            str: ``self.app_names`` with every ``" ✅"`` and ``"_connected"``
                occurrence removed.
        """
        return self.app_names.replace(" ✅", "").replace("_connected", "")

    @staticmethod
    def _reveal_field(build_config: dict, field: str) -> None:
        """Show ``field`` in the main (non-advanced) view."""
        build_config[field]["show"] = True
        build_config[field]["advanced"] = False

    def _show_auth_inputs(self, build_config: dict, entity: Any, app_name: str) -> None:
        """Reveal the inputs required to authenticate a not-yet-connected app.

        Mutates ``build_config`` in place according to the app's auth mode and
        sets ``auth_status`` to the matching prompt.
        """
        auth_scheme = self._get_auth_scheme(app_name)
        if auth_scheme is None:
            # The lookup failure was already logged by ``_get_auth_scheme``.
            build_config["auth_status"]["value"] = "Error: unable to get auth scheme"
            return

        auth_mode = auth_scheme.auth_mode
        logger.info(f"Auth mode for {app_name}: {auth_mode}")

        if auth_mode == "API_KEY":
            self._reveal_field(build_config, "app_credentials")
            build_config["app_credentials"]["display_name"] = "API Key"
            build_config["auth_status"]["value"] = "Enter API Key"
        elif auth_mode == "BASIC":
            self._reveal_field(build_config, "username")
            self._reveal_field(build_config, "app_credentials")
            build_config["app_credentials"]["display_name"] = "Password"
            build_config["auth_status"]["value"] = "Enter Username and Password"
        elif auth_mode == "OAUTH2":
            self._reveal_field(build_config, "auth_link")
            build_config["auth_link"]["value"] = self._initiate_default_connection(entity, app_name)
            build_config["auth_status"]["value"] = "Click link to authenticate"
        else:
            build_config["auth_status"]["value"] = "Unsupported auth mode"

    def update_build_config(self, build_config: dict, field_value: Any, field_name: str | None = None) -> dict:
        """Refresh the visibility and values of the dynamic inputs.

        Args:
            build_config (dict): The component's build configuration; mutated
                in place and returned.
            field_value (Any): New value of the changed field (unused here; the
                selected app is read from ``self.app_names``).
            field_name (str | None): Name of the field that triggered the
                update. Auth state is only re-evaluated for ``"app_names"``.

        Returns:
            dict: The updated build configuration.
        """
        # First, ensure all dynamic fields are hidden by default. A field is
        # shown only when it already carries a value; None, "" and an empty
        # list (the default of ``action_names``) all count as "no value".
        dynamic_fields = ("app_credentials", "username", "auth_link", "auth_status", "action_names")
        for field in dynamic_fields:
            if field not in build_config:
                continue
            has_value = build_config[field]["value"] not in (None, "", [])
            build_config[field]["show"] = has_value
            build_config[field]["advanced"] = not has_value  # advanced=True hides from main view

        if field_name != "app_names" or not getattr(self, "api_key", None):
            return build_config

        app_name = self.app_names
        if not app_name:
            # Nothing selected yet: there is no app to look up.
            return build_config

        try:
            toolset = self._build_wrapper()
            entity = toolset.client.get_entity(id=self.entity_id)

            # Always show auth_status when app is selected
            self._reveal_field(build_config, "auth_status")

            try:
                # Check if already connected
                entity.get_connection(app=app_name)
            except NoItemsFound:
                # Get auth scheme and show relevant fields
                self._show_auth_inputs(build_config, entity, app_name)
            else:
                build_config["auth_status"]["value"] = "✅"
                build_config["auth_link"]["show"] = False
                # Show action selection for connected apps
                self._reveal_field(build_config, "action_names")

                # Offer the actions whose name starts with "<app>_" (case-insensitive).
                prefix = app_name.lower() + "_"
                app_action_names = [
                    action_name for action_name in Action.__annotations__ if action_name.lower().startswith(prefix)
                ]
                if build_config["action_names"]["options"] != app_action_names:
                    build_config["action_names"]["options"] = app_action_names
                    # NOTE(review): with no matching actions the selection becomes [""];
                    # confirm the UI and ``build_tool`` expect that placeholder.
                    build_config["action_names"]["value"] = [app_action_names[0]] if app_action_names else [""]
        except Exception as e:  # noqa: BLE001
            logger.error(f"Error checking auth status: {e}, app: {app_name}")
            build_config["auth_status"]["value"] = f"Error: {e!s}"

        return build_config

    def build_tool(self) -> Sequence[Tool]:
        """Build Composio tools based on selected actions.

        Returns:
            Sequence[Tool]: The tools returned by the toolset for
                ``self.action_names``.

        Raises:
            ValueError: Propagated from ``_build_wrapper`` when the toolset
                cannot be built.
        """
        composio_toolset = self._build_wrapper()
        return composio_toolset.get_tools(actions=self.action_names)

    def _build_wrapper(self) -> ComposioToolSet:
        """Build the Composio toolset wrapper.

        Returns:
            ComposioToolSet: The initialized toolset.

        Raises:
            ValueError: If the API key is missing, or if constructing the
                toolset raises ``ValueError``. The original error is chained
                as the cause.
        """
        try:
            if not self.api_key:
                msg = "Composio API Key is required"
                raise ValueError(msg)
            return ComposioToolSet(api_key=self.api_key)
        except ValueError as e:
            # Both the missing-key case and toolset errors surface as one
            # user-facing message.
            logger.error(f"Error building Composio wrapper: {e}")
            msg = "Please provide a valid Composio API Key in the component settings"
            raise ValueError(msg) from e