|
import os |
|
import json |
|
import asyncio |
|
import aiohttp |
|
import logging |
|
from dotenv import load_dotenv |
|
from http.client import HTTPSConnection |
|
from typing import List, Dict, Any |
|
from datetime import datetime, timezone |
|
from langchain.prompts import ChatPromptTemplate |
|
from src.utils.api_key_manager import with_api_manager |
|
from src.helpers.helper import remove_markdown |
|
|
|
|
|
# Location of the .env file: "<WRITABLE_DIR>/.env", where WRITABLE_DIR falls
# back to /tmp when the variable is not set.
ENV_FILE_PATH = "/".join((os.getenv("WRITABLE_DIR", "/tmp"), ".env"))

# override=True is passed so that values from the file take precedence over
# variables already present in the process environment.
load_dotenv(ENV_FILE_PATH, override=True)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
class MCPClient: |
|
def __init__(self):
    """Read the webhook URL from the environment and set the request timeout.

    A warning is logged when ``PIPEDREAM_WEBHOOK_URL`` is unset or empty; the
    instance is still created and ``webhook_url`` keeps that falsy value.
    """
    webhook_url = os.getenv("PIPEDREAM_WEBHOOK_URL")
    if not webhook_url:
        logger.warning("PIPEDREAM_WEBHOOK_URL not set in environment variables")
    self.webhook_url = webhook_url

    # Overall time budget (seconds) for one webhook request.
    self.timeout = aiohttp.ClientTimeout(total=60)
|
|
|
|
|
@with_api_manager()
async def fetch_app_data(
    self,
    provider: str,
    services: List[str],
    query: str,
    user_id: str,
    access_token: str,
    *,
    llm
) -> Dict[str, Any]:
    """Fetch app data for ``provider`` through the configured Pipedream webhook.

    When ``query`` starts with the conversation-context marker it is first
    rewritten by ``llm``. The payload is then POSTed as JSON to the webhook
    and the decoded JSON response is returned.

    Args:
        provider: Provider name sent to the webhook (e.g. "slack").
        services: Service names forwarded unchanged in the payload.
        query: User query, optionally prefixed with previous context.
        user_id: Identifier forwarded unchanged in the payload.
        access_token: Provider token; sent as ``token`` and never logged.
        llm: Chat model exposing ``ainvoke``.
            NOTE(review): presumably injected by ``with_api_manager`` - confirm.

    Returns:
        The decoded JSON response on success, otherwise ``{"error": message}``.
        Configuration, transport, HTTP-status, decoding and authentication
        failures are all reported through the ``error`` key.
    """
    # Local imports keep this method self-contained; both are standard library.
    from http.client import HTTPException
    from urllib.parse import urlsplit

    if not self.webhook_url:
        logger.error("Pipedream webhook URL not configured")
        return {"error": "Pipedream integration not configured"}

    logger.debug(
        "MCP fetch_app_data called: provider=%s services=%s query=%r "
        "user_id=%s token_present=%s",
        provider, services, query, user_id, bool(access_token),
    )

    if not access_token:
        logger.error("No access token for %s! Cannot proceed.", provider)
        return {"error": f"No authentication token for {provider}"}

    if query.startswith("This is the previous context of the conversation:"):
        prompt = \
