from mcp.server.fastmcp import FastMCP
import random
import time
from litellm import completion
import shlex
from subprocess import Popen, PIPE
from threading import Timer
import os
import glob
import http.client
import json
import openpyxl
import shutil
from google import genai
client = genai.Client(api_key="AIzaSyAQgAtQPpY0bQaCqCISGxeyF6tpDePx-Jg")
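# Gemini client used by the audio/video/image analysis tools defined below.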
source_dir = "/app/uploads/temp"
destination_dir = "/app/code_interpreter"
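# Files uploaded through the web UI land under source_dir; transfer_files() moves them
# into destination_dir, which is the working directory every tool operates on.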
files_list=[]
downloaded_files=[]
# os.environ.get('GROQ_API_KEY')
os.environ["GROQ_API_KEY"] ="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"
os.environ["GEMINI_API_KEY"] ="AIzaSyBPfR-HG_HeUgLF0LYW1XQgQUxFF6jF_0U"
os.environ["OPENROUTER_API_KEY"] = "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"
mcp = FastMCP("code_sandbox")
data={}
result=""
stdout=""
stderr=""
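# stdout/stderr are module-level buffers: run() overwrites them with the captured output
# of the most recent command, and the tool functions read them afterwards.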
import requests
import os
from bs4 import BeautifulSoup # For parsing HTML
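# requests + BeautifulSoup are used by the helpers below to mirror files between this
# sandbox and the opengpt-4ik5.onrender.com file server.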
def download_all_files(base_url, files_endpoint, download_directory):
    """Downloads all files listed on the server's /upload page."""
    global downloaded_files

    # Create the download directory if it doesn't exist
    if not os.path.exists(download_directory):
        os.makedirs(download_directory)

    try:
        # 1. Get the HTML of the /upload page
        files_url = f"{base_url}{files_endpoint}"
        response = requests.get(files_url)
        response.raise_for_status()  # Check for HTTP errors

        # 2. Parse the HTML using BeautifulSoup
        soup = BeautifulSoup(response.content, "html.parser")

        # 3. Find all the <a> (anchor) tags, which represent the links to the files.
        #    This assumes the file links are inside <a> tags as shown in the server code.
        file_links = soup.find_all("a")

        # 4. Iterate through the links and download the files
        for link in file_links:
            try:
                file_url = link.get("href")  # Extract the href attribute (the URL)
                if file_url:
                    # Construct the full file URL if the href is relative
                    if not file_url.startswith("http"):
                        file_url = f"{base_url}{file_url}"
                    filename = os.path.basename(file_url)  # Extract the filename from the URL
                    file_path = os.path.join(download_directory, filename)
                    if filename not in downloaded_files:
                        downloaded_files.append(filename)
                        print(f"Downloading: {filename} from {file_url}")
                        # Download the file
                        file_response = requests.get(file_url, stream=True)  # Use stream=True for large files
                        file_response.raise_for_status()  # Check for HTTP errors
                        with open(file_path, "wb") as file:  # Open in binary write mode
                            for chunk in file_response.iter_content(chunk_size=8192):  # Write in chunks (good for large files)
                                if chunk:  # Filter out keep-alive chunks
                                    file.write(chunk)
                        print(f"Downloaded: {filename} to {file_path}")
            except requests.exceptions.RequestException as e:
                print(f"Error downloading {link.get('href')}: {e}")
            except OSError as e:  # Handles potential issues with file permissions or disk space
                print(f"Error saving {filename}: {e}")
    except requests.exceptions.RequestException as e:
        print(f"Error getting file list from server: {e}")
    except Exception as e:  # Catch all other potential errors
        print(f"An unexpected error occurred: {e}")
def transfer_files():
    """Move user-uploaded files from the temp upload area into the working directory."""
    for item in os.listdir(source_dir):
        item_path = os.path.join(source_dir, item)
        if os.path.isdir(item_path):  # Check if it's a directory
            for filename in os.listdir(item_path):
                source_file_path = os.path.join(item_path, filename)
                destination_file_path = os.path.join(destination_dir, filename)
                shutil.move(source_file_path, destination_file_path)
def upload_file(file_path, upload_url):
    """Uploads a file to the specified server endpoint."""
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Prepare the file for upload
        with open(file_path, "rb") as file:
            files = {"file": (os.path.basename(file_path), file)}  # Important: provide the filename
            # Send the POST request
            response = requests.post(upload_url, files=files)

        # Check the response status code
        response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)

        # Parse and print the response
        if response.status_code == 200:
            print(f"File uploaded successfully. Filename returned by server: {response.text}")
            return response.text  # Return the filename returned by the server
        else:
            print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}")
            return None
    except FileNotFoundError as e:
        print(e)
        return None  # or re-raise the exception if you want the program to halt
    except requests.exceptions.RequestException as e:
        print(f"Upload failed. Network error: {e}")
        return None
TOKEN = "5182224145:AAEjkSlPqV-Q3rH8A9X8HfCDYYEQ44v_qy0"
chat_id = "5075390513"
from requests_futures.sessions import FuturesSession
session = FuturesSession()
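# TOKEN/chat_id and the futures session appear to be leftovers for Telegram
# notifications; none of the tools below currently use them.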
def run(cmd, timeout_sec):
    """Run a shell command inside /app/code_interpreter/, capturing its output into the
    module-level stdout/stderr buffers and killing the process after timeout_sec seconds."""
    global stdout
    global stderr
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, cwd="/app/code_interpreter/")
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()
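# Usage sketch (hypothetical command, shown only for illustration):
#   run("python /app/code_interpreter/app.py", 300)
#   # -> the module-level stdout / stderr now hold the command's output as bytes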
@mcp.tool()
def analyse_audio(audiopath, query) -> dict:
    """Ask another AI model about audios. The AI model can listen to the audio and give answers. Eg - query: Generate detailed minutes of meeting from the audio clip, audiopath='/app/code_interpreter/<audioname>'. Note: The audios are automatically present in the /app/code_interpreter directory."""
    download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter")
    myfile = client.files.upload(file=audiopath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, myfile]
    )
    return {"Output": str(response.text)}
@mcp.tool()
def analyse_video(videopath, query) -> dict:
    """Ask another AI model about videos. The AI model can see the videos and give answers. Eg - query: Create a very detailed transcript and summary of the video, videopath='/app/code_interpreter/<videoname>'. Note: The videos are automatically present in the /app/code_interpreter directory."""
    download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter")
    video_file = client.files.upload(file=videopath)

    # Poll until the uploaded video has been processed by the Files API
    while video_file.state.name == "PROCESSING":
        print('.', end='')
        time.sleep(1)
        video_file = client.files.get(name=video_file.name)

    if video_file.state.name == "FAILED":
        raise ValueError(video_file.state.name)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, video_file]
    )
    return {"Output": str(response.text)}
@mcp.tool()
def analyse_images(imagepath, query) -> dict:
    """Ask another AI model about images. The AI model can see the images and give answers. Eg - query: Who is the person in this image?, imagepath='/app/code_interpreter/<imagename>'. Note: The images are automatically present in the /app/code_interpreter directory."""
    download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter")
    image_file = client.files.upload(file=imagepath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, image_file]
    )
    return {"Output": str(response.text)}
@mcp.tool()
def create_code_files(filename: str, code: str) -> dict:
    """Create code files by passing the filename as well as the entire code to write. The file is created by default in the /app/code_interpreter directory. Note: All user uploaded files that you might need to work upon are stored in the /app/code_interpreter directory."""
    global destination_dir
    download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter")
    transfer_files()
    # Write the provided code into the working directory
    with open(os.path.join(destination_dir, filename), "w") as f:
        f.write(code)
    return {"info": "task completed"}
@mcp.tool()
def run_code_files(start_cmd: str) -> dict:
    """(start_cmd: Example - sudo python /app/code_interpreter/app.py or bash /app/code_interpreter/app.py). The files must be inside the /app/code_interpreter directory."""
    global files_list
    global stdout
    global stderr
    run(start_cmd, 300)
    # run() blocks until the command finishes or is killed by the timer
    while stderr == "" and stdout == "":
        pass
    time.sleep(1.5)

    # Upload any files the command created so the user gets download links
    onlyfiles = glob.glob("/app/code_interpreter/*")
    onlyfiles = list(set(onlyfiles) - set(files_list))
    uploaded_filenames = []
    for files in onlyfiles:
        try:
            uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload")
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")
        except Exception:
            pass
    files_list = onlyfiles
    return {"stdout": stdout, "stderr": stderr, "Files_download_link": uploaded_filenames}
@mcp.tool()
def run_shell_command(cmd: str) -> dict:
    """(cmd: Example - mkdir test. By default, the command is run inside the /app/code_interpreter/ directory.) Remember, the code_interpreter is running on **alpine linux**, so write commands accordingly. Eg - sudo does not work and is not required."""
    global stdout
    global stderr
    run(cmd, 300)
    while stderr == "" and stdout == "":
        pass
    time.sleep(1.5)
    transfer_files()
    return {"stdout": stdout, "stderr": stderr}
@mcp.tool()
def install_python_packages(python_packages: str) -> dict:
    """python_packages to install separated by space. eg - (python packages: numpy matplotlib). The following python packages are preinstalled: gradio XlsxWriter openpyxl"""
    package_names = python_packages.strip()
    command = "pip install"
    if not package_names:
        return {"stdout": stdout, "stderr": stderr, "info": "No packages specified"}

    run(
        f"{command} --break-system-packages {package_names}", timeout_sec=300
    )
    while stderr == "" and stdout == "":
        pass
    time.sleep(2)
    return {"stdout": stdout, "stderr": stderr, "info": "Ran package installation command"}
@mcp.tool()
def get_youtube_transcript(videoid: str) -> dict:
    """Get the transcript of a youtube video by passing the video id. First search the web using google / exa for the relevant videos. Eg videoid=ZacjOVVgoLY"""
    conn = http.client.HTTPSConnection("youtube-transcript3.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "youtube-transcript3.p.rapidapi.com"
    }
    conn.request("GET", f"/api/transcript?videoId={videoid}", headers=headers)
    res = conn.getresponse()
    data = res.read()
    return json.loads(data)
@mcp.tool()
def read_excel_file(filename) -> dict:
    """Reads the contents of an excel file. Returns a dict with key:value pair = cell location:cell content. Always run this command first, when working with excels. The excel file is automatically present in the /app/code_interpreter directory. Note: Always use openpyxl in python to work with excel files."""
    global destination_dir
    download_all_files("https://opengpt-4ik5.onrender.com", "/upload", "/app/code_interpreter")
    workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename))

    # Create an empty dictionary to store the data
    excel_data_dict = {}

    # Iterate over all sheets
    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]

        # Iterate over all rows and columns
        for row in sheet.iter_rows():
            for cell in row:
                # Get cell coordinate (e.g., 'A1') and value
                cell_coordinate = cell.coordinate
                cell_value = cell.value
                if cell_value is not None:
                    excel_data_dict[cell_coordinate] = str(cell_value)

    return excel_data_dict
@mcp.tool()
def scrape_websites(url_list: list, query: str) -> dict:
    """Get the entire content of websites by passing in the url lists. query is the question you want to ask about the content of the website. e.g - query: Give .pptx links in the website."""
    conn = http.client.HTTPSConnection("scrapeninja.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "scrapeninja.p.rapidapi.com",
        'Content-Type': "application/json"
    }

    Output = []
    for urls in url_list:
        payload = json.dumps({"url": urls})
        conn.request("POST", "/scrape", payload, headers)
        res = conn.getresponse()
        data = res.read()
        content = str(data.decode("utf-8"))

        # Ask Gemini to turn the raw scrape into readable text and answer the query
        response = completion(
            model="gemini/gemini-2.0-flash-exp",
            messages=[
                {"role": "user", "content": f"Output the following content in a human readable format. Try to conserve all the links and the text. Try to output the entire content. Remove the html code so it is human readable. Also answer this question about the content in a separate paragraph: {query}. Here is the content: {content}"}
            ],
        )
        Output.append(response.choices[0].message.content)

    return {"website_content": Output}
@mcp.tool()
def deepthinking1(query: str, info: str) -> dict:
    """Ask another intelligent AI about the query. Ask the question defined by the query string, and provide what you know about the question plus your own knowledge and ideas through the info string."""
    response = completion(
        model="groq/deepseek-r1-distill-llama-70b",
        messages=[
            {"role": "user", "content": f"{query}. Here is what I know about the query: {info}"}
        ],
        stream=False
    )
    return {"response": str(response.choices[0].message.content)}
@mcp.tool()
def deepthinking2(query: str, info: str) -> dict:
    """Ask another intelligent AI about the query. Ask the question defined by the query string, and provide what you know about the question plus your own knowledge and ideas through the info string."""
    response = completion(
        model="openrouter/deepseek/deepseek-chat",
        messages=[
            {"role": "user", "content": f"{query}. Here is what I know about the query: {info}"}
        ],
        provider={"order": ["Together"], "allow_fallbacks": False},
    )
    return {"response": str(response.choices[0].message.content)}
@mcp.tool()
def deepthinking3(query: str, info: str) -> dict:
    """Ask another intelligent AI about the query. Ask the question defined by the query string, and provide what you know about the question plus your own knowledge and ideas through the info string."""
    response = completion(
        model="gemini/gemini-2.0-flash-thinking-exp-01-21",
        messages=[
            {"role": "user", "content": f"{query}. Here is what I know about the query: {info}"}
        ],
    )
    return {"response": str(response.choices[0].message.content)}
if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')
# @mcp.tool()
# def run_website(start_cmd:str,port=8501) -> dict:
# """(start_cmd:streamlit run app.py).Always specify sandbox id.Specify port (int) if different from 8501."""
# output=sbx.commands.run(start_cmd,sandbox_id)
# url = sbx.get_host(port)
# info={"info":f"Your Application is live [here](https://{url})"}
# return info