from typing import Optional

import asyncio
import io
import os
import threading
import time

import aiohttp
import requests
from bs4 import BeautifulSoup
from easygoogletranslate import EasyGoogleTranslate
from fastapi import FastAPI, HTTPException, Query
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, StreamingResponse
from huggingface_hub import HfApi, InferenceClient
from PIL import Image

from stream import fastai_stream
from webscout import WEBS, LLM, fastai, transcriber
app = FastAPI()
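
# NOTE: The FastAPI route decorators below (e.g. @app.get("/api/search")) are
# plausible reconstructions, not verified paths; adjust them to match your
# deployment before relying on specific URLs.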


@app.get("/")  # assumed path
async def root():
    return {"message": "API documentation can be found at /docs"}


@app.get("/health")  # assumed path
async def health_check():
    return {"status": "OK"}


@app.get("/api/search")  # assumed path
async def search(
    q: str,
    max_results: int = 10,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "api",
    proxy: Optional[str] = None,  # optional proxy for outbound requests
):
    """Perform a text search."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.text(
                keywords=q,
                region=region,
                safesearch=safesearch,
                timelimit=timelimit,
                backend=backend,
                max_results=max_results,
            )
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during search: {e}")
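

# A minimal client-side sketch (assumes the server is running locally on the
# port used in the __main__ block below and that "/api/search" is the route):
#
#   import requests
#   r = requests.get(
#       "http://localhost:8083/api/search",
#       params={"q": "python web scraping", "max_results": 3},
#   )
#   print(r.json())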


@app.get("/api/images")  # assumed path
async def images(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
    size: Optional[str] = None,
    color: Optional[str] = None,
    type_image: Optional[str] = None,
    layout: Optional[str] = None,
    license_image: Optional[str] = None,
    proxy: Optional[str] = None,  # optional proxy for outbound requests
):
    """Perform an image search."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.images(
                keywords=q,
                region=region,
                safesearch=safesearch,
                timelimit=timelimit,
                size=size,
                color=color,
                type_image=type_image,
                layout=layout,
                license_image=license_image,
                max_results=max_results,
            )
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during image search: {e}")


@app.get("/api/videos")  # assumed path
async def videos(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
    resolution: Optional[str] = None,
    duration: Optional[str] = None,
    license_videos: Optional[str] = None,
    proxy: Optional[str] = None,  # optional proxy for outbound requests
):
    """Perform a video search."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.videos(
                keywords=q,
                region=region,
                safesearch=safesearch,
                timelimit=timelimit,
                resolution=resolution,
                duration=duration,
                license_videos=license_videos,
                max_results=max_results,
            )
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during video search: {e}")


@app.get("/api/news")  # assumed path
async def news(
    q: str,
    max_results: int = 10,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    timelimit: Optional[str] = None,
    proxy: Optional[str] = None,  # optional proxy for outbound requests
):
    """Perform a news search."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.news(
                keywords=q,
                region=region,
                safesearch=safesearch,
                timelimit=timelimit,
                max_results=max_results,
            )
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during news search: {e}")


@app.get("/api/llm")  # assumed path
async def llm_chat(
    model: str,
    message: str,
    system_prompt: str = Query(None, description="Optional custom system prompt"),
):
    """Interact with a specified large language model, with an optional system prompt."""
    try:
        messages = [{"role": "user", "content": message}]
        if system_prompt:
            # The system message must come first in the conversation.
            messages.insert(0, {"role": "system", "content": system_prompt})
        llm = LLM(model=model)
        response = llm.chat(messages=messages)
        return JSONResponse(content={"response": response})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during LLM chat: {e}")
async def fast_ai(user: str, model: str = "llama3-70b", system: str = "Answer as concisely as possible."): | |
"""Get a response from the Snova AI service.""" | |
try: | |
response = await asyncio.to_thread(fastai, user, model, system) | |
return JSONResponse(content={"response": response}) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error during Snova AI request: {e}") | |
async def fast_ai(user: str, model: str = "llama3-8b", system: str = "Answer as concisely as possible."): | |
"""Get a streaming response from the Snova AI service.""" | |
try: | |
return StreamingResponse(fastai_stream(user, model, system), media_type="text/event-stream") | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error during Snova AI request: {e}") | |
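

# Consuming the event stream (illustrative; assumes the route above):
#
#   import requests
#   with requests.get("http://localhost:8083/api/fastAI-stream",
#                     params={"user": "Hi"}, stream=True) as r:
#       for line in r.iter_lines():
#           if line:
#               print(line.decode())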


@app.get("/api/answers")  # assumed path
async def answers(q: str, proxy: Optional[str] = None):
    """Get instant answers for a query."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.answers(keywords=q)
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting instant answers: {e}")


@app.get("/api/chat")  # assumed path
async def chat(
    q: str,
    model: str = "gpt-4o-mini",
    proxy: Optional[str] = None,
):
    """Chat with a WEBS-backed model."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.chat(keywords=q, model=model)
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting chat results: {e}")


def extract_text_from_webpage(html_content):
    """Extracts visible text from HTML content using BeautifulSoup."""
    soup = BeautifulSoup(html_content, "html.parser")
    # Remove tags whose content is not part of the readable page text.
    for tag in soup(["script", "style", "header", "footer", "nav"]):
        tag.extract()
    # Get the remaining visible text.
    visible_text = soup.get_text(strip=True)
    return visible_text
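

# For example (illustrative only):
#   extract_text_from_webpage("<html><script>x()</script><p>Hello</p></html>")
# returns "Hello", since the <script> block is stripped before get_text().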


async def fetch_and_extract(url, max_chars, proxy: Optional[str] = None):
    """Fetches a URL and extracts text asynchronously."""
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}, proxy=proxy) as response:
                response.raise_for_status()
                html_content = await response.text()
                visible_text = extract_text_from_webpage(html_content)
                if len(visible_text) > max_chars:
                    visible_text = visible_text[:max_chars] + "..."
                return {"link": url, "text": visible_text}
        except aiohttp.ClientError as e:
            # aiohttp raises its own exception hierarchy; the original also
            # caught requests exceptions, which can never occur here.
            print(f"Error fetching or processing {url}: {e}")
            return {"link": url, "text": None}


@app.get("/api/web_extract")  # assumed path
async def web_extract(
    url: str,
    max_chars: int = 12000,  # adjust based on the downstream token limit
    proxy: Optional[str] = None,
):
    """Extracts text from a given URL."""
    try:
        result = await fetch_and_extract(url, max_chars, proxy)
        return {"url": url, "text": result["text"]}
    except Exception as e:
        # fetch_and_extract is aiohttp-based, so catching only
        # requests.exceptions.RequestException (as the original did) misses
        # everything that can actually go wrong here.
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")


@app.get("/api/web_search_and_extract")  # assumed path
async def web_search_and_extract(
    q: str,
    max_results: int = 3,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "html",
    max_chars: int = 6000,
    extract_only: bool = True,
    proxy: Optional[str] = None,
):
    """Searches using WEBS, extracts text from the top results, and returns both."""
    try:
        with WEBS(proxy=proxy) as webs:
            # Perform the WEBS search.
            search_results = webs.text(keywords=q, region=region, safesearch=safesearch,
                                       timelimit=timelimit, backend=backend, max_results=max_results)
            # Extract text from each result's link concurrently.
            tasks = [fetch_and_extract(result['href'], max_chars, proxy) for result in search_results if 'href' in result]
            extracted_results = await asyncio.gather(*tasks)
            if extract_only:
                return JSONResponse(content=jsonable_encoder(extracted_results))
            else:
                return JSONResponse(content=jsonable_encoder({"search_results": search_results, "extracted_results": extracted_results}))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during search and extraction: {e}")
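

# Design note: asyncio.gather fans the page fetches out concurrently on the
# event loop, so total latency is roughly that of the slowest page rather than
# the sum of all of them. The *_threading variant below trades that for plain
# requests + threads, which can be easier to reason about in synchronous code.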


def fetch_and_extract2(url, max_chars, proxy: Optional[str] = None):
    """Fetches a URL and extracts text synchronously (for the threaded variant)."""
    # The original defined an extract_text_from_webpage2 helper that duplicated
    # extract_text_from_webpage verbatim; the single helper is reused here.
    proxies = {'http': proxy, 'https': proxy} if proxy else None
    try:
        response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, proxies=proxies)
        response.raise_for_status()
        html_content = response.text
        visible_text = extract_text_from_webpage(html_content)
        if len(visible_text) > max_chars:
            visible_text = visible_text[:max_chars] + "..."
        return {"link": url, "text": visible_text}
    except requests.exceptions.RequestException as e:
        print(f"Error fetching or processing {url}: {e}")
        return {"link": url, "text": None}


@app.get("/api/web_search_and_extract_threading")  # assumed path
def web_search_and_extract_threading(
    q: str,
    max_results: int = 3,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "html",
    max_chars: int = 6000,
    extract_only: bool = True,
    proxy: Optional[str] = None,
):
    """Searches using WEBS, extracts text from the top results using threading, and returns both."""
    try:
        with WEBS(proxy=proxy) as webs:
            # Perform the WEBS search.
            search_results = webs.text(keywords=q, region=region, safesearch=safesearch,
                                       timelimit=timelimit, backend=backend, max_results=max_results)
            # Extract text from each result's link in a worker thread.
            extracted_results = []
            threads = []

            def worker(href):
                # list.append is atomic in CPython, so no lock is needed here.
                extracted_results.append(fetch_and_extract2(href, max_chars, proxy))

            for result in search_results:
                if 'href' in result:
                    # Pass the URL as an argument: the original used a lambda that
                    # closed over the loop variable, so every thread could end up
                    # fetching the last result's URL (late binding).
                    thread = threading.Thread(target=worker, args=(result['href'],))
                    threads.append(thread)
                    thread.start()
            # Wait for all threads to finish.
            for thread in threads:
                thread.join()
            if extract_only:
                return JSONResponse(content=jsonable_encoder(extracted_results))
            else:
                return JSONResponse(content=jsonable_encoder({"search_results": search_results, "extracted_results": extracted_results}))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during search and extraction: {e}")


@app.get("/api/adv_web_search")  # assumed path
async def adv_web_search(
    q: str,
    model: str = "llama3-8b",
    max_results: int = 5,
    timelimit: Optional[str] = None,
    safesearch: str = "moderate",
    region: str = "wt-wt",
    backend: str = "html",
    max_chars: int = 6000,
    system_prompt: str = "You are an advanced AI chatbot. Provide the best answer to the user based on web search results.",
    proxy: Optional[str] = None,
):
    """Combines web search, web extraction, and FastAI chat for advanced search."""
    try:
        with WEBS(proxy=proxy) as webs:
            # 1. Perform the web search.
            search_results = webs.text(keywords=q, region=region,
                                       safesearch=safesearch,
                                       timelimit=timelimit, backend=backend,
                                       max_results=max_results)
            # 2. Extract text from the top search result URLs concurrently.
            extracted_text = ""
            tasks = [fetch_and_extract(result['href'], max_chars, proxy) for result in search_results if 'href' in result]
            extracted_results = await asyncio.gather(*tasks)
            for result in extracted_results:
                if result['text']:
                    extracted_text += f"## Content from: {result['link']}\n\n{result['text']}\n\n"
            # 3. Construct the prompt for the model.
            ai_prompt = f"Query by user: {q}. Answer the query asked by the user in detail. Search results: {extracted_text}"
            # 4. Get the response. The original called an undefined FastAI class;
            #    routing through the imported fastai helper (as /api/fastAI does)
            #    is an assumption.
            try:
                response = await asyncio.to_thread(fastai, ai_prompt, model, system_prompt)
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Error during FastAI request: {e}")
            # 5. Return the results.
            return JSONResponse(content={"response": response})
    except HTTPException:
        # Let the inner HTTPException propagate instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during advanced search: {e}")


@app.get("/api/website_summarizer")  # assumed path
async def website_summarizer(url: str, proxy: Optional[str] = None):
    """Summarizes the content of a given URL using a chat model."""
    try:
        # Extract text from the given URL.
        proxies = {'http': proxy, 'https': proxy} if proxy else None
        response = requests.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"}, proxies=proxies)
        response.raise_for_status()
        visible_text = extract_text_from_webpage(response.text)
        if len(visible_text) > 7500:  # adjust the cutoff based on your needs
            visible_text = visible_text[:7500] + "..."
        # Use a chat model to summarize the extracted text.
        with WEBS(proxy=proxy) as webs:
            summary_prompt = f"Summarize this in detail in Paragraph: {visible_text}"
            summary_result = webs.chat(keywords=summary_prompt, model="gpt-4o-mini")
        # Return the summary under a key. The original wrapped the result in a
        # set literal ({summary_result}), which is not JSON-serializable.
        return JSONResponse(content=jsonable_encoder({"summary": summary_result}))
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during summarization: {e}")


@app.get("/api/ask_website")  # assumed path
async def ask_website(url: str, question: str, model: str = "llama-3-70b", proxy: Optional[str] = None):
    """Asks a question about the content of a given website."""
    try:
        # Extract text from the given URL.
        proxies = {'http': proxy, 'https': proxy} if proxy else None
        response = requests.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0"}, proxies=proxies)
        response.raise_for_status()
        visible_text = extract_text_from_webpage(response.text)
        if len(visible_text) > 7500:  # adjust the cutoff based on your needs
            visible_text = visible_text[:7500] + "..."
        # Construct a prompt for the chat model.
        prompt = f"Based on the following text, answer this question in Paragraph: [QUESTION] {question} [TEXT] {visible_text}"
        # Use the chat model to get the answer.
        with WEBS(proxy=proxy) as webs:
            answer_result = webs.chat(keywords=prompt, model=model)
        # Return the answer under a key (the original used a set literal,
        # which JSON cannot encode).
        return JSONResponse(content=jsonable_encoder({"answer": answer_result}))
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching or processing URL: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during question answering: {e}")


client_sd3 = InferenceClient("stabilityai/stable-diffusion-3-medium-diffusers")


@app.get("/api/sd3")  # assumed path
def sd3(prompt: str = "",
        steps: int = 20,
        width: int = 1000,
        height: int = 1000):
    """Generates an image with Stable Diffusion 3 and returns it as a PNG."""
    try:
        image = client_sd3.text_to_image(prompt=f"{prompt}, hd, high quality, 4k, masterpiece",
                                         num_inference_steps=steps,
                                         width=width, height=height)
        # Recent huggingface_hub versions return a PIL.Image.Image directly;
        # the original unconditionally re-opened the result from bytes, which
        # fails on a PIL image, so the conversion is guarded here.
        if isinstance(image, bytes):
            image = Image.open(io.BytesIO(image))
        # Serialize to PNG so FastAPI can return the image over HTTP (the
        # original returned the raw PIL object, which FastAPI cannot encode).
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        buffer.seek(0)
        return StreamingResponse(buffer, media_type="image/png")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during image generation: {e}")
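

# Illustrative call (assumes the route above and a local server):
#   curl "http://localhost:8083/api/sd3?prompt=a+red+fox&steps=20" -o fox.png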


@app.get("/api/maps")  # assumed path
async def maps(
    q: str,
    place: Optional[str] = None,
    street: Optional[str] = None,
    city: Optional[str] = None,
    county: Optional[str] = None,
    state: Optional[str] = None,
    country: Optional[str] = None,
    postalcode: Optional[str] = None,
    latitude: Optional[str] = None,
    longitude: Optional[str] = None,
    radius: int = 0,
    max_results: int = 10,
    proxy: Optional[str] = None,
):
    """Perform a maps search."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.maps(keywords=q, place=place, street=street, city=city, county=county,
                                state=state, country=country, postalcode=postalcode, latitude=latitude,
                                longitude=longitude, radius=radius, max_results=max_results)
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during maps search: {e}")


@app.get("/api/translate")  # assumed path
async def translate(
    q: str,
    from_: Optional[str] = None,
    to: str = "en",
    proxy: Optional[str] = None,
):
    """Translate text."""
    try:
        with WEBS(proxy=proxy) as webs:
            results = webs.translate(keywords=q, from_=from_, to=to)
            return JSONResponse(content=jsonable_encoder(results))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during translation: {e}")


@app.get("/api/google_translate")  # assumed path
def google_translate(q: str, from_: Optional[str] = 'auto', to: str = "en"):
    """Translate text with EasyGoogleTranslate (imported at the top of the file)."""
    try:
        translator = EasyGoogleTranslate(
            source_language=from_,
            target_language=to,
            timeout=10
        )
        result = translator.translate(q)
        # NOTE: when from_ is 'auto', the reported detected_language is just the
        # input value, not an actual detection result.
        return JSONResponse(content=jsonable_encoder({"detected_language": from_, "original": q, "translated": result}))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during translation: {e}")
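

# Illustrative call (assumes the route above):
#   GET /api/google_translate?q=Bonjour%20le%20monde&to=en
# would return roughly:
#   {"detected_language": "auto", "original": "Bonjour le monde", "translated": "Hello world"}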


@app.get("/api/youtube_transcript")  # assumed path
async def youtube_transcript(
    video_id: str,
    languages: str = "en",
    preserve_formatting: bool = False,
    proxy: Optional[str] = None,  # optional proxy for outbound requests
):
    """Get the transcript of a YouTube video."""
    try:
        languages_list = languages.split(",")
        # The transcriber expects a requests-style proxies mapping, so a bare
        # proxy URL is expanded here (an assumption; the original passed the
        # raw string through as proxies=proxy).
        proxies = {"http": proxy, "https": proxy} if proxy else None
        transcript = transcriber.get_transcript(video_id, languages=languages_list,
                                                preserve_formatting=preserve_formatting, proxies=proxies)
        return JSONResponse(content=jsonable_encoder(transcript))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting YouTube transcript: {e}")


def get_weather_json(location: str):
    """Fetch weather data for a location from wttr.in as JSON."""
    url = f"https://wttr.in/{location}?format=j1"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    return {"error": f"Unable to fetch weather data. Status code: {response.status_code}"}


def get_ascii_weather(location: str):
    """Fetch the plain-text (ASCII art) weather report from wttr.in."""
    url = f"https://wttr.in/{location}"
    # wttr.in serves the terminal-friendly ASCII report to curl-like agents.
    response = requests.get(url, headers={'User-Agent': 'curl'})
    if response.status_code == 200:
        return response.text
    return {"error": f"Unable to fetch weather data. Status code: {response.status_code}"}


def main():
    """Periodically restart the Hugging Face Space."""
    # Retrieve the space ID and token from environment variables.
    space_id = os.getenv("SPACE_ID")
    token = os.getenv("HF_TOKEN")
    # Initialize the HfApi client with the retrieved token.
    api = HfApi(token=token)
    while True:
        try:
            # Restart the space.
            api.restart_space(space_id, factory_reboot=False)
            print(f"Successfully restarted the space: {space_id}")
        except Exception as e:
            print(f"Error restarting the space: {e}")
        # Wait for 10 minutes before restarting again.
        time.sleep(600)


# Run the API server if this script is executed. uvicorn.run blocks forever,
# so the restart loop can never follow it in the same process; the two are
# kept as alternatives under one guard (the RESTART_LOOP switch is a
# hypothetical convenience; split them into separate scripts if preferred).
if __name__ == "__main__":
    if os.getenv("RESTART_LOOP"):
        main()
    else:
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8083)