Tai Truong
fix readme
d202ada
import json
from typing import Any
from urllib.parse import quote

import requests
from langchain.tools import StructuredTool
from loguru import logger
from pydantic import BaseModel, Field

from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.field_typing import Tool
from langflow.inputs import MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class NotionPageUpdate(LCToolComponent):
    """Component that updates the properties of a Notion page through the Notion REST API.

    It can be run directly (``run_model``) using the configured inputs, or exposed
    as a LangChain ``StructuredTool`` (``build_tool``).
    """

    display_name: str = "Update Page Property "
    description: str = "Update the properties of a Notion page."
    documentation: str = "https://docs.langflow.org/integrations/notion/page-update"
    icon = "NotionDirectoryLoader"

    inputs = [
        StrInput(
            name="page_id",
            display_name="Page ID",
            info="The ID of the Notion page to update.",
        ),
        MultilineInput(
            name="properties",
            display_name="Properties",
            info="The properties to update on the page (as a JSON string or a dictionary).",
        ),
        SecretStrInput(
            name="notion_secret",
            display_name="Notion Secret",
            info="The Notion integration token.",
            required=True,
        ),
    ]

    # Argument schema handed to the StructuredTool in ``build_tool``; it mirrors the
    # parameters of ``_update_notion_page``.
    class NotionPageUpdateSchema(BaseModel):
        page_id: str = Field(..., description="The ID of the Notion page to update.")
        properties: str | dict[str, Any] = Field(
            ..., description="The properties to update on the page (as a JSON string or a dictionary)."
        )

    def run_model(self) -> Data:
        """Update the configured page and wrap the outcome in a ``Data`` object.

        Returns:
            On failure, a ``Data`` whose text is the error message. On success, a
            ``Data`` whose text lists each returned property as ``name: value`` and
            whose ``data`` is the full page object returned by the API.
        """
        result = self._update_notion_page(self.page_id, self.properties)

        if isinstance(result, str):
            # An error occurred, return it as text
            return Data(text=result)

        # Success, return the updated page data
        property_lines = "".join(
            f"{prop_name}: {prop_value}\n" for prop_name, prop_value in result.get("properties", {}).items()
        )
        return Data(text=f"Updated page properties:\n{property_lines}", data=result)

    def build_tool(self) -> Tool:
        """Expose the page update as a LangChain ``StructuredTool`` named ``update_notion_page``."""
        return StructuredTool.from_function(
            name="update_notion_page",
            description="Update the properties of a Notion page. "
            "IMPORTANT: Use the tool to check the Database properties for more details before using this tool.",
            func=self._update_notion_page,
            args_schema=self.NotionPageUpdateSchema,
        )

    def _update_notion_page(self, page_id: str, properties: str | dict[str, Any]) -> dict[str, Any] | str:
        """Send a PATCH request that updates ``properties`` on the page ``page_id``.

        Args:
            page_id: ID of the Notion page to update.
            properties: The properties payload, either as a dictionary or as a JSON
                string that decodes to an object.

        Returns:
            The decoded JSON response on success, or a human-readable error message
            (``str``) if the input is invalid or the request fails. Errors are
            logged and returned rather than raised.
        """
        # Percent-encode the ID so a value containing "/" or ".." cannot redirect the
        # authenticated request to a different API path.
        url = f"https://api.notion.com/v1/pages/{quote(str(page_id), safe='')}"
        headers = {
            "Authorization": f"Bearer {self.notion_secret}",
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",  # Notion API version sent with every request
        }

        # Parse properties if it's a string
        if isinstance(properties, str):
            try:
                parsed_properties = json.loads(properties)
            except json.JSONDecodeError as e:
                error_message = f"Invalid JSON format for properties: {e}"
                logger.exception(error_message)
                return error_message
        else:
            parsed_properties = properties

        # Valid JSON is not necessarily an object ("[1, 2]", "null", "42"); reject
        # anything that is not a mapping before making a network call.
        if not isinstance(parsed_properties, dict):
            error_message = (
                "Invalid properties: expected a JSON object mapping property names to values, "
                f"got {type(parsed_properties).__name__}."
            )
            logger.error(error_message)
            return error_message

        data = {"properties": parsed_properties}

        try:
            logger.info(f"Sending request to Notion API: URL: {url}, Data: {json.dumps(data)}")
            response = requests.patch(url, headers=headers, json=data, timeout=10)
            response.raise_for_status()
            updated_page = response.json()
            logger.info(f"Successfully updated Notion page. Response: {json.dumps(updated_page)}")
        except requests.exceptions.HTTPError as e:
            # Include the status code and body so the caller can see the API's own error.
            error_message = f"HTTP Error occurred: {e}"
            if e.response is not None:
                error_message += f"\nStatus code: {e.response.status_code}"
                error_message += f"\nResponse body: {e.response.text}"
            logger.exception(error_message)
            return error_message
        except requests.exceptions.RequestException as e:
            error_message = f"An error occurred while making the request: {e}"
            logger.exception(error_message)
            return error_message
        except Exception as e:  # noqa: BLE001
            # Catch-all so that any failure is reported as a returned message.
            error_message = f"An unexpected error occurred: {e}"
            logger.exception(error_message)
            return error_message

        return updated_page

    def __call__(self, *args, **kwargs):
        """Forward the call to ``_update_notion_page`` with the same arguments."""
        return self._update_notion_page(*args, **kwargs)