Spaces:
Runtime error
Runtime error
import os | |
import asyncio | |
import httpx | |
import nest_asyncio | |
from dotenv import load_dotenv | |
# Apply nested asyncio patch | |
nest_asyncio.apply() | |
# helper functions | |
GEMINI_API_KEY="AIzaSyCUCivstFpC9pq_jMHMYdlPrmh9Bx97dFo" | |
TAVILY_API_KEY="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" | |
OPENAI_API_KEY="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" | |
QDRANT_API_KEY="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIiwiZXhwIjoxNzUxMDUxNzg4fQ.I9J-K7OM0BtcNKgj2d4uVM8QYAHYfFCVAyP4rlZkK2E" | |
QDRANT_URL="https://6a3aade6-e8ad-4a6c-a579-21f5af90b7e8.us-east4-0.gcp.cloud.qdrant.io" | |
OPENAI_API_KEY="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" | |
WEAVIATE_URL="yorcqe2sqswhcaivxvt9a.c0.us-west3.gcp.weaviate.cloud" | |
WEAVIATE_API_KEY="d2d0VGdZQTBmdTFlOWdDZl9tT2h3WDVWd1NpT1dQWHdGK0xjR1hYeWxicUxHVnFRazRUSjY2VlRUVlkwPV92MjAw" | |
DEEPINFRA_API_KEY="285LUJulGIprqT6hcPhiXtcrphU04FG4" | |
DEEPINFRA_BASE_URL="https://api.deepinfra.com/v1/openai" | |
# Try to import tavily with fallback | |
try: | |
from tavily import TavilyClient | |
tavily_client = TavilyClient(api_key=TAVILY_API_KEY) | |
TAVILY_AVAILABLE = True | |
except ImportError: | |
print("Warning: Tavily package not found. Web search will use fallback mode.") | |
tavily_client = None | |
TAVILY_AVAILABLE = False | |
async def search_autism(query: str) -> dict: | |
"""Performs a web search for information about autism.""" | |
if not TAVILY_AVAILABLE: | |
print("Web search unavailable - tavily package not installed") | |
return { | |
"results": [], | |
"answer": "Web search functionality is currently unavailable. Please ensure all dependencies are installed." | |
} | |
try: | |
# Execute a search query using Tavily | |
response = tavily_client.search( | |
query=query, | |
max_results=5, | |
search_depth="advanced", | |
topic="general", | |
include_answer=True | |
) | |
return { | |
"results": response.get("results", []), | |
"answer": response.get("answer", "") | |
} | |
except Exception as e: | |
print(f"Search error: {str(e)}") | |
return { | |
"results": [], | |
"answer": f"Unable to perform web search: {str(e)}" | |
} | |
# Test function for development | |
async def main(): | |
query = "autism symptoms and treatments" | |
result = await search_autism(query) | |
print("Search Results:") | |
for res in result.get("results", []): | |
print(f"- {res.get('title')} ({res.get('url')})") | |
print("\nAnswer:") | |
print(result.get("answer", "No answer provided.")) | |
# Run the script | |
if __name__ == "__main__": | |
asyncio.run(main()) | |