Tai Truong
fix readme
d202ada
raw
history blame
4.54 kB
import json
from typing import Any
from urllib.parse import quote

import requests
from langchain.tools import StructuredTool
from loguru import logger
from pydantic import BaseModel, Field

from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.field_typing import Tool
from langflow.inputs import MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class NotionPageUpdate(LCToolComponent):
    """Tool component that updates the properties of a Notion page.

    The component can be run directly (``run_model``), exposed to an agent as
    a LangChain ``StructuredTool`` (``build_tool``), or invoked as a callable.
    All three paths go through ``_update_notion_page``, which reports failures
    by returning an error string instead of raising.
    """

    display_name: str = "Update Page Property "
    description: str = "Update the properties of a Notion page."
    documentation: str = "https://docs.langflow.org/integrations/notion/page-update"
    icon = "NotionDirectoryLoader"

    inputs = [
        StrInput(
            name="page_id",
            display_name="Page ID",
            info="The ID of the Notion page to update.",
        ),
        MultilineInput(
            name="properties",
            display_name="Properties",
            info="The properties to update on the page (as a JSON string or a dictionary).",
        ),
        SecretStrInput(
            name="notion_secret",
            display_name="Notion Secret",
            info="The Notion integration token.",
            required=True,
        ),
    ]

    # Argument schema handed to StructuredTool in build_tool(). Documented with
    # a comment rather than a docstring so the generated schema is unchanged.
    class NotionPageUpdateSchema(BaseModel):
        page_id: str = Field(..., description="The ID of the Notion page to update.")
        properties: str | dict[str, Any] = Field(
            ..., description="The properties to update on the page (as a JSON string or a dictionary)."
        )

    def run_model(self) -> Data:
        """Update the configured page and wrap the outcome in a ``Data`` object.

        Returns:
            ``Data(text=<error message>)`` when the update failed, otherwise
            ``Data`` whose text lists every returned property as
            ``name: value`` and whose data is the updated page payload.
        """
        result = self._update_notion_page(self.page_id, self.properties)
        if isinstance(result, str):
            # An error occurred, return it as text
            return Data(text=result)

        # Success, return the updated page data
        property_lines = "".join(
            f"{prop_name}: {prop_value}\n" for prop_name, prop_value in result.get("properties", {}).items()
        )
        output = "Updated page properties:\n" + property_lines
        return Data(text=output, data=result)

    def build_tool(self) -> Tool:
        """Expose the page update as a LangChain ``StructuredTool``."""
        return StructuredTool.from_function(
            name="update_notion_page",
            description="Update the properties of a Notion page. "
            "IMPORTANT: Use the tool to check the Database properties for more details before using this tool.",
            func=self._update_notion_page,
            args_schema=self.NotionPageUpdateSchema,
        )

    def _update_notion_page(self, page_id: str, properties: str | dict[str, Any]) -> dict[str, Any] | str:
        """Send a PATCH request that updates ``properties`` on page ``page_id``.

        Args:
            page_id: ID of the Notion page. Surrounding whitespace is ignored.
            properties: The properties to set, either as a dictionary or as a
                JSON string that decodes to a JSON object.

        Returns:
            The decoded JSON response on success, or a human-readable error
            message (``str``) when the input is invalid or the request fails.
            No exception is propagated to the caller.
        """
        # Reject a missing/blank ID up front instead of PATCHing ".../v1/pages/".
        if not isinstance(page_id, str) or not page_id.strip():
            error_message = "Invalid page ID: a non-empty Notion page ID is required."
            logger.error(error_message)
            return error_message

        # page_id may come from an LLM tool call: percent-encode it so that
        # characters such as "/", "?" or "#" cannot redirect the request to a
        # different API path. UUID-style IDs are left unchanged by quote().
        url = f"https://api.notion.com/v1/pages/{quote(page_id.strip(), safe='')}"
        headers = {
            "Authorization": f"Bearer {self.notion_secret}",
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",  # Notion API version sent with the request
        }

        # Parse properties if it's a string
        if isinstance(properties, str):
            try:
                parsed_properties = json.loads(properties)
            except json.JSONDecodeError as e:
                error_message = f"Invalid JSON format for properties: {e}"
                logger.exception(error_message)
                return error_message
        else:
            parsed_properties = properties

        # Valid JSON is not necessarily an object (e.g. "[1, 2]" or "null");
        # fail with a clear message rather than an opaque HTTP error.
        if not isinstance(parsed_properties, dict):
            error_message = (
                "Invalid properties: expected a JSON object (dictionary), "
                f"got {type(parsed_properties).__name__}."
            )
            logger.error(error_message)
            return error_message

        data = {"properties": parsed_properties}

        try:
            logger.info(f"Sending request to Notion API: URL: {url}, Data: {json.dumps(data)}")
            response = requests.patch(url, headers=headers, json=data, timeout=10)
            response.raise_for_status()
            updated_page = response.json()

            logger.info(f"Successfully updated Notion page. Response: {json.dumps(updated_page)}")
        except requests.exceptions.HTTPError as e:
            error_message = f"HTTP Error occurred: {e}"
            if e.response is not None:
                error_message += f"\nStatus code: {e.response.status_code}"
                error_message += f"\nResponse body: {e.response.text}"
            logger.exception(error_message)
            return error_message
        except requests.exceptions.RequestException as e:
            error_message = f"An error occurred while making the request: {e}"
            logger.exception(error_message)
            return error_message
        except Exception as e:  # noqa: BLE001
            # Last-resort boundary (e.g. non-serializable payload, non-JSON
            # response body): callers expect an error string, never a raise.
            error_message = f"An unexpected error occurred: {e}"
            logger.exception(error_message)
            return error_message

        return updated_page

    def __call__(self, *args, **kwargs):
        """Allow the component instance to be called like ``_update_notion_page``."""
        return self._update_notion_page(*args, **kwargs)