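# NOTE: the following header appears to be residue from the web page this file
# was copied from; it is not part of the program.
# Shreyas094's picture | Update app.py | d60fab0 verified | raw | history blame | 15.5 kB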
import fitz # PyMuPDF
import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse
import random
import os
from dotenv import load_dotenv
load_dotenv() # Load environment variables from .env file
# Token sent as the Bearer credential to the Hugging Face API by generate_text.
# It is read from the HUGGINGFACE_TOKEN environment variable; os.getenv yields
# None when that variable is not set.
HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
# Pool of browser User-Agent strings; google_search picks one at random for
# each search request.
_useragent_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
# Function to extract visible text from HTML content of a webpage
def extract_text_from_webpage(html):
    """Return the text of an HTML document with scripts and styles removed.

    The document text is split into lines, each line is split on single
    spaces, and every non-empty fragment is emitted on its own line of the
    returned string.
    """
    print("Extracting text from webpage...")
    parsed = BeautifulSoup(html, 'html.parser')
    # Remove <script> and <style> elements so their source is not mistaken
    # for page content.
    for hidden_tag in parsed(["script", "style"]):
        hidden_tag.extract()
    # NOTE(review): splitting on a single space puts every word on its own
    # line; confirm this is intended rather than a two-space split.
    fragments = []
    for raw_line in parsed.get_text().splitlines():
        for piece in raw_line.strip().split(" "):
            piece = piece.strip()
            if piece:
                fragments.append(piece)
    text = '\n'.join(fragments)
    print(f"Extracted text length: {len(text)}")
    return text
# Function to perform a Google search and retrieve results
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Search Google for ``term`` and fetch the visible text of each hit.

    Args:
        term: Search query. It is sent as the ``q`` query parameter, so
            ``requests`` takes care of URL-encoding it.
        num_results: Maximum number of entries to return.
        lang: Value of the ``hl`` query parameter.
        timeout: Timeout passed to the search request and to every page fetch.
        safe: Value of the ``safe`` query parameter.
        ssl_verify: Forwarded as ``verify`` to the search request only.

    Returns:
        A list of at most ``num_results`` dicts with the keys ``"link"`` and
        ``"text"``. ``"text"`` is the page text truncated to 8000 characters
        (plus ``"..."`` when truncated), or ``None`` when the page could not
        be fetched. A result without an anchor yields
        ``{"link": None, "text": None}``. If a search request fails, the
        results collected so far are returned.
    """
    print(f"Searching for term: {term}")
    start = 0
    all_results = []
    max_chars_per_page = 8000  # Limit the number of characters from each webpage to stay under the token limit
    with requests.Session() as session:
        while start < num_results:
            print(f"Fetching search results starting from: {start}")
            # Choose a random user agent
            headers = {'User-Agent': random.choice(_useragent_list)}
            print(f"Using User-Agent: {headers['User-Agent']}")
            try:
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                break
            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No more results found.")
                break
            # A results page may contain more hits than were requested; only
            # download as many pages as are still needed.
            remaining = num_results - len(all_results)
            for result in result_block[:remaining]:
                anchor = result.find("a", href=True)
                if not anchor:
                    print("No link found in result.")
                    all_results.append({"link": None, "text": None})
                    continue
                link = anchor["href"]
                print(f"Found link: {link}")
                try:
                    webpage = session.get(link, headers=headers, timeout=timeout)
                    webpage.raise_for_status()
                except requests.exceptions.RequestException as e:
                    print(f"Error fetching or processing {link}: {e}")
                    all_results.append({"link": link, "text": None})
                    continue
                visible_text = extract_text_from_webpage(webpage.text)
                if len(visible_text) > max_chars_per_page:
                    visible_text = visible_text[:max_chars_per_page] + "..."
                all_results.append({"link": link, "text": visible_text})
            # Advance by everything the page offered so the next request
            # does not ask for the same hits again.
            start += len(result_block)
    print(f"Total results fetched: {len(all_results)}")
    return all_results
# Function to format the prompt for the Hugging Face API
def format_prompt(query, search_results, instructions):
    """Build the text-generation prompt from a query and its search results.

    Args:
        query: The user query inserted after ``"User Query: "``.
        search_results: Iterable of dicts with the keys ``"link"`` and
            ``"text"``. Entries with a falsy ``"link"`` are rendered as
            ``"No link found."``.
        instructions: Text placed verbatim at the very start of the prompt.

    Returns:
        The prompt string, ending with ``"Assistant:"``.
    """
    separator = '-' * 80
    sections = []
    for result in search_results:
        link = result["link"]
        text = result["text"]
        if link:
            sections.append(f"URL: {link}\nContent: {text}\n{separator}\n")
        else:
            sections.append("No link found.\n" + separator + '\n')
    # Join once instead of growing a string with += inside the loop.
    formatted_results = "".join(sections)
    return f"{instructions}User Query: {query}\n\nWeb Search Results:\n{formatted_results}\n\nAssistant:"
# Function to generate text using Hugging Face API
def generate_text(input_text, temperature=0.7, repetition_penalty=1.0, top_p=0.9, timeout=120):
    """Send ``input_text`` to the Hugging Face text-generation endpoint.

    Args:
        input_text: Prompt sent as the ``inputs`` field of the request.
        temperature: Sampling temperature forwarded to the endpoint.
        repetition_penalty: Repetition penalty forwarded to the endpoint.
        top_p: Nucleus-sampling value forwarded to the endpoint.
        timeout: Timeout passed to ``requests.post`` so a stalled request
            cannot block the caller forever.

    Returns:
        The ``generated_text`` value from the response, or ``None`` when the
        request fails, the response is not JSON, or it contains no
        ``generated_text``.
    """
    print("Generating text using Hugging Face API...")
    endpoint = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",  # Use the environment variable
        "Content-Type": "application/json",
    }
    data = {
        "inputs": input_text,
        "parameters": {
            "max_new_tokens": 8000,  # Adjust as needed
            "temperature": temperature,
            "repetition_penalty": repetition_penalty,
            "top_p": top_p,
        },
    }
    try:
        response = requests.post(endpoint, headers=headers, json=data, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error generating text using Hugging Face API: {e}")
        return None
    # Check if response is JSON
    try:
        json_data = response.json()
    except ValueError:
        print("Response is not JSON.")
        return None
    # Extract generated text from response JSON
    if isinstance(json_data, list):
        # Only the first element is used; guard against an empty list or a
        # non-dict element instead of raising AttributeError.
        first_item = json_data[0] if json_data else None
        generated_text = first_item.get("generated_text") if isinstance(first_item, dict) else None
    elif isinstance(json_data, dict):
        generated_text = json_data.get("generated_text")
    else:
        print("Unexpected response format.")
        return None
    if generated_text is None:
        print("Generated text not found in response.")
        return None
    print("Text generation complete using Hugging Face API.")
    print(f"Generated text: {generated_text}")  # Debugging line
    return generated_text
# Function to read and extract text from a PDF
def read_pdf(file_obj):
    """Return the text of every page of a PDF, concatenated in page order.

    ``file_obj`` must expose a ``name`` attribute holding the path of the
    PDF file to open.
    """
    with fitz.open(file_obj.name) as pdf:
        page_texts = [
            pdf.load_page(index).get_text()
            for index in range(pdf.page_count)
        ]
    return "".join(page_texts)
# Function to format the prompt with instructions for text generation
def format_prompt_with_instructions(text, instructions):
    """Prefix ``text`` with ``instructions`` and append the assistant cue."""
    return f"{instructions}{text}\n\nAssistant:"
# Function to save text to a PDF
def save_text_to_pdf(text, output_path):
    """Write ``text`` to a new PDF at ``output_path`` with simple word wrapping.

    The text is split into paragraphs on ``"\\n"``. Each paragraph is wrapped
    word by word to the page width minus a 50-point margin on both sides, one
    blank line is left between paragraphs, and a new page is started when the
    next line would cross the bottom margin.

    Args:
        text: Text to render. ``None`` is treated as an empty string.
        output_path: Destination path handed to ``Document.save``.
    """
    print(f"Saving text to PDF at {output_path}...")
    text = "" if text is None else str(text)
    margin = 50  # 50 points margin
    # Define font size and line spacing
    font_size = 9
    line_spacing = 1 * font_size
    fontname = "times-roman"  # Use a supported font name
    # The context manager closes the document even if rendering or saving fails.
    with fitz.open() as doc:
        page = doc.new_page()
        page_height = page.rect.height
        text_width = page.rect.width - 2 * margin
        y_position = margin
        for paragraph in text.split("\n"):
            current_line = ""
            for word in paragraph.split():
                if not current_line:
                    # Always accept the first word of a line, even when it is
                    # wider than the page, so no empty line is emitted for it.
                    current_line = word
                    continue
                candidate = f"{current_line} {word}"
                # Measure exactly the string that would be drawn.
                if fitz.get_text_length(candidate, fontsize=font_size, fontname=fontname) <= text_width:
                    current_line = candidate
                    continue
                page.insert_text(fitz.Point(margin, y_position), current_line, fontsize=font_size, fontname=fontname)
                y_position += line_spacing
                if y_position + line_spacing > page_height - margin:
                    page = doc.new_page()  # Add a new page if text exceeds page height
                    y_position = margin
                current_line = word
            # Add the last line of the paragraph (nothing to draw for a blank one)
            if current_line:
                page.insert_text(fitz.Point(margin, y_position), current_line, fontsize=font_size, fontname=fontname)
            # One step for the line just written plus one for the paragraph gap
            y_position += 2 * line_spacing
            if y_position + line_spacing > page_height - margin:
                page = doc.new_page()  # Add a new page if text exceeds page height
                y_position = margin
        doc.save(output_path)  # Save the PDF to the specified path
    print("PDF saved successfully.")
def get_predefined_queries(company):
    """Return the four fixed search queries for ``company``.

    The queries cover, in order: earnings, news, credit rating and conference
    call transcript.
    """
    lead_ins = (
        "Recent earnings for ",
        "Recent News on ",
        "Recent Credit rating of ",
        "Recent conference call transcript of ",
    )
    return [f"{lead_in}{company}" for lead_in in lead_ins]
# Integrated function to perform web scraping, formatting, and text generation
def scrape_and_display(query, num_results, earnings_instructions, news_instructions,
                       credit_rating_instructions, conference_call_instructions, final_instructions,
                       web_search=True, temperature=0.7, repetition_penalty=1.0, top_p=0.9):
    """Produce a summary either from web searches or directly from ``query``.

    With ``web_search`` true, ``query`` is treated as a company name: each of
    the four predefined queries is searched, summarised with its matching
    instruction text, and the summaries are combined into a final prompt
    headed by ``final_instructions``. With ``web_search`` false, ``query`` is
    the text to process and only ``final_instructions`` is used.

    Args:
        query: Company name (web search) or raw text (no web search).
        num_results: Total number of search results, divided evenly (integer
            division) across the predefined queries.
        earnings_instructions: Instructions for the earnings query.
        news_instructions: Instructions for the news query.
        credit_rating_instructions: Instructions for the credit rating query.
        conference_call_instructions: Instructions for the transcript query.
        final_instructions: Instructions for the final summary.
        web_search: Whether to run the web searches.
        temperature: Forwarded to ``generate_text``.
        repetition_penalty: Forwarded to ``generate_text``.
        top_p: Forwarded to ``generate_text``.

    Returns:
        The generated text from its first ``"Assistant:"`` marker onwards (or
        unchanged when the marker is absent), or
        ``"Assistant: No response generated."`` when generation failed.
    """
    print(f"Scraping and displaying results for query: {query} with num_results: {num_results}")
    if web_search:
        company = query.strip()
        predefined_queries = get_predefined_queries(company)
        all_summaries = []
        instructions = [earnings_instructions, news_instructions, credit_rating_instructions, conference_call_instructions]
        for pq, instruction in zip(predefined_queries, instructions):
            search_results = google_search(pq, num_results=num_results // len(predefined_queries))
            # Generate a summary for each predefined query
            formatted_prompt = format_prompt(pq, search_results, instruction)
            summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
            # generate_text returns None on failure; joining None would raise
            # TypeError, so failed summaries are left out.
            if summary:
                all_summaries.append(summary)
        # Combine all summaries
        combined_summary = "\n\n".join(all_summaries)
        # Generate final summary using the combined results and final instructions
        final_prompt = f"{final_instructions}\n\nHere are the summaries for each aspect of {company}:\n\n{combined_summary}\n\nPlease provide a comprehensive summary based on the above information:"
        generated_summary = generate_text(final_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
    else:
        formatted_prompt = format_prompt_with_instructions(query, final_instructions)
        generated_summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
    print("Scraping and display complete.")
    if generated_summary:
        # Keep only the part starting at the first "Assistant:" marker.
        assistant_index = generated_summary.find("Assistant:")
        if assistant_index != -1:
            generated_summary = generated_summary[assistant_index:]
    else:
        generated_summary = "Assistant: No response generated."
    print(f"Generated summary: {generated_summary}")
    return generated_summary
# Main Gradio interface function
def gradio_interface(query, use_pdf, pdf, num_results, earnings_instructions, news_instructions,
                     credit_rating_instructions, conference_call_instructions, final_instructions,
                     temperature, repetition_penalty, top_p):
    """Handle one Gradio submission and return the summary plus a PDF of it.

    When ``use_pdf`` is set and a file was uploaded, the PDF's text is
    summarised without any web search. Otherwise ``query`` is used as the
    company name for the web-search workflow.

    Returns:
        A tuple ``(generated_summary, output_pdf_path)`` where the path
        points to ``"output_summary.pdf"`` containing the summary.
    """
    if use_pdf and pdf is not None:
        source_text = read_pdf(pdf)
        result_count = 0
        search_web = False
    else:
        source_text = query
        result_count = num_results
        search_web = True
    # Every required parameter of scrape_and_display is passed by its real
    # name; the PDF path previously used a nonexistent `instructions` keyword
    # and omitted the per-query instructions, which raised TypeError.
    generated_summary = scrape_and_display(
        source_text,
        num_results=result_count,
        earnings_instructions=earnings_instructions,
        news_instructions=news_instructions,
        credit_rating_instructions=credit_rating_instructions,
        conference_call_instructions=conference_call_instructions,
        final_instructions=final_instructions,
        web_search=search_web,
        temperature=temperature,
        repetition_penalty=repetition_penalty,
        top_p=top_p,
    )
    output_pdf_path = "output_summary.pdf"
    save_text_to_pdf(generated_summary, output_pdf_path)
    return generated_summary, output_pdf_path
# Build the Gradio UI around gradio_interface. The order of `inputs` below
# matches the positional parameters of gradio_interface, and the two outputs
# correspond to its (summary text, PDF path) return value.
# NOTE(review): no `.launch()` call is visible in this view, and the created
# Interface is not bound to a name — confirm how the app is started.
gr.Interface(
fn=gradio_interface,
inputs=[
gr.Textbox(label="Company Name"),
gr.Checkbox(label="Use PDF"),
gr.File(label="Upload PDF"),
# Step of 4 keeps the total evenly divisible by the four predefined queries.
gr.Slider(minimum=4, maximum=40, step=4, value=20, label="Number of Results (total for all queries)"),
gr.Textbox(label="Earnings Instructions", lines=2, placeholder="Instructions for recent earnings query..."),
gr.Textbox(label="News Instructions", lines=2, placeholder="Instructions for recent news query..."),
gr.Textbox(label="Credit Rating Instructions", lines=2, placeholder="Instructions for credit rating query..."),
gr.Textbox(label="Conference Call Instructions", lines=2, placeholder="Instructions for conference call transcript query..."),
gr.Textbox(label="Final Summary Instructions", lines=2, placeholder="Instructions for the final summary..."),
gr.Slider(minimum=0.1, maximum=1.0, value=0.7, label="Temperature"),
gr.Slider(minimum=1.0, maximum=2.0, value=1.0, label="Repetition Penalty"),
gr.Slider(minimum=0.1, maximum=1.0, value=0.9, label="Top p")
],
outputs=["text", "file"],
title="Financial Analyst AI Assistant",
description="Enter a company name and provide specific instructions for each query. The AI will use these instructions to gather and summarize information on recent earnings, news, credit ratings, and conference call transcripts.",
)