# extract_text_from_pdf.py

import os
import warnings

import spaces
import torch
from accelerate import Accelerator
from PyPDF2 import PdfReader
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer

# Suppress library warnings (PyPDF2 and transformers can be noisy).
warnings.filterwarnings('ignore')


class PDFTextExtractor:
    """
    A class to handle PDF text extraction and preprocessing for podcast preparation.
    """
    @spaces.GPU
    def __init__(self, pdf_path, output_path):
        """
        Initialize the PDFTextExtractor with paths and model details.
        
        Args:
            pdf_path (str): Path to the PDF file.
            output_path (str): Path to save the cleaned text file.

        The cleaning model is fixed to meta-llama/Llama-3.2-1B-Instruct.
        """

        model_name = "meta-llama/Llama-3.2-1B-Instruct"
        self.pdf_path = pdf_path
        self.output_path = output_path
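        # Processing limits: cap on total characters read from the PDF, and the
        # target size (in characters) of each chunk sent to the model.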
        self.max_chars = 100000
        self.chunk_size = 1000
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
        # Initialize model and tokenizer
        self.accelerator = Accelerator()
        self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
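        # Accelerator.prepare targets models/optimizers/dataloaders; the
        # tokenizer is not a recognized type and should pass through unchanged.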
        self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer)
        
        # System prompt for text processing
        self.system_prompt = """
        You are a world-class text pre-processor. Below is raw text extracted from a PDF; parse it and return it in a clean, usable form to send to a podcast writer.

        Be aggressive about removing extraction artifacts, but only clean up the text; do not summarize it.
        Here is the text:
        """

    @spaces.GPU
    def validate_pdf(self):
        """Check if the file exists and is a valid PDF."""
        if not os.path.exists(self.pdf_path):
            print(f"Error: File not found at path: {self.pdf_path}")
            return False
        if not self.pdf_path.lower().endswith('.pdf'):
            print("Error: File is not a PDF")
            return False
        return True

    @spaces.GPU
    def extract_text(self):
        """Extract text from the PDF, limited by max_chars."""
        if not self.validate_pdf():
            return None
        
        with open(self.pdf_path, 'rb') as file:
            pdf_reader = PdfReader(file)
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")
            
            extracted_text = []
            total_chars = 0
            
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text = page.extract_text() or ""
                
                if total_chars + len(text) > self.max_chars:
                    remaining_chars = self.max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {self.max_chars} character limit at page {page_num + 1}")
                    break
                
                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")
            
            final_text = '\n'.join(extracted_text)
            print(f"Extraction complete! Total characters: {len(final_text)}")
            return final_text

    @spaces.GPU
    def create_word_bounded_chunks(self, text):
        """Split text into chunks around the target size."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0
        
        for word in words:
            word_length = len(word) + 1  # +1 for the space
            if current_length + word_length > self.chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length
        
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        
        return chunks

    @spaces.GPU
    def process_chunk(self, text_chunk):
        """Process a text chunk with the model and return the cleaned text."""
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text_chunk}
        ]
        
        # add_generation_prompt appends the assistant header so the model
        # answers as the assistant instead of continuing the user turn.
        prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)
        # The templated prompt already includes special tokens (e.g. BOS).
        inputs = self.tokenizer(prompt, add_special_tokens=False, return_tensors="pt").to(self.device)

        with torch.no_grad():
            # do_sample=True is required for temperature/top_p to take effect
            output = self.model.generate(**inputs, do_sample=True, temperature=0.7, top_p=0.9, max_new_tokens=512, pad_token_id=self.tokenizer.eos_token_id)

        # Decode only the newly generated tokens; slicing the decoded string by
        # len(prompt) is unreliable because decoding does not round-trip exactly.
        new_tokens = output[0][inputs["input_ids"].shape[1]:]
        processed_text = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        return processed_text

    @spaces.GPU
    def clean_and_save_text(self):
        """Extract, clean, and save processed text to a file."""
        extracted_text = self.extract_text()
        if not extracted_text:
            return None
        
        chunks = self.create_word_bounded_chunks(extracted_text)

        with open(self.output_path, 'w', encoding='utf-8') as out_file:
            for chunk in tqdm(chunks, desc="Processing chunks"):
                processed_chunk = self.process_chunk(chunk)
                out_file.write(processed_chunk + "\n")
                out_file.flush()  # keep partial output on disk if a later chunk fails
        
        print(f"\nExtracted and cleaned text has been saved to {self.output_path}")
        return self.output_path
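

# Example usage: a minimal sketch. The paths below are hypothetical, and the
# @spaces.GPU decorators assume this runs inside a Hugging Face ZeroGPU Space.
if __name__ == "__main__":
    extractor = PDFTextExtractor(
        pdf_path="input/paper.pdf",            # hypothetical input path
        output_path="output/clean_text.txt",   # hypothetical output path
    )
    if extractor.clean_and_save_text():
        print("Done.")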