Spaces:
Running
Running
File size: 15,156 Bytes
3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 3290198 31e9175 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
import ast
import asyncio
import json

import aiohttp
import nest_asyncio

nest_asyncio.apply()
# API Endpoints
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"  # LLM chat-completion endpoint
SERPAPI_URL = "https://serpapi.com/search"  # Google search proxy (SERPAPI)
JINA_BASE_URL = "https://r.jina.ai/"  # webpage-to-text reader; target URL is appended to this prefix
# Modify the default model selection
DEFAULT_MODEL = "google/gemini-2.0-flash-lite-preview-02-05:free" # Gemini Flash 2.0 model identifier
# Helper class to hold extracted content along with its source URL
class SourcedContext:
    """A piece of extracted text paired with the URL it was taken from.

    Used to carry extraction results through the pipeline so the final
    report can assign citation numbers per source URL.
    """

    def __init__(self, text, source_url):
        self.text = text  # relevant text extracted from the page
        self.source_url = source_url  # URL the text was fetched from

    def __repr__(self):
        # Truncate text so logs stay readable.
        return f"SourcedContext(source_url={self.source_url!r}, text={self.text[:50]!r})"
async def call_openrouter_async(session, messages, model=DEFAULT_MODEL):
    """
    Send a chat-completion request to OpenRouter and return the reply text.

    Returns the assistant's message content on success, or None on any
    HTTP, network, or response-shape failure (errors are printed, not raised).
    """
    request_headers = {
        # NOTE(review): OPENROUTER_API_KEY is defined outside this chunk — confirm it is set before use.
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://github.com/Pygen",
        "X-Title": "Research Assistant",
        "Content-Type": "application/json",
    }
    body = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        "max_tokens": 4096,
    }
    try:
        async with session.post(OPENROUTER_URL, headers=request_headers, json=body) as resp:
            if resp.status != 200:
                error_body = await resp.text()
                print(f"OpenRouter API error: {resp.status} - {error_body}")
                return None
            result = await resp.json()
            try:
                return result['choices'][0]['message']['content']
            except (KeyError, IndexError):
                print("Unexpected response structure from OpenRouter:", result)
                return None
    except Exception as e:
        print("Error during OpenRouter call:", e)
        return None
