luanpoppe
feat: adicionando OCR em casos de PDFs com problema
edd5b40
raw
history blame
14.2 kB
import os
import time
from _utils.bubble_integrations.obter_arquivo import get_pdf_from_bubble
from _utils.handle_files import return_document_list_with_llama_parser
from _utils.langchain_utils.splitter_util import (
Splitter_Simple,
SplitterUtils,
combine_documents_without_losing_pagination,
)
from setup.easy_imports import (
PyPDFLoader,
RecursiveCharacterTextSplitter,
Document,
Docx2txtLoader,
TextLoader,
PyMuPDFLoader,
)
from typing import Any, List, Dict, Tuple, Optional, cast
from _utils.models.gerar_documento import (
DocumentChunk,
)
import uuid
import json
from _utils.google_integration.google_cloud import (
DOCUMENT_API_ID,
GCP_PROJECT,
GCP_REGION,
GCS_BUCKET_NAME,
upload_to_gcs,
)
from google.cloud import documentai
from google.cloud import storage
class Splitter:
    """Load documents, split them into chunks and record where each chunk sits.

    The class wraps two splitters (``Splitter_Simple`` and
    ``RecursiveCharacterTextSplitter``) and keeps, per generated chunk id, the
    page number and character offsets in ``chunk_metadata``.
    """

    def __init__(
        self,
        chunk_size,
        chunk_overlap,
    ):
        """Create the helper objects used by the split methods.

        Args:
            chunk_size: Forwarded as the chunk size to ``Splitter_Simple`` and
                ``RecursiveCharacterTextSplitter``.
            chunk_overlap: Forwarded as the chunk overlap to both splitters.
        """
        self.splitter_util = SplitterUtils()
        self.splitter_simple = Splitter_Simple(chunk_size, chunk_overlap)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        # chunk_id -> {"page", "start_char", "end_char"}; stored for tracing.
        self.chunk_metadata = {}

    @staticmethod
    def _page_for_offset(page_boundaries, position: int) -> int:
        """Return the page whose ``[start, end)`` range contains ``position``.

        ``page_boundaries`` is iterated as ``(start, end, page_number)``
        triples. When no range matches, the page of the last triple is
        returned (0 when there are no triples at all), so offsets that run
        past the final boundary are attributed to the last page.
        """
        page_number = 0
        for start, end, page_number in page_boundaries:
            if start <= position < end:
                break
        return page_number

    @staticmethod
    def _find_chunk_start(text: str, chunk: str, search_from: int) -> int:
        """Locate ``chunk`` in ``text``, preferring matches at/after ``search_from``.

        Falls back to a search over the whole text when nothing is found from
        ``search_from`` onwards. Returns -1 when ``chunk`` does not occur.
        """
        position = text.find(chunk, max(search_from, 0))
        if position == -1:
            position = text.find(chunk)
        return position

    @staticmethod
    def _merge_ocr_shards(shards: List[Tuple[str, Any]]) -> str:
        """Concatenate the ``text`` field of OCR output JSON files in shard order.

        Args:
            shards: ``(blob_name, parsed_json)`` pairs, in any order.

        Returns:
            The joined text; an empty string when no shard carries text.
        """

        def shard_order(item):
            name, data = item
            info = {}
            if isinstance(data, dict):
                # Assumes the shard position is exposed as shardInfo.shardIndex
                # (or the snake_case spelling) - TODO confirm against real
                # output. Missing/invalid values sort as 0, ties by blob name.
                info = data.get("shardInfo") or data.get("shard_info") or {}
            raw_index = info.get("shardIndex", info.get("shard_index", 0))
            try:
                index = int(raw_index)
            except (TypeError, ValueError):
                index = 0
            return (index, name)

        pieces = []
        for _, data in sorted(shards, key=shard_order):
            if isinstance(data, dict):
                pieces.append(data.get("text") or "")
        return "".join(pieces)

    def _load_local_pages(self, pdf_path: str):
        """Read a local file with the loader matching its detected extension.

        ``pdf`` uses ``PyPDFLoader`` with ``PyMuPDFLoader`` as fallback, ``odt``
        and ``doc`` are converted to text and split by ``Splitter_Simple``,
        ``txt`` uses ``TextLoader`` and anything else goes to
        ``Docx2txtLoader``.
        """
        file_extension = self.splitter_util.get_file_type(pdf_path)
        print("file_extension: ", file_extension)
        if file_extension == "pdf":
            try:
                return PyPDFLoader(pdf_path).load()
            except Exception:
                # Only ordinary errors trigger the fallback; a bare ``except``
                # would also swallow task cancellation and KeyboardInterrupt.
                return PyMuPDFLoader(pdf_path).load()
        if file_extension == "odt":
            full_text = self.splitter_util.load_odt_file(pdf_path)
            return self.splitter_simple.load_and_split_text(full_text)
        if file_extension == "txt":
            return TextLoader(pdf_path).load()
        if file_extension == "doc":
            full_text = self.splitter_util.getTextFromDotDoc(pdf_path)
            return self.splitter_simple.load_and_split_text(full_text)
        return Docx2txtLoader(pdf_path).load()

    async def load_and_split_document(
        self,
        pdf_path: str,
        should_use_llama_parse: bool,
        isBubble: bool,
    ):
        """Load a document and split it into chunks with position metadata.

        Args:
            pdf_path: Location of the document; passed to the Bubble fetcher,
                to Llama Parse or to a local loader depending on the flags.
            should_use_llama_parse: When true (and ``isBubble`` is false) the
                pages come from Llama Parse. It also disables the ``+ 1`` page
                shift applied to every chunk.
            isBubble: When true the pages are fetched through
                ``get_pdf_from_bubble`` and chunked by ``Splitter_Simple``.

        Returns:
            A tuple ``(chunks, chunks_of_string_only)``: the ``DocumentChunk``
            objects and the plain chunk strings. When no pages or no chunks
            were produced, both come from the Google Document AI OCR fallback.
        """
        if isBubble:
            print("\nPEGANDO PDF DO BUBBLE")
            pages = await get_pdf_from_bubble(pdf_path, should_use_llama_parse)  # type: ignore
            page_boundaries, _ = combine_documents_without_losing_pagination(pages)
            chunks_of_string_only: List[str] = list(
                self.splitter_simple.get_chunks_of_string_only_from_list_of_documents(
                    pages
                )
            )
        else:
            if should_use_llama_parse:
                print("\nENVIANDO PDFS PARA LLAMA PARSE")
                pages = await return_document_list_with_llama_parser(pdf_path)
            else:
                print("\nCOMEÇANDO LEITURA DO PDF")
                pages = self._load_local_pages(pdf_path)
                print("TERMINOU LEITURA DO PDF")
                print("pages: ", pages)
            page_boundaries, combined_text = (
                combine_documents_without_losing_pagination(pages)
            )
            chunks_of_string_only = list(self.text_splitter.split_text(combined_text))

        # Llama Parse page numbers are used as-is; every other source is
        # shifted by one (presumably those loaders report 0-based pages).
        page_shift = 0 if should_use_llama_parse else 1

        chunks: List[DocumentChunk] = []
        # NOTE(review): offsets are a running estimate (previous end + 1), not
        # the position of the chunk inside the combined text, so they drift
        # when chunks overlap. Kept as-is to preserve existing output.
        cursor = 0
        print("\nQUEBRANDO PDF EM CHUNKS ORGANIZADOS")
        for chunk in chunks_of_string_only:
            chunk_id = str(uuid.uuid4())
            start_char = cursor + 1
            end_char = start_char + len(chunk)
            cursor = end_char

            # Chunk object with its position and id attached.
            doc_chunk = DocumentChunk(
                content=chunk,
                contextual_summary="",
                page_number=self._page_for_offset(page_boundaries, start_char)
                + page_shift,
                chunk_id=chunk_id,
                start_char=start_char,
                end_char=end_char,
            )
            chunks.append(doc_chunk)
            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }
        print("TERMINOU DE ORGANIZAR PDFS EM CHUNKS")

        if len(pages) == 0 or not chunks:
            # Nothing could be extracted: fall back to OCR.
            # NOTE(review): the OCR request always declares application/pdf,
            # so this fallback is only meaningful for PDF inputs - confirm.
            text = await self.getOCRFromGoogleDocumentAPI(pdf_path)
            chunks = self.load_and_split_text(text)  # type: ignore
            chunks_of_string_only = [chunk.content for chunk in chunks]
        return chunks, chunks_of_string_only

    def load_and_split_text(self, text: str) -> "List[DocumentChunk]":
        """Split a plain text into chunks with position metadata.

        Originally written for ragas; also used for the OCR fallback of
        ``load_and_split_document``.

        Args:
            text: The full text to split.

        Returns:
            One ``DocumentChunk`` per piece produced by the text splitter.
            ``start_char`` is the position of the chunk inside ``text`` (-1 if
            it cannot be found) and ``end_char`` is ``start_char + len(chunk)``.
        """
        page = Document(page_content=text, metadata={"page": 1})
        text = page.page_content
        # Break the single "page" into chunks smaller than the whole text.
        page_chunks = self.text_splitter.split_text(text)
        print("\n\n\npage_chunks: ", page_chunks)

        # NOTE(review): metadata page is 1 and 1 is added, so every chunk is
        # reported on page 2. Kept to preserve existing output - confirm.
        page_number = cast(int, page.metadata.get("page")) + 1

        chunks = []
        search_from = 0
        for chunk in page_chunks:
            chunk_id = str(uuid.uuid4())
            # Search forward from the previous chunk so repeated passages get
            # their own offsets instead of all mapping to the first occurrence.
            start_char = self._find_chunk_start(text, chunk, search_from)
            end_char = start_char + len(chunk)
            if start_char >= 0:
                search_from = start_char + 1

            doc_chunk = DocumentChunk(
                content=chunk,
                page_number=page_number,
                chunk_id=chunk_id,
                start_char=start_char,
                end_char=end_char,
            )
            chunks.append(doc_chunk)
            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }
        return chunks

    async def getOCRFromGoogleDocumentAPI(self, pdf_path: str) -> str:
        """Run Google Document AI batch OCR on a PDF and return its text.

        The file is uploaded with ``upload_to_gcs``, processed by the
        configured processor, and the JSON results written to the bucket are
        read back and joined.

        Args:
            pdf_path: Path handed to ``upload_to_gcs``.

        Returns:
            The extracted text, or an empty string when no output could be
            read (listing/downloading failed, or no JSON output was found).
        """
        # Function-scope import keeps this block self-contained.
        import asyncio

        pdf_gcs_uri = upload_to_gcs(pdf_path)

        # A unique prefix per call keeps results of earlier or concurrent
        # requests from being picked up when the output is listed below.
        gcs_output_prefix = f"documentai_output/{uuid.uuid4()}/"
        gcs_output_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_output_prefix}"

        docai_client = documentai.DocumentProcessorServiceClient()
        # NOTE(review): location is hard-coded to "us" - confirm it matches
        # the region where the processor was created.
        processor_name = docai_client.processor_path(
            project=GCP_PROJECT, location="us", processor=DOCUMENT_API_ID
        )

        gcs_document = documentai.GcsDocument(
            gcs_uri=pdf_gcs_uri,
            mime_type="application/pdf",
        )
        input_config = documentai.BatchDocumentsInputConfig(
            gcs_documents=documentai.GcsDocuments(documents=[gcs_document])
        )
        output_config = documentai.DocumentOutputConfig(
            gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=gcs_output_uri
            )
        )
        request = documentai.BatchProcessRequest(
            name=processor_name,
            input_documents=input_config,
            document_output_config=output_config,
        )

        # Submit the batch process request (this is a long-running operation)
        operation = docai_client.batch_process_documents(request)
        print("Batch processing operation started. Waiting for completion...")
        while not operation.done():
            # Yield to the event loop for 15 seconds between polls instead of
            # blocking it with time.sleep.
            await asyncio.sleep(15)
            print("Waiting...")
        print("Batch processing operation finished.")

        # --- Read the results from GCS ---
        storage_client = storage.Client(project=GCP_PROJECT)
        shards = []
        try:
            output_blobs = storage_client.list_blobs(
                GCS_BUCKET_NAME, prefix=gcs_output_prefix
            )
            for blob in output_blobs:
                # Only the JSON files under the prefix carry the OCR result.
                if not blob.name.endswith(".json"):
                    continue
                print(f"Downloading {blob.name}...")
                # Parse in memory: nothing is left behind in the working dir.
                shards.append((blob.name, json.loads(blob.download_as_bytes())))
        except Exception as e:
            # Best effort: report and let the caller see "no text".
            print(f"Error listing or downloading output files: {e}")
            return ""

        if not shards:
            print("No JSON output files found in the specified output location.")
            return ""

        raw_text = self._merge_ocr_shards(shards)
        if raw_text:
            print(f"\n--- Raw Text Extracted from {gcs_output_uri} ---")
            # Print only a snippet of long results.
            print(raw_text[:1000] + "..." if len(raw_text) > 1000 else raw_text)
            print("--------------------------------------------")
        else:
            print(
                f"Warning: 'text' field not found in {gcs_output_uri} or is empty."
            )
        print("\nProcess complete.")
        return raw_text