"""Document summarizer: a Gradio UI mounted on a FastAPI application.

Uploaded documents (PDF, DOCX, PPTX, XLSX or images) are converted to plain
text, summarized chunk by chunk with a BART model on CPU, and the summary can
optionally be read aloud (gTTS) and downloaded as a PDF.
"""
import datetime
import os
import re
import tempfile

import docx
import easyocr
import fitz  # PyMuPDF
import gradio as gr
import nltk
import openpyxl
import pptx
import torch
from fastapi import FastAPI
from fastapi.responses import RedirectResponse, FileResponse, JSONResponse
from fpdf import FPDF
from gtts import gTTS
from nltk.tokenize import sent_tokenize
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM

# Download required NLTK data. Newer NLTK releases look up "punkt_tab" instead
# of "punkt"; a failed download is tolerated because chunk_text() has a
# fallback splitter.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource, quiet=True)

# Initialize components
app = FastAPI()

# Load models (CPU optimized)
MODEL_NAME = "facebook/bart-large-cnn"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
summarizer = pipeline(
    "summarization",
    model=model,
    tokenizer=tokenizer,
    device=-1,  # Force CPU usage
    torch_dtype=torch.float32
)

# Initialize EasyOCR reader
reader = easyocr.Reader(['en'])  # English only for faster initialization


def clean_text(text: str) -> str:
    """Clean and normalize document text.

    Removes list markers (bullets and "1." style numbering at the start of a
    line), bracketed/parenthesized spans and "Page N" markers, and collapses
    all whitespace runs to single spaces.
    """
    # Strip list markers while line boundaries still exist. Doing this after
    # whitespace normalization would also eat sentence-final digits
    # ("in 2020. The" -> "in 202The").
    text = re.sub(r'(?m)^[ \t]*(?:•|\d+\.)[ \t]+', '', text)
    text = re.sub(r'•\s*', '', text)  # Remaining inline bullets
    text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
    text = re.sub(r'\[.*?\]|\(.*?\)', '', text)  # Remove brackets/parentheses
    text = re.sub(r'\bPage\s*\d+\b', '', text, flags=re.IGNORECASE)  # Remove page numbers
    text = re.sub(r' {2,}', ' ', text)  # Removals above can leave doubled spaces
    return text.strip()


def _ocr_pdf_pages(doc) -> str:
    """Render each page of an open PyMuPDF document to PNG and OCR it."""
    lines = []
    for page in doc:
        temp_img = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        temp_img.close()  # Only the path is needed; release the handle
        try:
            page.get_pixmap().save(temp_img.name)
            lines.extend(reader.readtext(temp_img.name, detail=0))
        finally:
            os.unlink(temp_img.name)
    return "\n".join(lines)


def extract_text(file_path: str, file_extension: str) -> tuple[str, str]:
    """Extract text from various document formats.

    Args:
        file_path: Path of the document on disk.
        file_extension: Lower-case extension without the dot
            ("pdf", "docx", "pptx", "xlsx", "jpg", "jpeg" or "png").

    Returns:
        A ``(text, error)`` tuple. On success ``error`` is ``""``; on failure
        ``text`` is ``""`` and ``error`` holds a user-facing message.
    """
    try:
        if file_extension == "pdf":
            with fitz.open(file_path) as doc:
                text = "\n".join(page.get_text("text") for page in doc)
                # Try OCR for scanned PDFs if text extraction fails. This must
                # happen while the document is still open.
                if len(text.strip()) < 50:
                    text = _ocr_pdf_pages(doc) or text
            return clean_text(text), ""

        elif file_extension == "docx":
            doc = docx.Document(file_path)
            return clean_text("\n".join(p.text for p in doc.paragraphs)), ""

        elif file_extension == "pptx":
            prs = pptx.Presentation(file_path)
            text = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text.append(shape.text)
            return clean_text("\n".join(text)), ""

        elif file_extension == "xlsx":
            wb = openpyxl.load_workbook(file_path, read_only=True)
            try:
                text = []
                for sheet in wb.sheetnames:
                    for row in wb[sheet].iter_rows(values_only=True):
                        # `is not None` keeps legitimate 0 / False cell values.
                        text.append(" ".join(str(cell) for cell in row if cell is not None))
            finally:
                wb.close()  # Read-only workbooks keep the file open until closed
            return clean_text("\n".join(text)), ""

        elif file_extension in ["jpg", "jpeg", "png"]:
            ocr_result = reader.readtext(file_path, detail=0)
            return clean_text("\n".join(ocr_result)), ""

        return "", "Unsupported file format"
    except Exception as e:
        return "", f"Error reading {file_extension.upper()} file: {str(e)}"


