Spaces:
Running
Running
import os | |
import re | |
import tempfile | |
import requests | |
import gradio as gr | |
from PyPDF2 import PdfReader | |
import logging | |
import webbrowser | |
from huggingface_hub import InferenceClient | |
from typing import Dict, List, Optional, Tuple | |
import time | |
# Set up logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Constants | |
CONTEXT_SIZES = { | |
"4K": 4000, | |
"8K": 8000, | |
"32K": 32000, | |
"128K": 128000, | |
"200K": 200000 | |
} | |
class ModelRegistry: | |
def __init__(self): | |
self.hf_models = { | |
"Phi-3 Mini 128k": "microsoft/Phi-3-mini-128k-instruct", | |
"Custom Model": "" | |
} | |
self.groq_models = self._fetch_groq_models() | |
def _fetch_groq_models(self) -> Dict[str, str]: | |
"""Fetch available Groq models""" | |
try: | |
headers = { | |
"Authorization": f"Bearer {os.getenv('GROQ_API_KEY')}", | |
"Content-Type": "application/json" | |
} | |
response = requests.get("https://api.groq.com/openai/v1/models", headers=headers) | |
if response.status_code == 200: | |
models = response.json().get("data", []) | |
return {model["id"]: model["id"] for model in models} | |
else: | |
logging.error(f"Failed to fetch Groq models: {response.status_code}") | |
return self._get_default_groq_models() | |
except Exception as e: | |
logging.error(f"Error fetching Groq models: {e}") | |
return self._get_default_groq_models() | |
def _get_default_groq_models(self) -> Dict[str, str]: | |
"""Return default Groq models when API is unavailable""" | |
return { | |
"llama-3.1-70b-versatile": "llama-3.1-70b-versatile", | |
"mixtral-8x7b-32768": "mixtral-8x7b-32768", | |
"llama-3.1-8b-instant": "llama-3.1-8b-instant" | |
} | |
def refresh_groq_models(self) -> Dict[str, str]: | |
"""Refresh the list of available Groq models""" | |
self.groq_models = self._fetch_groq_models() | |
return self.groq_models | |
# Initialize model registry | |
model_registry = ModelRegistry() | |
def extract_text_from_pdf(pdf_path: str) -> str: | |
"""Extract text content from PDF file.""" | |
try: | |
reader = PdfReader(pdf_path) | |
text = "" | |
for page_num, page in enumerate(reader.pages, start=1): | |
page_text = page.extract_text() | |
if page_text: | |
text += page_text + "\n" | |
else: | |
logging.warning(f"No text found on page {page_num}.") | |
if not text.strip(): | |
return "Error: No extractable text found in the PDF." | |
return text | |
except Exception as e: | |
logging.error(f"Error reading PDF file: {e}") | |
return f"Error reading PDF file: {e}" | |
def format_content(text: str, format_type: str) -> str: | |
"""Format extracted text according to specified format.""" | |
if format_type == 'txt': | |
return text | |
elif format_type == 'md': | |
paragraphs = text.split('\n\n') | |
return '\n\n'.join(paragraphs) | |
elif format_type == 'html': | |
paragraphs = text.split('\n\n') | |
return ''.join([f'<p>{para.strip()}</p>' for para in paragraphs if para.strip()]) | |
else: | |
logging.error(f"Unsupported format: {format_type}") | |
return f"Unsupported format: {format_type}" | |
def split_into_snippets(text: str, context_size: int) -> List[str]: | |
"""Split text into manageable snippets based on context size.""" | |
sentences = re.split(r'(?<=[.!?]) +', text) | |
snippets = [] | |
current_snippet = "" | |
for sentence in sentences: | |
if len(current_snippet) + len(sentence) + 1 > context_size: | |
if current_snippet: | |
snippets.append(current_snippet.strip()) | |
current_snippet = sentence + " " | |
else: | |
snippets.append(sentence.strip()) | |
current_snippet = "" | |
else: | |
current_snippet += sentence + " " | |
if current_snippet.strip(): | |
snippets.append(current_snippet.strip()) | |
return snippets | |
def build_prompts(snippets: List[str], prompt_instruction: str, custom_prompt: Optional[str], snippet_num: Optional[int] = None) -> str: | |
"""Build formatted prompts from text snippets.""" | |
if snippet_num is not None: | |
if 1 <= snippet_num <= len(snippets): | |
selected_snippets = [snippets[snippet_num - 1]] | |
else: | |
return f"Error: Invalid snippet number. Please choose between 1 and {len(snippets)}." | |
else: | |
selected_snippets = snippets | |
prompts = [] | |
base_prompt = custom_prompt if custom_prompt else prompt_instruction | |
for idx, snippet in enumerate(selected_snippets, start=1): | |
if len(selected_snippets) > 1: | |
prompt_header = f"{base_prompt} Part {idx} of {len(selected_snippets)}: ---\n" | |
else: | |
prompt_header = f"{base_prompt} ---\n" | |
framed_prompt = f"{prompt_header}{snippet}\n---" | |
prompts.append(framed_prompt) | |
return "\n\n".join(prompts) | |
def send_to_hf_inference(prompt: str, model_name: str, api_key: str) -> str: | |
"""Send prompt to HuggingFace using Inference API""" | |
try: | |
client = InferenceClient(api_key=api_key) | |
messages = [{"role": "user", "content": prompt}] | |
completion = client.chat.completions.create( | |
model=model_name, | |
messages=messages, | |
max_tokens=500 | |
) | |
return completion.choices[0].message.content | |
except Exception as e: | |
logging.error(f"Error with HF inference: {e}") | |
return f"Error with HF inference: {e}" | |
def send_to_groq(prompt: str, model_name: str, api_key: str) -> str: | |
"""Send prompt to Groq API""" | |
try: | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
data = { | |
"model": model_name, | |
"messages": [{"role": "user", "content": prompt}] | |
} | |
response = requests.post( | |
"https://api.groq.com/openai/v1/chat/completions", | |
headers=headers, | |
json=data | |
) | |
return response.json()["choices"][0]["message"]["content"] | |
except Exception as e: | |
logging.error(f"Error with Groq API: {e}") | |
return f"Error with Groq API: {e}" | |
def copy_to_clipboard(text: str) -> str: | |
"""Copy text to clipboard""" | |
return "Text copied to clipboard!" | |
def open_chatgpt() -> str: | |
"""Open ChatGPT in browser""" | |
webbrowser.open('https://chat.openai.com/') | |
return "Opening ChatGPT in browser..." | |
def process_pdf(pdf, fmt, ctx_size, snippet_num, prompt, model_selection, | |
hf_model_choice, hf_custom_model, hf_api_key, | |
groq_model_choice, groq_api_key) -> Tuple[str, str, str, List[str]]: | |
"""Process PDF and generate summary""" | |
try: | |
if not pdf: | |
return "Please upload a PDF file.", "", "", [] | |
# Extract text | |
text = extract_text_from_pdf(pdf.name) | |
if text.startswith("Error"): | |
return text, "", "", [] | |
# Format content | |
formatted_text = format_content(text, fmt) | |
# Split into snippets | |
snippets = split_into_snippets(formatted_text, ctx_size) | |
# Build prompts | |
default_prompt = "Summarize the following text:" | |
full_prompt = build_prompts(snippets, default_prompt, prompt, snippet_num) | |
if isinstance(full_prompt, str) and full_prompt.startswith("Error"): | |
return full_prompt, "", "", [] | |
# Process with selected model | |
if model_selection == "HuggingFace Inference": | |
if not hf_api_key: | |
return "HuggingFace API key required.", full_prompt, "", [] | |
model_id = hf_custom_model if hf_model_choice == "Custom Model" else model_registry.hf_models[hf_model_choice] | |
summary = send_to_hf_inference(full_prompt, model_id, hf_api_key) | |
elif model_selection == "Groq API": | |
if not groq_api_key: | |
return "Groq API key required.", full_prompt, "", [] | |
summary = send_to_groq(full_prompt, groq_model_choice, groq_api_key) | |
else: # OpenAI ChatGPT | |
summary = "Please use the Copy Prompt button and paste into ChatGPT." | |
# Save files for download | |
files_to_download = [] | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as prompt_file: | |
prompt_file.write(full_prompt) | |
files_to_download.append(prompt_file.name) | |
if summary != "Please use the Copy Prompt button and paste into ChatGPT.": | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as summary_file: | |
summary_file.write(summary) | |
files_to_download.append(summary_file.name) | |
return "Processing complete!", full_prompt, summary, files_to_download | |
except Exception as e: | |
logging.error(f"Error processing PDF: {e}") | |
return f"Error processing PDF: {str(e)}", "", "", [] | |
# Main Interface | |
with gr.Blocks(theme=gr.themes.Default()) as demo: | |
# Store context size value | |
context_size_value = gr.State(value=32000) | |
# Header | |
gr.Markdown("# π Smart PDF Summarizer") | |
gr.Markdown("Upload a PDF document and get AI-powered summaries using various AI models.") | |
# Main Content | |
with gr.Row(): | |
# Left Column - Input Options | |
with gr.Column(scale=1): | |
pdf_input = gr.File( | |
label="π Upload PDF", | |
file_types=[".pdf"] | |
) | |
with gr.Row(): | |
format_type = gr.Radio( | |
choices=["txt", "md", "html"], | |
value="txt", | |
label="π Output Format" | |
) | |
gr.Markdown("### Context Window Size") | |
with gr.Row(): | |
context_buttons = [] | |
for size_name, size_value in CONTEXT_SIZES.items(): | |
btn = gr.Button(size_name) | |
context_buttons.append((btn, size_value)) | |
context_size = gr.Slider( | |
minimum=1000, | |
maximum=200000, | |
step=1000, | |
value=32000, | |
label="π Custom Context Size" | |
) | |
snippet_number = gr.Number( | |
label="π’ Snippet Number", | |
value=1, | |
precision=0 | |
) | |
custom_prompt = gr.Textbox( | |
label="βοΈ Custom Prompt", | |
placeholder="Enter your custom prompt here...", | |
lines=2 | |
) | |
model_choice = gr.Radio( | |
choices=["OpenAI ChatGPT", "HuggingFace Inference", "Groq API"], | |
value="OpenAI ChatGPT", | |
label="π€ Model Selection" | |
) | |
with gr.Column(visible=False) as hf_options: | |
hf_model = gr.Dropdown( | |
choices=list(model_registry.hf_models.keys()), | |
label="π§ HuggingFace Model", | |
value="Phi-3 Mini 128k" | |
) | |
hf_custom_model = gr.Textbox( | |
label="Custom Model ID", | |
placeholder="Enter custom model ID...", | |
visible=False | |
) | |
hf_api_key = gr.Textbox( | |
label="π HuggingFace API Key", | |
type="password" | |
) | |
with gr.Column(visible=False) as groq_options: | |
groq_model = gr.Dropdown( | |
choices=list(model_registry.groq_models.keys()), | |
label="π§ Groq Model", | |
value=list(model_registry.groq_models.keys())[0] | |
) | |
groq_refresh_btn = gr.Button("π Refresh Models") | |
groq_api_key = gr.Textbox( | |
label="π Groq API Key", | |
type="password" | |
) | |
# Right Column - Output | |
with gr.Column(scale=1): | |
process_button = gr.Button("π Process PDF", variant="primary") | |
progress_status = gr.Textbox( | |
label="π Progress", | |
interactive=False | |
) | |
generated_prompt = gr.Textbox( | |
label="π Generated Prompt", | |
lines=10 | |
) | |
with gr.Row(): | |
copy_prompt_button = gr.Button("π Copy Prompt") | |
open_chatgpt_button = gr.Button("π Open ChatGPT") | |
summary_output = gr.Textbox( | |
label="π Summary", | |
lines=15 | |
) | |
with gr.Row(): | |
copy_summary_button = gr.Button("π Copy Summary") | |
download_files = gr.Files( | |
label="π₯ Download Files" | |
) | |
# Event Handlers | |
def update_context_size(size): | |
return gr.update(value=size) | |
def toggle_model_options(choice): | |
return ( | |
gr.update(visible=choice == "HuggingFace Inference"), | |
gr.update(visible=choice == "Groq API") | |
) | |
def refresh_groq_models_list(): | |
updated_models = model_registry.refresh_groq_models() | |
return gr.update(choices=list(updated_models.keys())) | |
def toggle_custom_model(model_name): | |
return gr.update(visible=model_name == "Custom Model") | |
# Connect event handlers | |
model_choice.change( | |
toggle_model_options, | |
inputs=[model_choice], | |
outputs=[hf_options, groq_options] | |
) | |
for btn, size_value in context_buttons: | |
btn.click( | |
lambda v=size_value: v, # Simplified to directly return the value | |
None, | |
context_size | |
) | |
hf_model.change( | |
toggle_custom_model, | |
inputs=[hf_model], | |
outputs=[hf_custom_model] | |
) | |
groq_refresh_btn.click( | |
refresh_groq_models_list, | |
outputs=[groq_model] | |
) | |
process_button.click( | |
process_pdf, | |
inputs=[ | |
pdf_input, | |
format_type, | |
context_size, | |
snippet_number, | |
custom_prompt, | |
model_choice, | |
hf_model, | |
hf_custom_model, | |
hf_api_key, | |
groq_model, | |
groq_api_key | |
], | |
outputs=[ | |
progress_status, | |
generated_prompt, | |
summary_output, | |
download_files | |
] | |
) | |
copy_prompt_button.click( | |
copy_to_clipboard, | |
inputs=[generated_prompt], | |
outputs=[progress_status] | |
) | |
copy_summary_button.click( | |
copy_to_clipboard, | |
inputs=[summary_output], | |
outputs=[progress_status] | |
) | |
open_chatgpt_button.click( | |
open_chatgpt, | |
outputs=[progress_status] | |
) | |
# Instructions | |
gr.Markdown(""" | |
### π Instructions: | |
1. Upload a PDF document | |
2. Choose output format and context window size | |
3. Select snippet number (default: 1) or enter custom prompt | |
4. Select your preferred model: | |
- OpenAI ChatGPT: Manual copy/paste workflow | |
- HuggingFace Inference: Direct API integration | |
- Groq API: High-performance inference | |
5. Click 'Process PDF' to generate summary | |
6. Use 'Copy Prompt' and 'Open ChatGPT' for manual processing | |
7. Download generated files as needed | |
### βοΈ Features: | |
- Support for multiple PDF formats | |
- Flexible text formatting options | |
- Predefined context window sizes (4K to 200K) | |
- Multiple model integrations | |
- Copy to clipboard functionality | |
- Direct ChatGPT integration | |
- Downloadable outputs | |
""") | |
# Launch the interface | |
if __name__ == "__main__": | |
demo.launch(share=False, debug=True) |