File size: 5,682 Bytes
bf8498e
 
 
 
 
 
 
 
 
25b7c57
bf8498e
 
 
 
 
 
 
 
 
1e21755
bf8498e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# extract_text_from_pdf.py

import os
import torch
from PyPDF2 import PdfReader
from accelerate import Accelerator
from transformers import AutoModelForCausalLM, AutoTokenizer
from tqdm import tqdm
import warnings


warnings.filterwarnings('ignore')


class PDFTextExtractor:
    """
    A class to handle PDF text extraction and preprocessing for podcast preparation.
    """

    def __init__(self, pdf_path, output_path, model_name="meta-llama/Llama-3.2-1B-Instruct"):
        """
        Initialize the PDFTextExtractor with paths and model details.
        
        Args:
            pdf_path (str): Path to the PDF file.
            output_path (str): Path to save the cleaned text file.
            model_name (str): Name of the model to use for text processing.
        """
        self.pdf_path = pdf_path
        self.output_path = output_path
        self.max_chars = 100000
        self.chunk_size = 1000
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
        # Initialize model and tokenizer
        self.accelerator = Accelerator()
        self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer)
        
        # System prompt for text processing
        self.system_prompt = """
        You are a world class text pre-processor, here is the raw data from a PDF, please parse and return it in a way that is crispy and usable to send to a podcast writer.
        
        Be smart and aggressive with removing details; you're only cleaning up the text without summarizing.
        Here is the text:
        """
    
    def validate_pdf(self):
        """Check if the file exists and is a valid PDF."""
        if not os.path.exists(self.pdf_path):
            print(f"Error: File not found at path: {self.pdf_path}")
            return False
        if not self.pdf_path.lower().endswith('.pdf'):
            print("Error: File is not a PDF")
            return False
        return True

    def extract_text(self):
        """Extract text from the PDF, limited by max_chars."""
        if not self.validate_pdf():
            return None
        
        with open(self.pdf_path, 'rb') as file:
            pdf_reader = PdfReader(file)
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")
            
            extracted_text = []
            total_chars = 0
            
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text = page.extract_text() or ""
                
                if total_chars + len(text) > self.max_chars:
                    remaining_chars = self.max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {self.max_chars} character limit at page {page_num + 1}")
                    break
                
                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")
            
            final_text = '\n'.join(extracted_text)
            print(f"Extraction complete! Total characters: {len(final_text)}")
            return final_text

    def create_word_bounded_chunks(self, text):
        """Split text into chunks around the target size."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0
        
        for word in words:
            word_length = len(word) + 1  # +1 for the space
            if current_length + word_length > self.chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length
        
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        
        return chunks

    def process_chunk(self, text_chunk):
        """Process a text chunk with the model and return the cleaned text."""
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text_chunk}
        ]
        
        prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False)
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        
        with torch.no_grad():
            output = self.model.generate(**inputs, temperature=0.7, top_p=0.9, max_new_tokens=512)
        
        processed_text = self.tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt):].strip()
        return processed_text

    def clean_and_save_text(self):
        """Extract, clean, and save processed text to a file."""
        extracted_text = self.extract_text()
        if not extracted_text:
            return None
        
        chunks = self.create_word_bounded_chunks(extracted_text)
        processed_text = ""
        
        with open(self.output_path, 'w', encoding='utf-8') as out_file:
            for chunk_num, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
                processed_chunk = self.process_chunk(chunk)
                processed_text += processed_chunk + "\n"
                out_file.write(processed_chunk + "\n")
                out_file.flush()
        
        print(f"\nExtracted and cleaned text has been saved to {self.output_path}")
        return self.output_path