wisalQA_P1 / web_search.py
afouda's picture
Upload 7 files
ea1e6bd verified
raw
history blame
2.64 kB
import os
import asyncio
import httpx
import nest_asyncio
from dotenv import load_dotenv
# Apply nested asyncio patch
nest_asyncio.apply()
# helper functions
GEMINI_API_KEY="AIzaSyCUCivstFpC9pq_jMHMYdlPrmh9Bx97dFo"
TAVILY_API_KEY="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"
OPENAI_API_KEY="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"
QDRANT_API_KEY="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIiwiZXhwIjoxNzUxMDUxNzg4fQ.I9J-K7OM0BtcNKgj2d4uVM8QYAHYfFCVAyP4rlZkK2E"
QDRANT_URL="https://6a3aade6-e8ad-4a6c-a579-21f5af90b7e8.us-east4-0.gcp.cloud.qdrant.io"
OPENAI_API_KEY="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"
WEAVIATE_URL="yorcqe2sqswhcaivxvt9a.c0.us-west3.gcp.weaviate.cloud"
WEAVIATE_API_KEY="d2d0VGdZQTBmdTFlOWdDZl9tT2h3WDVWd1NpT1dQWHdGK0xjR1hYeWxicUxHVnFRazRUSjY2VlRUVlkwPV92MjAw"
DEEPINFRA_API_KEY="285LUJulGIprqT6hcPhiXtcrphU04FG4"
DEEPINFRA_BASE_URL="https://api.deepinfra.com/v1/openai"
# Try to import tavily with fallback
try:
from tavily import TavilyClient
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
TAVILY_AVAILABLE = True
except ImportError:
print("Warning: Tavily package not found. Web search will use fallback mode.")
tavily_client = None
TAVILY_AVAILABLE = False
async def search_autism(query: str) -> dict:
"""Performs a web search for information about autism."""
if not TAVILY_AVAILABLE:
print("Web search unavailable - tavily package not installed")
return {
"results": [],
"answer": "Web search functionality is currently unavailable. Please ensure all dependencies are installed."
}
try:
# Execute a search query using Tavily
response = tavily_client.search(
query=query,
max_results=5,
search_depth="advanced",
topic="general",
include_answer=True
)
return {
"results": response.get("results", []),
"answer": response.get("answer", "")
}
except Exception as e:
print(f"Search error: {str(e)}")
return {
"results": [],
"answer": f"Unable to perform web search: {str(e)}"
}
# Test function for development
async def main():
query = "autism symptoms and treatments"
result = await search_autism(query)
print("Search Results:")
for res in result.get("results", []):
print(f"- {res.get('title')} ({res.get('url')})")
print("\nAnswer:")
print(result.get("answer", "No answer provided."))
# Run the script
if __name__ == "__main__":
asyncio.run(main())