import fitz  # PyMuPDF
import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse
import random
import os
from dotenv import load_dotenv

load_dotenv()  # Load environment variables from .env file

# Read the Hugging Face API token from the environment instead of hard-coding it
HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]


# Function to extract visible text from HTML content of a webpage
def extract_text_from_webpage(html):
    print("Extracting text from webpage...")
    soup = BeautifulSoup(html, 'html.parser')
    for script in soup(["script", "style"]):
        script.extract()  # Remove scripts and styles
    text = soup.get_text()
    # Strip each line, split on runs of double spaces, and keep only non-empty chunks
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    print(f"Extracted text length: {len(text)}")
    return text


# Function to perform a Google search and retrieve results
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Performs a Google search and returns the results."""
    print(f"Searching for term: {term}")
    escaped_term = urllib.parse.quote_plus(term)
    start = 0
    all_results = []
    max_chars_per_page = 8000  # Limit the number of characters from each webpage to stay under the token limit

    with requests.Session() as session:
        while start < num_results:
            print(f"Fetching search results starting from: {start}")
            try:
                # Choose a random user agent
                user_agent = random.choice(_useragent_list)
                headers = {
                    'User-Agent': user_agent
                }
                print(f"Using User-Agent: {headers['User-Agent']}")
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No more results found.")
                break

            for result in result_block:
                link = result.find("a", href=True)
                if link:
                    link = link["href"]
                    print(f"Found link: {link}")
                    try:
                        webpage = session.get(link, headers=headers, timeout=timeout)
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "text": visible_text})
                    except requests.exceptions.RequestException as e:
                        print(f"Error fetching or processing {link}: {e}")
                        all_results.append({"link": link, "text": None})
                else:
                    print("No link found in result.")
                    all_results.append({"link": None, "text": None})
            start += len(result_block)

    print(f"Total results fetched: {len(all_results)}")
    return all_results


# Function to format the prompt for the Hugging Face API
def format_prompt(query, search_results, instructions):
    formatted_results = ""
    for result in search_results:
        link = result["link"]
        text = result["text"]
        if link:
            formatted_results += f"URL: {link}\nContent: {text}\n{'-'*80}\n"
        else:
            formatted_results += "No link found.\n" + '-'*80 + '\n'
    prompt = f"{instructions}User Query: {query}\n\nWeb Search Results:\n{formatted_results}\n\nAssistant:"
    return prompt


# Function to generate text using Hugging Face API
def generate_text(input_text, temperature=0.7, repetition_penalty=1.0, top_p=0.9):
    print("Generating text using Hugging Face API...")
    endpoint = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",  # Use the environment variable
        "Content-Type": "application/json"
    }
    data = {
        "inputs": input_text,
        "parameters": {
            "max_new_tokens": 8000,  # Adjust as needed
            "temperature": temperature,
            "repetition_penalty": repetition_penalty,
            "top_p": top_p
        }
    }

    try:
        response = requests.post(endpoint, headers=headers, json=data)
        response.raise_for_status()

        # Check if response is JSON
        try:
            json_data = response.json()
        except ValueError:
            print("Response is not JSON.")
            return None

        # Extract generated text from response JSON
        if isinstance(json_data, list):
            # Handle list response (if applicable for your use case)
            generated_text = json_data[0].get("generated_text") if json_data else None
        elif isinstance(json_data, dict):
            # Handle dictionary response
            generated_text = json_data.get("generated_text")
        else:
            print("Unexpected response format.")
            return None

        if generated_text is not None:
            print("Text generation complete using Hugging Face API.")
            print(f"Generated text: {generated_text}")  # Debugging line
            return generated_text
        else:
            print("Generated text not found in response.")
            return None
    except requests.exceptions.RequestException as e:
        print(f"Error generating text using Hugging Face API: {e}")
        return None


# Function to read and extract text from a PDF
def read_pdf(file_obj):
    with fitz.open(file_obj.name) as document:
        text = ""
        for page_num in range(document.page_count):
            page = document.load_page(page_num)
            text += page.get_text()
    return text


# Function to format the prompt with instructions for text generation
def format_prompt_with_instructions(text, instructions):
    prompt = f"{instructions}{text}\n\nAssistant:"
    return prompt


# Function to save text to a PDF
def save_text_to_pdf(text, output_path):
    print(f"Saving text to PDF at {output_path}...")
    doc = fitz.open()  # Create a new PDF document
    page = doc.new_page()  # Create a new page

    # Set the page margins
    margin = 50  # 50 points margin
    page_width = page.rect.width
    page_height = page.rect.height
    text_width = page_width - 2 * margin
    text_height = page_height - 2 * margin

    # Define font size and line spacing
    font_size = 9
    line_spacing = 1 * font_size
    fontname = "times-roman"  # Use a supported font name

    # Process the text to handle line breaks and paragraphs
    paragraphs = text.split("\n")  # Split text into paragraphs

    y_position = margin
    for paragraph in paragraphs:
        words = paragraph.split()
        current_line = ""
        for word in words:
            word = str(word)  # Ensure word is treated as string
            # Calculate the length of the current line plus the new word
            current_line_length = fitz.get_text_length(current_line + " " + word, fontsize=font_size, fontname=fontname)
            if current_line_length <= text_width:
                current_line += " " + word
            else:
                page.insert_text(fitz.Point(margin, y_position), current_line.strip(), fontsize=font_size, fontname=fontname)
                y_position += line_spacing
                if y_position + line_spacing > page_height - margin:
                    page = doc.new_page()  # Add a new page if text exceeds page height
                    y_position = margin
                current_line = word

        # Add the last line of the paragraph
        page.insert_text(fitz.Point(margin, y_position), current_line.strip(), fontsize=font_size, fontname=fontname)
        y_position += line_spacing

        # Add extra space for a new paragraph
        y_position += line_spacing
        if y_position + line_spacing > page_height - margin:
            page = doc.new_page()  # Add a new page if text exceeds page height
            y_position = margin

    doc.save(output_path)  # Save the PDF to the specified path
    print("PDF saved successfully.")


def get_predefined_queries(company):
    return [
        f"Recent earnings for {company}",
        f"Recent News on {company}",
        f"Recent Credit rating of {company}",
        f"Recent conference call transcript of {company}"
    ]


# Integrated function to perform web scraping, formatting, and text generation
def scrape_and_display(query, num_results, earnings_instructions, news_instructions,
                       credit_rating_instructions, conference_call_instructions,
                       final_instructions, web_search=True, temperature=0.7,
                       repetition_penalty=1.0, top_p=0.9):
    print(f"Scraping and displaying results for query: {query} with num_results: {num_results}")
    if web_search:
        company = query.strip()
        predefined_queries = get_predefined_queries(company)
        all_results = []
        all_summaries = []
        instructions = [earnings_instructions, news_instructions, credit_rating_instructions, conference_call_instructions]

        for pq, instruction in zip(predefined_queries, instructions):
            search_results = google_search(pq, num_results=num_results // len(predefined_queries))
            all_results.extend(search_results)

            # Generate a summary for each predefined query
            formatted_prompt = format_prompt(pq, search_results, instruction)
            summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
            if summary:  # Skip failed generations so the join below never receives None
                all_summaries.append(summary)

        # Combine all summaries
        combined_summary = "\n\n".join(all_summaries)

        # Generate final summary using the combined results and final instructions
        final_prompt = f"{final_instructions}\n\nHere are the summaries for each aspect of {company}:\n\n{combined_summary}\n\nPlease provide a comprehensive summary based on the above information:"
        generated_summary = generate_text(final_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)
    else:
        formatted_prompt = format_prompt_with_instructions(query, final_instructions)
        generated_summary = generate_text(formatted_prompt, temperature=temperature, repetition_penalty=repetition_penalty, top_p=top_p)

    print("Scraping and display complete.")

    if generated_summary:
        # Keep only the assistant's portion of the generated text
        assistant_index = generated_summary.find("Assistant:")
        if assistant_index != -1:
            generated_summary = generated_summary[assistant_index:]
    else:
        generated_summary = "Assistant: No response generated."
    print(f"Generated summary: {generated_summary}")
    return generated_summary


# Main Gradio interface function
def gradio_interface(query, use_pdf, pdf, num_results, earnings_instructions, news_instructions,
                     credit_rating_instructions, conference_call_instructions, final_instructions,
                     temperature, repetition_penalty, top_p):
    if use_pdf and pdf is not None:
        pdf_text = read_pdf(pdf)
        # The PDF path skips the web searches, so the per-query instruction fields
        # are passed as empty strings and only the final-summary instructions are
        # applied to the extracted text
        generated_summary = scrape_and_display(
            pdf_text,
            num_results=0,
            earnings_instructions="",
            news_instructions="",
            credit_rating_instructions="",
            conference_call_instructions="",
            final_instructions=final_instructions,
            web_search=False,
            temperature=temperature,
            repetition_penalty=repetition_penalty,
            top_p=top_p,
        )
    else:
        generated_summary = scrape_and_display(
            query,
            num_results=num_results,
            earnings_instructions=earnings_instructions,
            news_instructions=news_instructions,
            credit_rating_instructions=credit_rating_instructions,
            conference_call_instructions=conference_call_instructions,
            final_instructions=final_instructions,
            web_search=True,
            temperature=temperature,
            repetition_penalty=repetition_penalty,
            top_p=top_p,
        )

    output_pdf_path = "output_summary.pdf"
    save_text_to_pdf(generated_summary, output_pdf_path)
    return generated_summary, output_pdf_path


# Build the Gradio interface
demo = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="Company Name"),
        gr.Checkbox(label="Use PDF"),
        gr.File(label="Upload PDF"),
        gr.Slider(minimum=4, maximum=40, step=4, value=20, label="Number of Results (total for all queries)"),
        gr.Textbox(label="Earnings Instructions", lines=2, placeholder="Instructions for recent earnings query..."),
        gr.Textbox(label="News Instructions", lines=2, placeholder="Instructions for recent news query..."),
        gr.Textbox(label="Credit Rating Instructions", lines=2, placeholder="Instructions for credit rating query..."),
        gr.Textbox(label="Conference Call Instructions", lines=2, placeholder="Instructions for conference call transcript query..."),
        gr.Textbox(label="Final Summary Instructions", lines=2, placeholder="Instructions for the final summary..."),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.7, label="Temperature"),
        gr.Slider(minimum=1.0, maximum=2.0, value=1.0, label="Repetition Penalty"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.9, label="Top p")
    ],
    outputs=["text", "file"],
    title="Financial Analyst AI Assistant",
    description="Enter a company name and provide specific instructions for each query. The AI will use these instructions to gather and summarize information on recent earnings, news, credit ratings, and conference call transcripts.",
)
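
# A minimal sketch for launching the app, assuming this file is run directly
# (e.g. `python app.py`) and that the local .env file contains a valid
# HUGGINGFACE_TOKEN entry. share=False keeps the server local; set it to True
# to get a temporary public Gradio link.
if __name__ == "__main__":
    demo.launch(share=False)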