async def generate_search_queries_async(session, user_query):
    """
    Ask the LLM for up to four distinct search queries about the user's topic.

    Args:
        session: An open aiohttp ClientSession.
        user_query: The research topic/question supplied by the user.

    Returns:
        A list of query strings, or [] when the LLM call fails or the reply
        cannot be parsed as a Python list literal.
    """
    prompt = (
        "You are a seasoned research assistant. Based on the user's topic, produce as many as four distinct and precise "
        "search queries that will help collect thorough information on the subject. "
        "Return a Python list of strings only, without any code formatting or backticks. "
        "For example: ['query1', 'query2', 'query3']"
    )
    messages = [
        {"role": "system", "content": "You are a precise and supportive research assistant."},
        {"role": "user", "content": f"User Topic: {user_query}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []
    try:
        cleaned_response = response.strip()
        # Strip a Markdown code fence (``` or ```python) if the model added one anyway.
        if cleaned_response.startswith("```"):
            cleaned_response = cleaned_response.split("```")[1]
            if cleaned_response.startswith("python"):
                cleaned_response = cleaned_response[6:]
            cleaned_response = cleaned_response.strip()
        # SECURITY: ast.literal_eval parses only Python literals; the previous
        # eval() here would have executed arbitrary code returned by the LLM.
        search_queries = ast.literal_eval(cleaned_response)
        if isinstance(search_queries, list):
            return search_queries
        print("The LLM response is not a list. Response:", response)
        return []
    except Exception as e:
        # literal_eval raises ValueError/SyntaxError on non-literal input.
        print("Error interpreting search queries:", e, "\nResponse:", response)
        return []
async def perform_search_async(session, query, result_limit=5):
    """
    Run a Google search through SERPAPI and return up to `result_limit` links.

    Returns an empty list on any API or network failure.
    """
    search_params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",
        "num": result_limit,  # ask SERPAPI itself to cap the result count
    }
    try:
        async with session.get(SERPAPI_URL, params=search_params) as resp:
            if resp.status != 200:
                error_body = await resp.text()
                print(f"SERPAPI error: {resp.status} - {error_body}")
                return []
            results = await resp.json()
            if "organic_results" not in results:
                print("No organic results found in SERPAPI response.")
                return []
            links = [entry["link"] for entry in results["organic_results"] if "link" in entry]
            return links[:result_limit]  # defensive cap in case the API returns more
    except Exception as e:
        print("Error during SERPAPI search:", e)
        return []
async def fetch_webpage_text_async(session, url):
    """
    Fetch a page's textual content through the Jina reader service.

    Returns the page text, or an empty string on any failure.
    """
    # NOTE(review): JINA_API_KEY is defined outside this chunk — confirm it is set before use.
    auth_headers = {"Authorization": f"Bearer {JINA_API_KEY}"}
    try:
        async with session.get(f"{JINA_BASE_URL}{url}", headers=auth_headers) as resp:
            if resp.status == 200:
                return await resp.text()
            error_body = await resp.text()
            print(f"Jina fetch error for {url}: {resp.status} - {error_body}")
            return ""
    except Exception as e:
        print("Error retrieving webpage text with Jina:", e)
        return ""
async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM whether a page snippet helps answer the user's topic.

    Returns the string "Yes" or "No"; defaults to "No" when the call fails
    or the reply contains neither keyword.
    """
    prompt = (
        "You are a discerning evaluator of research. Given the user's topic and a snippet of webpage content, "
        "decide if the page contains valuable information to address the query. "
        "Reply strictly with one word: 'Yes' if the content is useful, or 'No' if it is not. Provide no extra text."
    )
    messages = [
        {"role": "system", "content": "You are a concise and strict research relevance evaluator."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nWebpage Snippet (up to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"},
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return "No"
    verdict = response.strip()
    if verdict in ("Yes", "No"):
        return verdict
    # Tolerate a verbose reply: fall back to keyword spotting, preferring "Yes".
    if "Yes" in verdict:
        return "Yes"
    if "No" in verdict:
        return "No"
    return "No"
async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Have the LLM pull the topic-relevant details out of a fetched page.

    Returns the extracted text, or an empty string when the call fails.
    """
    prompt = (
        "You are an expert extractor of information. Given the user's topic, the search query that produced this page, "
        "and the webpage text, extract all pertinent details needed to answer the inquiry. "
        "Return only the relevant text without any additional commentary."
    )
    messages = [
        {"role": "system", "content": "You excel at summarizing and extracting relevant details."},
        {"role": "user", "content": f"User Topic: {user_query}\nSearch Query: {search_query}\n\nWebpage Snippet (up to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"},
    ]
    response = await call_openrouter_async(session, messages)
    return response.strip() if response else ""
async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Ask the LLM whether more research is needed and, if so, for new queries.

    Args:
        session: An open aiohttp ClientSession.
        user_query: The original research topic.
        previous_search_queries: All queries issued so far.
        all_contexts: Text snippets collected so far.

    Returns:
        A list of new query strings, "" when the LLM signals research is
        complete, or [] when the call fails or the reply cannot be parsed.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are a systematic research planner. Taking into account the original topic, prior search queries, "
        "and the extracted information from webpages, determine if more research is required. "
        "If so, produce up to four new search queries as a Python list "
        "(for example: ['new query1', 'new query2']). If no further research is needed, reply with an empty string."
        "\nReturn only a Python list or an empty string without extra commentary."
    )
    messages = [
        {"role": "system", "content": "You are methodical in planning further research steps."},
        {"role": "user", "content": f"User Topic: {user_query}\nPrevious Queries: {previous_search_queries}\n\nCollected Context:\n{context_combined}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []
    cleaned = response.strip()
    if cleaned == "":
        # Empty string is the "done researching" sentinel, not a failure.
        return ""
    try:
        # Strip a Markdown code fence (``` or ```python) if the model added one anyway.
        if cleaned.startswith("```"):
            cleaned = cleaned.split("```")[1]
            if cleaned.startswith("python"):
                cleaned = cleaned[6:]
            cleaned = cleaned.strip()
        # SECURITY: ast.literal_eval parses only Python literals; the previous
        # eval() here would have executed arbitrary code returned by the LLM.
        new_queries = ast.literal_eval(cleaned)
        if isinstance(new_queries, list):
            return new_queries
        print("LLM response is not a list for extra search queries. Response:", response)
        return []
    except Exception as e:
        # literal_eval raises ValueError/SyntaxError on non-literal input.
        print("Failed to parse additional search queries:", e, "\nResponse:", response)
        return []
async def generate_final_report_async(session, user_query, sourced_contexts):
    """
    Build the final cited report from the collected contexts.

    Each unique source URL is assigned a citation number in first-seen order;
    contexts are tagged with their number, the LLM writes the report keeping
    the tags, and a numbered reference list is appended afterward.
    """
    references = {}  # source URL -> citation number, in first-seen order
    formatted_contexts = []
    for ctx in sourced_contexts:
        if ctx.source_url not in references:
            references[ctx.source_url] = len(references) + 1
        formatted_contexts.append(f"{ctx.text} [{references[ctx.source_url]}]")
    context_combined = "\n".join(formatted_contexts)
    numbered_refs = sorted(references.items(), key=lambda item: item[1])
    reference_section = "\n\nReferences:\n" + "\n".join(
        f"[{num}] {url}" for url, num in numbered_refs
    )
    prompt = (
        "You are a proficient academic report writer. Using the compiled contexts below and the original topic, "
        "compose a comprehensive, well-organized, and in-depth report that fully addresses the inquiry. "
        "Ensure that each piece of evidence is tagged with citation numbers in square brackets (e.g., [1], [2]). "
        "Maintain these tags in your final report to show the references. "
        "The style should be academic with proper in-text citations. Do not alter or add citation numbers."
    )
    messages = [
        {"role": "system", "content": "You are an expert academic report composer."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nCollected Context:\n{context_combined}\n\n{prompt}"}
    ]
    report = await call_openrouter_async(session, messages)
    if not report:
        return "Error occurred while generating the report."
    return report + reference_section
async def process_link(session, link, user_query, search_query):
    """
    Fetch one URL, judge its relevance, and extract its context if useful.

    Returns a SourcedContext on success, otherwise None.
    """
    print(f"Retrieving content from: {link}")
    page_text = await fetch_webpage_text_async(session, link)
    if not page_text:
        return None
    usefulness = await is_page_useful_async(session, user_query, page_text)
    print(f"Relevance of {link}: {usefulness}")
    if usefulness != "Yes":
        return None
    context = await extract_relevant_context_async(session, user_query, search_query, page_text)
    if not context:
        return None
    print(f"Context extracted from {link} (first 200 characters): {context[:200]}")
    return SourcedContext(context, link)
# Modify research_flow function to accept search_limit parameter
async def research_flow(user_query, iteration_limit, search_limit=5):
    """
    Run the full iterative research loop and return the final report text.

    Generates initial search queries, then repeats up to ``iteration_limit``
    rounds of: search -> fetch/evaluate/extract -> ask the LLM for follow-up
    queries. Stops early when the LLM replies with an empty string ("done")
    or fails to return a list of new queries.

    Args:
        user_query: The research topic/question supplied by the user.
        iteration_limit: Maximum number of search/extract rounds.
        search_limit: Maximum number of search results per query.

    Returns:
        The generated report string, or an error message when no initial
        queries could be produced.
    """
    sourced_contexts = []       # SourcedContext objects accumulated across all iterations
    all_search_queries = []     # every query issued so far (shown to the LLM planner)
    iteration = 0
    async with aiohttp.ClientSession() as session:
        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            return "No search queries were generated by the LLM. Terminating process."
        all_search_queries.extend(new_search_queries)
        while iteration < iteration_limit:
            print(f"\n--- Iteration {iteration + 1} ---")
            iteration_contexts = []
            # Update to include search_limit
            # Fan out all searches for this round concurrently.
            search_tasks = [perform_search_async(session, query, search_limit) for query in new_search_queries]
            search_results = await asyncio.gather(*search_tasks)
            # Deduplicate links across queries; remember which query found each link.
            unique_links = {}
            for idx, links in enumerate(search_results):
                query = new_search_queries[idx]
                for link in links:
                    if link not in unique_links:
                        unique_links[link] = query
            print(f"Collected {len(unique_links)} distinct links in this iteration.")
            # Fetch, judge, and extract every unique link concurrently.
            link_tasks = [
                process_link(session, link, user_query, unique_links[link])
                for link in unique_links
            ]
            link_results = await asyncio.gather(*link_tasks)
            for res in link_results:
                if res:
                    iteration_contexts.append(res)
            if iteration_contexts:
                sourced_contexts.extend(iteration_contexts)
            else:
                print("No relevant information was found in this iteration.")
            # The planner only needs the raw text, not the source URLs.
            context_texts = [ctx.text for ctx in sourced_contexts]
            new_search_queries = await get_new_search_queries_async(
                session, user_query, all_search_queries, context_texts
            )
            # "" means the LLM says research is complete; [] means parse/call failure.
            if new_search_queries == "":
                print("LLM has determined that additional research is unnecessary.")
                break
            elif new_search_queries:
                print("LLM provided extra search queries:", new_search_queries)
                all_search_queries.extend(new_search_queries)
            else:
                print("LLM returned no further search queries. Concluding the loop.")
                break
            iteration += 1
        # Report generation still needs the open session for the LLM call.
        final_report = await generate_final_report_async(session, user_query, sourced_contexts)
        return final_report
def main():
    """
    CLI entry point for testing this research module.
    """
    user_query = input("Enter your research topic/question: ").strip()
    iter_limit_input = input("Enter the maximum number of iterations (default is 10): ").strip()
    # Fall back to 10 iterations unless the user typed a plain non-negative integer.
    if iter_limit_input.isdigit():
        iteration_limit = int(iter_limit_input)
    else:
        iteration_limit = 10
    final_report = asyncio.run(research_flow(user_query, iteration_limit))
    print("\n==== FINAL REPORT ====\n")
    print(final_report)


if __name__ == "__main__":
    main()
|