def chunk_text(text: str, max_tokens: int = 768) -> list[str]:
    """Split text into manageable chunks for summarization.

    Sentences are packed greedily into chunks of at most ``max_tokens``
    whitespace-separated words (a word count, not a model token count). A
    single sentence longer than the limit becomes its own chunk.
    """
    try:
        sentences = sent_tokenize(text)
    except Exception:
        # Fallback if sentence tokenization fails (e.g. NLTK data missing)
        words = text.split()
        sentences = [' '.join(words[i:i + 20]) for i in range(0, len(words), 20)]

    chunks = []
    current = []
    current_len = 0
    for sentence in sentences:
        sentence_len = len(sentence.split())
        # Flush only a non-empty chunk so an oversized first sentence does not
        # produce an empty chunk.
        if current and current_len + sentence_len > max_tokens:
            chunks.append(" ".join(current).strip())
            current = []
            current_len = 0
        current.append(sentence)
        current_len += sentence_len

    if current:
        chunks.append(" ".join(current).strip())

    return chunks


def generate_summary(text: str, length: str = "medium") -> str:
    """Generate summary with appropriate length parameters.

    Args:
        text: Cleaned document text.
        length: "short", "medium" or "long"; any other value is treated as
            "medium".

    Returns:
        The concatenated per-chunk summaries, or a fixed notice when the
        result is 25 characters or shorter. A chunk that fails to summarize
        contributes a "[Chunk error: ...]" marker instead of aborting.
    """
    length_params = {
        "short": {"max_length": 80, "min_length": 30},
        "medium": {"max_length": 200, "min_length": 80},
        "long": {"max_length": 300, "min_length": 210}
    }
    params = length_params.get(length, length_params["medium"])

    summaries = []
    for chunk in chunk_text(text):
        try:
            summary = summarizer(
                chunk,
                max_length=params["max_length"],
                min_length=params["min_length"],
                do_sample=False,
                truncation=True,
                no_repeat_ngram_size=2,
                num_beams=2,
                early_stopping=True
            )
            summaries.append(summary[0]['summary_text'])
        except Exception as e:
            summaries.append(f"[Chunk error: {str(e)}]")

    # Combine and format the final summary. Only the first character of each
    # sentence is upper-cased; str.capitalize() would lower-case the rest and
    # mangle acronyms and proper nouns.
    combined = " ".join(summaries)
    sentences = [s.strip() for s in combined.split(". ") if s.strip()]
    final_summary = ". ".join(s[0].upper() + s[1:] for s in sentences)
    return final_summary if len(final_summary) > 25 else "Summary too short - document may be too brief"


def text_to_speech(text: str) -> str:
    """Convert text to speech and return temporary audio file path.

    Returns ``""`` when synthesis fails; the error is printed.
    """
    try:
        tts = gTTS(text)
        temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        temp_audio.close()  # Only the path is needed; release the handle
        tts.save(temp_audio.name)
        return temp_audio.name
    except Exception as e:
        print(f"Error in text-to-speech: {e}")
        return ""


def _latin1_safe(text: str) -> str:
    """Replace characters the built-in (Latin-1) PDF fonts cannot encode."""
    return text.encode("latin-1", "replace").decode("latin-1")


