import os
import random
import shutil
import tempfile
import urllib.parse

import fitz  # PyMuPDF
import gradio as gr
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()  # Load environment variables from .env file

# Read the API token from the environment instead of hard-coding it
HUGGINGFACE_API_TOKEN = os.getenv("HUGGINGFACE_TOKEN")


def clear_cache():
    """Remove Gradio's temp-file cache and any generated output PDF."""
    try:
        # Clear Gradio cache
        cache_dir = tempfile.gettempdir()
        shutil.rmtree(os.path.join(cache_dir, "gradio"), ignore_errors=True)

        # Clear any custom cache, e.g. cached PDF files or search results
        if os.path.exists("output_summary.pdf"):
            os.remove("output_summary.pdf")

        # Add any other cache-clearing operations here
        print("Cache cleared successfully.")
        return "Cache cleared successfully."
    except Exception as e:
        print(f"Error clearing cache: {e}")
        return f"Error clearing cache: {e}"


_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]


# Extract visible text from the HTML content of a webpage
def extract_text_from_webpage(html):
    print("Extracting text from webpage...")
    soup = BeautifulSoup(html, "html.parser")
    for script in soup(["script", "style"]):
        script.extract()  # Remove scripts and styles
    text = soup.get_text()
    # Collapse whitespace: strip each line, split on double spaces, drop empties
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = "\n".join(chunk for chunk in chunks if chunk)
    print(f"Extracted text length: {len(text)}")
    return text


# Perform a Google search and retrieve results
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Performs a Google search and returns the results."""
    print(f"Searching for term: {term}")
    escaped_term = urllib.parse.quote_plus(term)  # NOTE: currently unused; the raw term is sent as a query param
    start = 0
    all_results = []
    # Limit the number of characters from each webpage to stay under the token limit
    max_chars_per_page = 8000

    with requests.Session() as session:
        while start < num_results:
            print(f"Fetching search results starting from: {start}")
            try:
                # Choose a random user agent
                user_agent = random.choice(_useragent_list)
                headers = {"User-Agent": user_agent}
                print(f"Using User-Agent: {headers['User-Agent']}")

                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No more results found.")
                break

            for result in result_block:
                link = result.find("a", href=True)
                if link:
                    link = link["href"]
                    print(f"Found link: {link}")
                    try:
                        webpage = session.get(link, headers=headers, timeout=timeout)
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "text": visible_text})
                    except requests.exceptions.RequestException as e:
                        print(f"Error fetching or processing {link}: {e}")
                        all_results.append({"link": link, "text": None})
                else:
                    print("No link found in result.")
                    all_results.append({"link": None, "text": None})

            start += len(result_block)

    print(f"Total results fetched: {len(all_results)}")
    return all_results


# Format the prompt for the Hugging Face API
def format_prompt(query, search_results, instructions):
    formatted_results = ""
    for result in search_results:
        link = result["link"]
        text = result["text"]
        if link:
            formatted_results += f"URL: {link}\nContent: {text}\n{'-' * 80}\n"
        else:
            formatted_results += "No link found.\n" + "-" * 80 + "\n"

    prompt = f"{instructions}User Query: {query}\n\nWeb Search Results:\n{formatted_results}\n\nAssistant:"
    return prompt


# Generate text using the Hugging Face Inference API
def generate_text(input_text, temperature=0.7, repetition_penalty=1.0, top_p=0.9):
    print("Generating text using Hugging Face API...")
    endpoint = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_TOKEN}",  # Token read from the environment
        "Content-Type": "application/json",
    }
    data = {
        "inputs": input_text,
        "parameters": {
            "max_new_tokens": 8000,  # Adjust as needed
            "temperature": temperature,
            "repetition_penalty": repetition_penalty,
            "top_p": top_p,
        },
    }

    try:
        response = requests.post(endpoint, headers=headers, json=data)
        response.raise_for_status()

        # Check that the response body is JSON
        try:
            json_data = response.json()
        except ValueError:
            print("Response is not JSON.")
            return None

        # Extract the generated text from the response JSON
        if isinstance(json_data, list):
            # Handle list response (if applicable for your use case)
            generated_text = json_data[0].get("generated_text") if json_data else None
        elif isinstance(json_data, dict):
            # Handle dictionary response
            generated_text = json_data.get("generated_text")
        else:
            print("Unexpected response format.")
            return None

        if generated_text is not None:
            print("Text generation complete using Hugging Face API.")
            print(f"Generated text: {generated_text}")  # Debugging line
            return generated_text
        else:
            print("Generated text not found in response.")
            return None
    except requests.exceptions.RequestException as e:
        print(f"Error generating text using Hugging Face API: {e}")
        return None


# Read and extract text from a PDF
def read_pdf(file_obj):
    with fitz.open(file_obj.name) as document:
        text = ""
        for page_num in range(document.page_count):
            page = document.load_page(page_num)
            text += page.get_text()
    return text


# Format the prompt with instructions for text generation
def format_prompt_with_instructions(text, instructions):
    prompt = f"{instructions}{text}\n\nAssistant:"
    return prompt


# Save text to a PDF
def save_text_to_pdf(text, output_path):
    print(f"Saving text to PDF at {output_path}...")
    doc = fitz.open()  # Create a new PDF document
    page = doc.new_page()  # Create a new page

    # Set the page margins
    margin = 50  # 50 points
    page_width = page.rect.width
    page_height = page.rect.height
    text_width = page_width - 2 * margin
    text_height = page_height - 2 * margin

    # Define font size and line spacing
    font_size = 9
    line_spacing = 1 * font_size
    fontname = "times-roman"  # Use a supported font name
    # Process the text into lines that fit within text_width
    lines = []
    current_line = ""
    current_line_width = 0
    words = text.split(" ")
    for word in words:
        word_width = fitz.get_text_length(word, fontname, font_size)
        if current_line_width + word_width <= text_width:
            current_line += word + " "
            current_line_width += word_width + fitz.get_text_length(" ", fontname, font_size)
        else:
            lines.append(current_line.strip())
            current_line = word + " "
            current_line_width = word_width + fitz.get_text_length(" ", fontname, font_size)
    if current_line:
        lines.append(current_line.strip())

    # Add the lines to the page, honoring the margins
    x = margin
    y = margin
    for line in lines:
        if y + line_spacing > margin + text_height:
            # Start a new page once the text reaches the bottom margin
            page = doc.new_page()
            y = margin  # Reset the y-coordinate for the new page
        page.insert_text((x, y), line, fontname=fontname, fontsize=font_size)
        y += line_spacing

    doc.save(output_path)  # Save the PDF to the specified output path
    print(f"Text saved to PDF at {output_path}")


# Process the PDF or search query and generate a summary
def process_input(query_or_file, is_pdf, instructions, temperature, top_p, repetition_penalty):
    if is_pdf:
        print(f"Processing PDF: {query_or_file.name}")
        input_text = read_pdf(query_or_file)
    else:
        print(f"Processing search query: {query_or_file}")
        search_results = google_search(query_or_file)
        input_text = "\n\n".join(result["text"] for result in search_results if result["text"])

    # Split the input text into smaller chunks to fit within the token limit
    chunk_size = 1024  # Adjust as needed to stay within the token limit
    text_chunks = [input_text[i:i + chunk_size] for i in range(0, len(input_text), chunk_size)]
    print(f"Total number of chunks: {len(text_chunks)}")

    # Generate summaries for each chunk and concatenate them
    concatenated_summary = ""
    for chunk in text_chunks:
        prompt = format_prompt_with_instructions(chunk, instructions)
        chunk_summary = generate_text(prompt, temperature, repetition_penalty, top_p)
        concatenated_summary += f"{chunk_summary}\n\n"

    print("Final concatenated summary generated.")
    return concatenated_summary
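

# The button handlers below call generate_and_save_summary(), which the original
# script references but never defines. A minimal sketch follows, assuming the
# intended behavior: run process_input() and persist the result via
# save_text_to_pdf() to "output_summary.pdf", the path that clear_cache() deletes.
def generate_and_save_summary(query_or_file, is_pdf, instructions, temperature, top_p, repetition_penalty):
    summary = process_input(query_or_file, is_pdf, instructions, temperature, top_p, repetition_penalty)
    save_text_to_pdf(summary, "output_summary.pdf")  # Assumed output path, matching clear_cache()
    return summary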


def summarization_interface():
    with gr.Blocks() as demo:
        gr.Markdown("# PDF and Web Summarization Tool")

        with gr.Tab("Summarize PDF"):
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            pdf_instructions = gr.Textbox(label="Instructions for Summarization", placeholder="Enter instructions for summarization", lines=3)
            pdf_temperature = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.7, step=0.01)
            pdf_top_p = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.01)
            pdf_repetition_penalty = gr.Slider(label="Repetition Penalty", minimum=0.5, maximum=2.0, value=1.0, step=0.1)
            pdf_summary_output = gr.Textbox(label="Concatenated Summary Output")
            pdf_summarize_button = gr.Button("Generate Summary")
            pdf_clear_cache_button = gr.Button("Clear Cache")

        with gr.Tab("Summarize Web Search"):
            search_query = gr.Textbox(label="Enter Search Query", placeholder="Enter search query")
            search_instructions = gr.Textbox(label="Instructions for Summarization", placeholder="Enter instructions for summarization", lines=3)
            search_temperature = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.7, step=0.01)
            search_top_p = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.01)
            search_repetition_penalty = gr.Slider(label="Repetition Penalty", minimum=0.5, maximum=2.0, value=1.0, step=0.1)
            search_summary_output = gr.Textbox(label="Concatenated Summary Output")
            search_summarize_button = gr.Button("Generate Summary")
            search_clear_cache_button = gr.Button("Clear Cache")

        # Bind functions to button clicks
        pdf_summarize_button.click(
            fn=lambda file, instructions, temperature, top_p, repetition_penalty: generate_and_save_summary(file, True, instructions, temperature, top_p, repetition_penalty),
            inputs=[pdf_file, pdf_instructions, pdf_temperature, pdf_top_p, pdf_repetition_penalty],
            outputs=[pdf_summary_output],
        )
        search_summarize_button.click(
            fn=lambda query, instructions, temperature, top_p, repetition_penalty: generate_and_save_summary(query, False, instructions, temperature, top_p, repetition_penalty),
            inputs=[search_query, search_instructions, search_temperature, search_top_p, search_repetition_penalty],
            outputs=[search_summary_output],
        )
        pdf_clear_cache_button.click(fn=clear_cache, inputs=None, outputs=pdf_summary_output)
        search_clear_cache_button.click(fn=clear_cache, inputs=None, outputs=search_summary_output)

    return demo


# Launch the Gradio interface
if __name__ == "__main__":
    demo = summarization_interface()
    demo.launch()
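
# --- Usage notes (assumptions, not stated in the original script) ---
# The script expects a .env file next to it containing the API token, e.g.:
#     HUGGINGFACE_TOKEN=hf_...
# Dependencies, inferred from the imports above:
#     pip install pymupdf gradio requests beautifulsoup4 python-dotenv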