"""You are an expert at re-writing queries according to the context of the conversation.
Your task is to re-write the query to be more specific and relevant to the previous context provided below, but ONLY if necessary.

However, you MUST return the current query (and ONLY the current query) as it is if no changes are needed.
The decision whether to re-write the query or not is based on the previous context of the conversation.

Your output should be ONLY the re-written or current query, without any additional text or formatting.

{query}"""
        prompt_template = ChatPromptTemplate.from_template(prompt)
        messages = prompt_template.format_messages(query=query)
        response = await llm.ainvoke(messages)
        query = remove_markdown(response.content.strip())

    payload = {
        "provider": provider,
        "services": services,
        "query": query,
        "user_id": user_id,
        "token": access_token,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    headers = {'Content-Type': 'application/json'}

    logger.debug("Fetching %s data for services: %s", provider, services)
    if logger.isEnabledFor(logging.DEBUG):
        # The token is masked before the payload is written to the log.
        masked = {**payload, "token": "REDACTED" if payload["token"] else None}
        logger.debug("Payload to send: %s", json.dumps(masked, indent=2))

    # Split the URL properly so a path, query string or trailing slash in the
    # configured webhook does not end up inside the host name.
    target = self.webhook_url.strip()
    if "://" not in target:
        target = f"https://{target}"
    url = urlsplit(target)
    host = url.netloc
    path = url.path or "/"
    if url.query:
        path = f"{path}?{url.query}"
    if not host:
        logger.error("Pipedream webhook URL has no host")
        return {"error": "Pipedream integration not configured"}

    body = json.dumps(payload)
    timeout_seconds = self.timeout.total

    def _post():
        """Send the request synchronously; return (status, headers, text)."""
        # HTTPS is always used, whatever scheme the URL carried, because the
        # payload contains the access token.
        conn = HTTPSConnection(host, timeout=timeout_seconds)
        try:
            conn.request("POST", path, body, headers)
            reply = conn.getresponse()
            return reply.status, dict(reply.getheaders()), reply.read().decode()
        finally:
            conn.close()

    try:
        # http.client blocks, so run it in a worker thread to keep the event
        # loop responsive.
        status, reply_headers, text = await asyncio.to_thread(_post)
    except (asyncio.TimeoutError, TimeoutError):
        logger.error("Pipedream request timed out")
        return {"error": "Request timed out. Please try again."}
    except (OSError, HTTPException) as exc:
        logger.error("Network error calling Pipedream: %s", exc)
        return {"error": "Network error. Please check your connection."}
    except Exception:
        logger.exception("Unexpected error calling Pipedream")
        return {"error": "An unexpected error occurred"}

    logger.debug("Response status: %s", status)
    logger.debug("Response headers: %s", reply_headers)

    if status != 200:
        logger.error("Pipedream request failed: %s - %s", status, text)
        return {"error": f"Failed to fetch data: {status} - {text}"}

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        logger.error("Pipedream returned a non-JSON or empty response.")
        return {"error": "Invalid response from the provider."}

    if data is None or (isinstance(data, dict) and not data):
        logger.warning("Pipedream returned a null response.")
        return {"error": "Received no data from the provider."}

    # A per-service error whose details mention an auth failure means the
    # stored token is no longer accepted by the provider.
    auth_markers = ('401', 'authError', 'UNAUTHENTICATED')
    if isinstance(data, dict):
        for service_result in data.values():
            if not (isinstance(service_result, dict) and service_result.get('error')):
                continue
            details = str(service_result.get('details', ''))
            if any(marker in details for marker in auth_markers):
                logger.error("Authentication failed for %s", provider)
                return {"error": f"Authentication failed for {provider}. Please reconnect your account."}

    logger.info("Successfully fetched %s data", provider)
    return data
|
|
|
|
|
def format_as_context(self, provider: str, data: Dict[str, Any]) -> str:
    """Wrap formatted provider data in a titled, ruled text block.

    Args:
        provider: "google", "microsoft" or "slack"; any other value yields an
            "Unknown provider" line instead of formatted data.
        data: Decoded provider response.

    Returns:
        An empty string when ``data`` is empty or has an "error" key.
        Otherwise a header line, a 50-character "=" rule, the
        provider-specific text and a closing rule.
    """
    if not data or "error" in data:
        return ""

    # Computed outside the f-string: reusing the enclosing quote character
    # inside an f-string expression is a SyntaxError before Python 3.12.
    label = "WORKSPACE" if provider == "slack" else "APP"
    rule = "=" * 50 + "\n"

    # Formatter method for each supported provider, looked up by name so that
    # an unsupported provider never touches ``self``.
    formatter_names = {
        "google": "_format_google_data",
        "microsoft": "_format_microsoft_data",
        "slack": "_format_slack_data",
    }
    formatter_name = formatter_names.get(provider)
    if formatter_name is None:
        body = f"Unknown provider: {provider}\n"
    else:
        body = getattr(self, formatter_name)(data)

    return f"\n[{provider.upper()} {label} DATA]\n{rule}{body}{rule}"
|
|
|
|
|
def _format_google_data(self, data: Dict[str, Any]) -> str: |
|
formatted = "" |
|
|
|
|
|
if "drive" in data and isinstance(data["drive"], dict) and "files" in data["drive"]: |
|
formatted += "\n📁 GOOGLE DRIVE FILES:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
files = data["drive"]["files"] |
|
if not files: |
|
formatted += "No files found matching the query.\n" |
|
else: |
|
for i, file in enumerate(files[:10], 1): |
|
formatted += f"\n{i}. File: {file.get('name', 'Unknown')}\n" |
|
formatted += f" Type: {file.get('mimeType', 'Unknown')}\n" |
|
formatted += f" Modified: {file.get('modifiedTime', 'Unknown')}\n" |
|
|
|
if file.get('webViewLink'): |
|
formatted += f" Link: {file['webViewLink']}\n" |
|
|
|
if file.get('content'): |
|
content_preview = file['content'][:500] |
|
if len(file['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
formatted += "\n" |
|
|
|
|
|
if "gmail" in data and isinstance(data["gmail"], dict) and "messages" in data["gmail"]: |
|
formatted += "\n📧 GMAIL MESSAGES:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
messages = data["gmail"]["messages"] |
|
if not messages: |
|
formatted += "No messages found matching the query.\n" |
|
else: |
|
for i, msg in enumerate(messages[:10], 1): |
|
formatted += f"\n{i}. From: {msg.get('from', 'Unknown')}\n" |
|
formatted += f" Subject: {msg.get('subject', 'No subject')}\n" |
|
|
|
body_preview = msg.get('body', '')[:300] |
|
if msg.get('body', '') and len(msg['body']) > 300: |
|
body_preview += "..." |
|
formatted += f" Preview: {body_preview}\n" |
|
|
|
|
|
if "calendar" in data and isinstance(data["calendar"], dict) and "events" in data["calendar"]: |
|
formatted += "\n📅 GOOGLE CALENDAR EVENTS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
events = data["calendar"]["events"] |
|
if not events: |
|
formatted += "No calendar events found matching the query.\n" |
|
else: |
|
for i, event in enumerate(events[:10], 1): |
|
formatted += f"\n{i}. Event: {event.get('summary', 'No title')}\n" |
|
formatted += f" Time: {event.get('start', 'Unknown')}\n" |
|
|
|
if event.get('location'): |
|
formatted += f" Location: {event['location']}\n" |
|
|
|
if event.get('description'): |
|
desc_preview = event['description'][:200] |
|
if len(event['description']) > 200: |
|
desc_preview += "..." |
|
formatted += f" Description: {desc_preview}\n" |
|
|
|
|
|
if "docs" in data and isinstance(data["docs"], dict) and "docs" in data["docs"]: |
|
formatted += "\n📄 GOOGLE DOCS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
docs = data["docs"]["docs"] |
|
if not docs: |
|
formatted += "No documents found matching the query.\n" |
|
else: |
|
for i, doc in enumerate(docs[:5], 1): |
|
formatted += f"\n{i}. Document: {doc.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {doc.get('modifiedTime', 'Unknown')}\n" |
|
|
|
if doc.get('content'): |
|
content_preview = doc['content'][:500] |
|
if len(doc['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
|
|
if "sheets" in data and isinstance(data["sheets"], dict) and "sheets" in data["sheets"]: |
|
formatted += "\n📊 GOOGLE SHEETS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
sheets = data["sheets"]["sheets"] |
|
if not sheets: |
|
formatted += "No spreadsheets found matching the query.\n" |
|
else: |
|
for i, sheet in enumerate(sheets[:5], 1): |
|
formatted += f"\n{i}. Spreadsheet: {sheet.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {sheet.get('modifiedTime', 'Unknown')}\n" |
|
|
|
if sheet.get('content'): |
|
content_preview = sheet['content'][:300] |
|
if len(sheet['content']) > 300: |
|
content_preview += "..." |
|
formatted += f" Data Preview:\n {content_preview}\n" |
|
|
|
|
|
if "tasks" in data and isinstance(data["tasks"], dict) and "tasks" in data["tasks"]: |
|
formatted += "\n✅ GOOGLE TASKS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
tasks = data["tasks"]["tasks"] |
|
if not tasks: |
|
formatted += "No tasks found matching the query.\n" |
|
else: |
|
for i, task in enumerate(tasks[:10], 1): |
|
formatted += f"\n{i}. Task: {task.get('title', 'No title')}\n" |
|
formatted += f" List: {task.get('listTitle', 'Unknown')}\n" |
|
formatted += f" Status: {task.get('status', 'Unknown')}\n" |
|
|
|
if task.get('notes'): |
|
formatted += f" Notes: {task['notes'][:200]}...\n" |
|
|
|
if task.get('due'): |
|
formatted += f" Due: {task['due']}\n" |
|
|
|
|
|
|
|
return formatted |
|
|
|
|
|
def _format_microsoft_data(self, data: Dict[str, Any]) -> str: |
|
formatted = "" |
|
|
|
|
|
if "word" in data and isinstance(data["word"], dict) and "documents" in data["word"]: |
|
formatted += "\n📄 MICROSOFT WORD DOCUMENTS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
documents = data["word"]["documents"] |
|
if not documents: |
|
formatted += "No documents found matching the query.\n" |
|
else: |
|
for i, doc in enumerate(documents[:5], 1): |
|
formatted += f"\n{i}. Document: {doc.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {doc.get('lastModifiedDateTime', 'Unknown')}\n" |
|
|
|
if doc.get('content'): |
|
content_preview = doc['content'][:500] |
|
if len(doc['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
|
|
if "excel" in data and isinstance(data["excel"], dict) and "workbooks" in data["excel"]: |
|
formatted += "\n📊 MICROSOFT EXCEL WORKBOOKS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
workbooks = data["excel"]["workbooks"] |
|
if not workbooks: |
|
formatted += "No workbooks found matching the query.\n" |
|
else: |
|
for i, wb in enumerate(workbooks[:5], 1): |
|
formatted += f"\n{i}. Workbook: {wb.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {wb.get('lastModifiedDateTime', 'Unknown')}\n" |
|
|
|
if wb.get('content'): |
|
content_preview = wb['content'][:500] |
|
if len(wb['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
|
|
if "powerpoint" in data and isinstance(data["powerpoint"], dict) and "presentations" in data["powerpoint"]: |
|
formatted += "\n📊 MICROSOFT POWERPOINT PRESENTATIONS:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
presentations = data["powerpoint"]["presentations"] |
|
if not presentations: |
|
formatted += "No presentations found matching the query.\n" |
|
else: |
|
for i, pres in enumerate(presentations[:5], 1): |
|
formatted += f"\n{i}. Presentation: {pres.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {pres.get('lastModifiedDateTime', 'Unknown')}\n" |
|
|
|
if pres.get('content'): |
|
content_preview = pres['content'][:500] |
|
if len(pres['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
|
|
if "onedrive" in data and isinstance(data["onedrive"], dict) and "files" in data["onedrive"]: |
|
formatted += "\n📁 ONEDRIVE FILES:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
files = data["onedrive"]["files"] |
|
if not files: |
|
formatted += "No files found matching the query.\n" |
|
else: |
|
for i, file in enumerate(files[:10], 1): |
|
formatted += f"\n{i}. File: {file.get('name', 'Unknown')}\n" |
|
formatted += f" Modified: {file.get('lastModified', 'Unknown')}\n" |
|
|
|
if file.get('webUrl'): |
|
formatted += f" URL: {file['webUrl']}\n" |
|
|
|
if file.get('content'): |
|
content_preview = file['content'][:500] |
|
if len(file['content']) > 500: |
|
content_preview += "..." |
|
formatted += f" Content Preview:\n {content_preview}\n" |
|
|
|
|
|
if "outlook" in data and isinstance(data["outlook"], dict) and "messages" in data["outlook"]: |
|
formatted += "\n📧 OUTLOOK MESSAGES:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
messages = data["outlook"]["messages"] |
|
if not messages: |
|
formatted += "No messages found matching the query.\n" |
|
else: |
|
for i, msg in enumerate(messages[:10], 1): |
|
formatted += f"\n{i}. From: {msg.get('from', 'Unknown')}\n" |
|
formatted += f" Subject: {msg.get('subject', 'No subject')}\n" |
|
|
|
body_preview = msg.get('body', '')[:300] |
|
if msg.get('body', '') and len(msg['body']) > 300: |
|
body_preview += "..." |
|
formatted += f" Preview: {body_preview}\n" |
|
|
|
|
|
if "onenote" in data and isinstance(data["onenote"], dict) and "pages" in data["onenote"]: |
|
formatted += "\n📓 ONENOTE PAGES:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
pages = data["onenote"]["pages"] |
|
if not pages: |
|
formatted += "No pages found matching the query.\n" |
|
else: |
|
for i, page in enumerate(pages[:10], 1): |
|
formatted += f"\n{i}. Page: {page.get('title', 'Unknown')}\n" |
|
formatted += f" Section: {page.get('parentSection', 'Unknown')}\n" |
|
formatted += f" Modified: {page.get('lastModifiedDateTime', 'Unknown')}\n" |
|
|
|
if page.get('contentPreview'): |
|
formatted += f" Preview: {page['contentPreview'][:200]}...\n" |
|
|
|
|
|
if "todo" in data and isinstance(data["todo"], dict) and "tasks" in data["todo"]: |
|
formatted += "\n✅ MICROSOFT TO DO:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
tasks = data["todo"]["tasks"] |
|
if not tasks: |
|
formatted += "No tasks found matching the query.\n" |
|
else: |
|
for i, task in enumerate(tasks[:10], 1): |
|
formatted += f"\n{i}. Task: {task.get('title', 'No title')}\n" |
|
formatted += f" List: {task.get('listName', 'Unknown')}\n" |
|
formatted += f" Status: {'Completed' if task.get('isCompleted') else 'Pending'}\n" |
|
|
|
if task.get('body', {}).get('content'): |
|
formatted += f" Notes: {task['body']['content'][:200]}...\n" |
|
|
|
if task.get('dueDateTime'): |
|
formatted += f" Due: {task['dueDateTime']['dateTime']}\n" |
|
|
|
|
|
if "exchange" in data and isinstance(data["exchange"], dict) and "events" in data["exchange"]: |
|
formatted += "\n📅 EXCHANGE CALENDAR:\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
events = data["exchange"]["events"] |
|
if not events: |
|
formatted += "No calendar events found matching the query.\n" |
|
else: |
|
for i, event in enumerate(events[:10], 1): |
|
formatted += f"\n{i}. Event: {event.get('subject', 'No subject')}\n" |
|
formatted += f" Start: {event.get('start', {}).get('dateTime', 'Unknown')}\n" |
|
formatted += f" End: {event.get('end', {}).get('dateTime', 'Unknown')}\n" |
|
|
|
if event.get('location', {}).get('displayName'): |
|
formatted += f" Location: {event['location']['displayName']}\n" |
|
|
|
if event.get('bodyPreview'): |
|
formatted += f" Preview: {event['bodyPreview'][:200]}...\n" |
|
|
|
return formatted |
|
|
|
|
|
def _format_slack_data(self, data: Dict[str, Any]) -> str: |
|
formatted = "" |
|
|
|
|
|
if "workspace" in data and isinstance(data["workspace"], dict): |
|
ws = data["workspace"] |
|
formatted += f"\n📍 Workspace: {ws.get('name', 'Unknown')} ({ws.get('domain', 'N/A')})\n" |
|
if ws.get('enterprise_name'): |
|
formatted += f" Enterprise: {ws['enterprise_name']}\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
|
|
if "users" in data and isinstance(data["users"], list): |
|
users = data["users"] |
|
formatted += f"\n👥 TEAM MEMBERS ({len(users)} total):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
|
|
admins = [u for u in users if u.get('is_admin') or u.get('is_owner')] |
|
if admins: |
|
formatted += "Leadership:\n" |
|
for admin in admins[:5]: |
|
role = "Owner" if admin.get('is_primary_owner') else ("Admin" if admin.get('is_admin') else "Owner") |
|
formatted += f" • {admin.get('real_name', admin.get('name', 'Unknown'))} (@{admin.get('name', '')}) - {role}\n" |
|
if admin.get('title'): |
|
formatted += f" Title: {admin['title']}\n" |
|
|
|
|
|
regular_users = [u for u in users if not (u.get('is_admin') or u.get('is_owner'))][:10] |
|
if regular_users: |
|
formatted += "\nTeam Members (sample):\n" |
|
for user in regular_users: |
|
formatted += f" • {user.get('real_name', user.get('name', 'Unknown'))} (@{user.get('name', '')})" |
|
if user.get('title'): |
|
formatted += f" - {user['title']}" |
|
if user.get('presence'): |
|
formatted += f" [{user['presence']}]" |
|
formatted += "\n" |
|
|
|
|
|
if "channels" in data and isinstance(data["channels"], list): |
|
channels = data["channels"] |
|
formatted += f"\n📢 CHANNELS ({len(channels)} total):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
|
|
public_channels = [ch for ch in channels if not ch.get('is_private')] |
|
private_channels = [ch for ch in channels if ch.get('is_private')] |
|
|
|
if public_channels: |
|
formatted += f"Public Channels ({len(public_channels)}):\n" |
|
for ch in public_channels[:10]: |
|
formatted += f" • #{ch.get('name', 'Unknown')}" |
|
if ch.get('num_members'): |
|
formatted += f" ({ch['num_members']} members)" |
|
if ch.get('topic'): |
|
formatted += f"\n Topic: {ch['topic'][:50]}..." |
|
formatted += "\n" |
|
|
|
if private_channels: |
|
formatted += f"\nPrivate Channels ({len(private_channels)}):\n" |
|
for ch in private_channels[:5]: |
|
formatted += f" • 🔒 {ch.get('name', 'Unknown')}" |
|
if ch.get('num_members'): |
|
formatted += f" ({ch['num_members']} members)" |
|
formatted += "\n" |
|
|
|
|
|
channels_with_messages = [ch for ch in channels if ch.get('recent_messages')] |
|
if channels_with_messages: |
|
formatted += "\n💬 RECENT CHANNEL ACTIVITY:\n" |
|
formatted += "-" * 30 + "\n" |
|
for ch in channels_with_messages[:3]: |
|
formatted += f"\n#{ch.get('name', 'Unknown')}:\n" |
|
for msg in ch.get('recent_messages', [])[:3]: |
|
formatted += f" • {msg.get('user', 'Unknown')}: {msg.get('text', '')[:100]}" |
|
if msg.get('reply_count'): |
|
formatted += f" (🧵 {msg['reply_count']} replies)" |
|
formatted += "\n" |
|
|
|
|
|
if "searchResults" in data and isinstance(data["searchResults"], dict): |
|
search = data["searchResults"] |
|
|
|
|
|
if search.get("messages") and isinstance(search["messages"], list): |
|
messages = search["messages"] |
|
formatted += f"\n🔍 MESSAGE SEARCH RESULTS ({len(messages)} found):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
for i, msg in enumerate(messages[:10], 1): |
|
formatted += f"\n{i}. User: {msg.get('user', 'Unknown')}\n" |
|
formatted += f" Channel: #{msg.get('channel', {}).get('name', 'Unknown')}\n" |
|
formatted += f" Message: {msg.get('text', '')[:200]}" |
|
if len(msg.get('text', '')) > 200: |
|
formatted += "..." |
|
formatted += "\n" |
|
|
|
if msg.get('permalink'): |
|
formatted += f" Link: {msg['permalink']}\n" |
|
|
|
if msg.get('reactions'): |
|
reactions_str = " ".join([f"{r['name']}:{r['count']}" for r in msg['reactions'][:5]]) |
|
formatted += f" Reactions: {reactions_str}\n" |
|
|
|
|
|
if search.get("files") and isinstance(search["files"], list): |
|
files = search["files"] |
|
formatted += f"\n📎 FILE SEARCH RESULTS ({len(files)} found):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
for i, file in enumerate(files[:10], 1): |
|
formatted += f"\n{i}. File: {file.get('name', 'Unknown')}\n" |
|
formatted += f" Type: {file.get('filetype', 'Unknown').upper()}\n" |
|
formatted += f" Size: {self._format_file_size(file.get('size', 0))}\n" |
|
formatted += f" Uploaded by: {file.get('user', 'Unknown')}\n" |
|
|
|
if file.get('channels'): |
|
formatted += f" Shared in: {len(file['channels'])} channel(s)\n" |
|
|
|
|
|
if "files" in data and isinstance(data["files"], list) and data["files"]: |
|
formatted += f"\n📂 RECENT FILES ({len(data['files'])} shown):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
for i, file in enumerate(data["files"][:10], 1): |
|
formatted += f"\n{i}. {file.get('name', 'Unknown')}" |
|
if file.get('title') and file['title'] != file.get('name'): |
|
formatted += f" ({file['title']})" |
|
formatted += f"\n Type: {file.get('filetype', 'Unknown').upper()}" |
|
formatted += f" | Size: {self._format_file_size(file.get('size', 0))}\n" |
|
|
|
if file.get('comments_count'): |
|
formatted += f" 💬 {file['comments_count']} comment(s)\n" |
|
|
|
|
|
if "usergroups" in data and isinstance(data["usergroups"], list) and data["usergroups"]: |
|
formatted += f"\n👥 USER GROUPS ({len(data['usergroups'])} total):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
for group in data["usergroups"][:10]: |
|
formatted += f"\n• {group.get('name', 'Unknown')} (@{group.get('handle', '')})\n" |
|
if group.get('description'): |
|
formatted += f" Description: {group['description'][:100]}" |
|
if len(group['description']) > 100: |
|
formatted += "..." |
|
formatted += "\n" |
|
if group.get('user_count'): |
|
formatted += f" Members: {group['user_count']}\n" |
|
|
|
|
|
if "emoji" in data and isinstance(data["emoji"], dict) and data["emoji"]: |
|
emoji_list = list(data["emoji"].keys()) |
|
formatted += f"\n😊 CUSTOM EMOJI ({len(emoji_list)} total):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
|
|
sample_emoji = emoji_list[:20] |
|
formatted += "Sample: " + " ".join([f":{e}:" for e in sample_emoji]) |
|
if len(emoji_list) > 20: |
|
formatted += f" ... and {len(emoji_list) - 20} more" |
|
formatted += "\n" |
|
|
|
|
|
if "stars" in data and isinstance(data.get("stars"), list) and data["stars"]: |
|
formatted += f"\n⭐ STARRED ITEMS ({len(data['stars'])} total):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
|
|
star_types = {} |
|
for star in data["stars"]: |
|
star_type = star.get('type', 'unknown') |
|
star_types[star_type] = star_types.get(star_type, 0) + 1 |
|
|
|
for star_type, count in star_types.items(): |
|
formatted += f" • {star_type.capitalize()}: {count}\n" |
|
|
|
|
|
if "reminders" in data and isinstance(data.get("reminders"), list) and data["reminders"]: |
|
formatted += f"\n⏰ REMINDERS ({len(data['reminders'])} active):\n" |
|
formatted += "-" * 30 + "\n" |
|
|
|
for i, reminder in enumerate(data["reminders"][:5], 1): |
|
formatted += f"\n{i}. {reminder.get('text', 'No description')}\n" |
|
if reminder.get('recurring'): |
|
formatted += " 🔄 Recurring\n" |
|
if reminder.get('user') and reminder['user'] != reminder.get('creator'): |
|
formatted += f" For: {reminder['user']}\n" |
|
|
|
|
|
if "error" in data: |
|
formatted += f"\n⚠️ ERROR: {data['error']}\n" |
|
if "partialResults" in data: |
|
formatted += "(Showing partial results above)\n" |
|
|
|
|
|
formatted += "\n" + "=" * 50 + "\n" |
|
formatted += "📊 SUMMARY:\n" |
|
if "workspace" in data: |
|
formatted += f"• Workspace: {data.get('workspace', {}).get('name', 'Unknown')}\n" |
|
if "users" in data: |
|
formatted += f"• Total Users: {len(data.get('users', []))}\n" |
|
if "channels" in data: |
|
formatted += f"• Total Channels: {len(data.get('channels', []))}\n" |
|
if "searchResults" in data: |
|
search = data["searchResults"] |
|
if search.get("messages"): |
|
formatted += f"• Messages Found: {len(search.get('messages', []))}\n" |
|
if search.get("files"): |
|
formatted += f"• Files Found: {len(search.get('files', []))}\n" |
|
if "emoji" in data and data["emoji"]: |
|
formatted += f"• Custom Emoji: {len(data.get('emoji', {}))}\n" |
|
|
|
return formatted |
|
|
|
|
|
def _format_file_size(self, size_bytes: int) -> str: |
|
if size_bytes < 1024: |
|
return f"{size_bytes} B" |
|
elif size_bytes < 1024 * 1024: |
|
return f"{size_bytes / 1024:.1f} KB" |
|
elif size_bytes < 1024 * 1024 * 1024: |
|
return f"{size_bytes / (1024 * 1024):.1f} MB" |
|
else: |
|
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" |
|
|
|
if __name__ == "__main__":
    # Manual smoke test: fetch a Slack workspace summary and print the result.
    # The token is read from the SLACK_ACCESS_TOKEN environment variable;
    # credentials must never be committed to source control. Without a token
    # the call returns an error dict instead of contacting the webhook.
    # NOTE(review): ``llm`` is not passed here - presumably supplied by the
    # ``with_api_manager`` decorator; confirm.
    client = MCPClient()
    print("\n\n-----Fetching app data...------\n\n")
    print(asyncio.run(client.fetch_app_data(
        provider="slack",
        services=[],
        query="give me a detailed summary of my slack workspace",
        user_id="b45c4267-a357-4dff-9c7e-c421d8c8b659",
        access_token=os.getenv("SLACK_ACCESS_TOKEN", ""),
    )))