def create_pdf(summary: str, original_filename: str) -> str:
    """Create a PDF file from the summary text.

    Returns the path of a temporary PDF, or ``""`` when creation fails (the
    error is printed).
    """
    try:
        # Create PDF object
        pdf = FPDF()
        pdf.add_page()

        # Add title
        pdf.set_font("Arial", 'B', 16)
        pdf.cell(200, 10, txt="Document Summary", ln=1, align='C')
        pdf.set_font("Arial", size=12)

        # Add metadata
        pdf.cell(200, 10, txt=_latin1_safe(f"Original file: {original_filename}"), ln=1)
        pdf.cell(200, 10, txt=f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", ln=1)
        pdf.ln(10)

        # Add summary content. The core font is Latin-1 only, so unsupported
        # characters are replaced rather than failing the whole export.
        pdf.multi_cell(0, 10, txt=_latin1_safe(summary))

        # Save to temporary file
        temp_pdf = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
        temp_pdf.close()  # Only the path is needed; release the handle
        pdf.output(temp_pdf.name)
        return temp_pdf.name
    except Exception as e:
        print(f"Error creating PDF: {e}")
        return ""


def summarize_document(file, summary_length: str, enable_tts: bool):
    """Main processing function for Gradio interface.

    Args:
        file: The uploaded file, either a path string or an object exposing
            the path as ``.name``; ``None`` when nothing was uploaded.
        summary_length: "short", "medium" or "long".
        enable_tts: Whether to also synthesize an audio reading.

    Returns:
        ``(summary_or_message, status, audio_path, pdf_path)``; the two paths
        are ``None`` when not produced.
    """
    if file is None:
        return "Please upload a document first", "Ready", None, None

    # gr.File(type="filepath") passes a plain string; other configurations
    # pass a tempfile-like object with a .name attribute.
    file_path = file if isinstance(file, str) else file.name
    file_extension = os.path.splitext(file_path)[1].lstrip(".").lower()
    original_filename = os.path.basename(file_path)

    text, error = extract_text(file_path, file_extension)
    if error:
        return error, "Error", None, None

    if not text or len(text.split()) < 30:
        return "Document is too short or contains too little text to summarize", "Ready", None, None

    try:
        summary = generate_summary(text, summary_length)
        # The helpers return "" on failure; normalize to None so the UI does
        # not try to display a non-existent file.
        audio_path = (text_to_speech(summary) or None) if enable_tts else None
        pdf_path = (create_pdf(summary, original_filename) or None) if summary else None
        return summary, "Summary complete", audio_path, pdf_path
    except Exception as e:
        return f"Summarization error: {str(e)}", "Error", None, None


# Gradio Interface
with gr.Blocks(title="Document Summarizer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 📄 Advanced Document Summarizer")
    gr.Markdown("Upload a document to generate a summary with optional audio reading and PDF download")

    with gr.Row():
        with gr.Column():
            file_input = gr.File(
                label="Upload Document",
                file_types=[".pdf", ".docx", ".pptx", ".xlsx", ".jpg", ".jpeg", ".png"],
                type="filepath"
            )
            length_radio = gr.Radio(
                ["short", "medium", "long"],
                value="medium",
                label="Summary Length"
            )
            tts_checkbox = gr.Checkbox(
                label="Enable Text-to-Speech",
                value=False
            )
            submit_btn = gr.Button("Generate Summary", variant="primary")

        with gr.Column():
            output = gr.Textbox(label="Summary", lines=10)
            status = gr.Textbox(label="Status", interactive=False)
            # type="filepath" keeps the value a path when it is fed back into
            # update_ui, instead of decoding the MP3 into sample data.
            audio_output = gr.Audio(label="Audio Summary", visible=False, type="filepath")
            pdf_download = gr.File(label="Download Summary as PDF", visible=False)

    def toggle_audio_visibility(enable_tts):
        """Show the audio player only while text-to-speech is enabled."""
        return gr.Audio(visible=enable_tts)

    def update_ui(summary, status, audio_path, pdf_path):
        """Reveal the audio/PDF components only when they hold a file."""
        return (
            summary,
            status,
            gr.Audio(visible=audio_path is not None, value=audio_path),
            gr.File(visible=pdf_path is not None, value=pdf_path)
        )

    tts_checkbox.change(
        fn=toggle_audio_visibility,
        inputs=tts_checkbox,
        outputs=audio_output
    )

    submit_btn.click(
        fn=summarize_document,
        inputs=[file_input, length_radio, tts_checkbox],
        outputs=[output, status, audio_output, pdf_download]
    ).then(
        fn=update_ui,
        inputs=[output, status, audio_output, pdf_download],
        outputs=[output, status, audio_output, pdf_download]
    )


# FastAPI endpoints for files
@app.get("/files/{file_name}")
async def get_file(file_name: str):
    """Serve a file from the system temp directory by bare file name."""
    # Accept bare file names only: reject anything with a directory component
    # or a traversal name so requests cannot escape the temp directory.
    safe_name = os.path.basename(file_name)
    if safe_name != file_name or safe_name in ("", ".", ".."):
        return JSONResponse({"error": "File not found"}, status_code=404)

    file_path = os.path.join(tempfile.gettempdir(), safe_name)
    if os.path.isfile(file_path):
        return FileResponse(file_path)
    return JSONResponse({"error": "File not found"}, status_code=404)


# Mount Gradio app to FastAPI
app = gr.mount_gradio_app(app, demo, path="/")


# NOTE(review): this route is registered after the Gradio app is mounted at
# "/", so it appears to be shadowed by the mount; it also redirects "/" to
# itself. Confirm whether it can be removed.
@app.get("/")
def redirect_to_interface():
    """Redirect the root URL to the Gradio interface."""
    return RedirectResponse(url="/")