import os
import time
from _utils.bubble_integrations.obter_arquivo import get_pdf_from_bubble
from _utils.handle_files import return_document_list_with_llama_parser
from _utils.langchain_utils.splitter_util import (
    Splitter_Simple,
    SplitterUtils,
    combine_documents_without_losing_pagination,
)
from setup.easy_imports import (
    PyPDFLoader,
    RecursiveCharacterTextSplitter,
    Document,
    Docx2txtLoader,
    TextLoader,
    PyMuPDFLoader,
)
from typing import Any, List, Dict, Tuple, Optional, cast
from _utils.models.gerar_documento import DocumentChunk
import uuid
import json
from _utils.google_integration.google_cloud import (
    DOCUMENT_API_ID,
    GCP_PROJECT,
    GCP_REGION,
    GCS_BUCKET_NAME,
    upload_to_gcs,
)
from google.cloud import documentai
from google.cloud import storage


class Splitter:
    def __init__(
        self,
        chunk_size,
        chunk_overlap,
    ):
        self.splitter_util = SplitterUtils()
        self.splitter_simple = Splitter_Simple(chunk_size, chunk_overlap)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        self.chunk_metadata = {}  # Store chunk metadata for tracing

    async def load_and_split_document(
        self,
        pdf_path: str,
        should_use_llama_parse: bool,
        isBubble: bool,
    ):
        """Load the PDF and split it into chunks with metadata."""
        # loader = PyPDFLoader(pdf_path)
        # if not pages:
        #     pages = get_pdf_from_bubble(pdf_path)
        #     # Returns a list of Document objects, each item holding an ENTIRE PAGE of the PDF.
        chunks_of_string_only: List[str] = []

        if isBubble:
            print("\nPEGANDO PDF DO BUBBLE")
            pages = await get_pdf_from_bubble(pdf_path, should_use_llama_parse)  # type: ignore
            page_boundaries, combined_text = (
                combine_documents_without_losing_pagination(pages)
            )
            chunks_of_string_only = (
                chunks_of_string_only
                + self.splitter_simple.get_chunks_of_string_only_from_list_of_documents(
                    pages
                )
            )
        else:
            if should_use_llama_parse:
                print("\nENVIANDO PDFS PARA LLAMA PARSE")
                pages = await return_document_list_with_llama_parser(pdf_path)
                page_boundaries, combined_text = (
                    combine_documents_without_losing_pagination(pages)
                )
                chunks_of_string_only = (
                    chunks_of_string_only + self.text_splitter.split_text(combined_text)
                )
            else:
                print("\nCOMEÇANDO LEITURA DO PDF")
                file_extension = self.splitter_util.get_file_type(pdf_path)
                print("file_extension: ", file_extension)
                if file_extension == "pdf":
                    try:
                        pages = PyPDFLoader(pdf_path).load()
                    except Exception:
                        # Fall back to PyMuPDF when PyPDF fails to parse the file.
                        pages = PyMuPDFLoader(pdf_path).load()
                elif file_extension == "odt":
                    full_text = self.splitter_util.load_odt_file(pdf_path)
                    pages = self.splitter_simple.load_and_split_text(full_text)
                elif file_extension == "txt":
                    pages = TextLoader(pdf_path).load()
                elif file_extension == "doc":
                    # full_text_binary = textract.process(pdf_path)
                    full_text = self.splitter_util.getTextFromDotDoc(pdf_path)
                    pages = self.splitter_simple.load_and_split_text(full_text)
                else:
                    pages = Docx2txtLoader(pdf_path).load()
                print("TERMINOU LEITURA DO PDF")
                print("pages: ", pages)
                page_boundaries, combined_text = (
                    combine_documents_without_losing_pagination(pages)
                )
                chunks_of_string_only = (
                    chunks_of_string_only + self.text_splitter.split_text(combined_text)
                )
        chunks: List[DocumentChunk] = []
        char_count = 0

        # for page in pages:
        #     text = page.page_content
        #     page_chunks = self.text_splitter.split_text(text)
        #     # Splits the Document holding an ENTIRE PAGE into a list of chunks,
        #     # i.e. pieces smaller than a page.

        text_char = 0
        print("\nQUEBRANDO PDF EM CHUNKS ORGANIZADOS")
        for chunk in chunks_of_string_only:
            chunk_id = str(uuid.uuid4())
            start_char = text_char + 1
            end_char = start_char + len(chunk)
            text_char = end_char

            # Page offset applied below: 0 for LlamaParse output, 1 for the other loaders.
            if should_use_llama_parse:
                somar_pages = 0
            else:
                somar_pages = 1

            # Find the page whose character range contains the start of this chunk.
            page_number = 0
            for start, end, page_number in page_boundaries:
                if start <= start_char < end:
                    break

            # Builds the chunk object with extra information, such as the
            # chunk's position and id.
            doc_chunk = DocumentChunk(
                content=chunk,
                contextual_summary="",
                page_number=page_number + somar_pages,  # 1-based page numbering
                chunk_id=chunk_id,
                start_char=char_count + start_char,
                end_char=char_count + end_char,
            )
            chunks.append(doc_chunk)

            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }
            # char_count += len(text)
        print("TERMINOU DE ORGANIZAR PDFS EM CHUNKS")

        if len(pages) == 0 or len(chunks) == 0:
            # Fall back to OCR via Google Document AI when nothing could be read.
            text = await self.getOCRFromGoogleDocumentAPI(pdf_path)
            chunks = self.load_and_split_text(text)  # type: ignore
            chunks_of_string_only = [chunk.content for chunk in chunks]

        return chunks, chunks_of_string_only
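
    # Worked example (illustrative only, not from the original code): suppose
    # combine_documents_without_losing_pagination returned
    # page_boundaries == [(0, 1500, 0), (1500, 3100, 1)]. A chunk whose
    # start_char is 1600 falls inside the second range, so its page index is 1;
    # on the non-LlamaParse path (somar_pages == 1) it is reported as page 2.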

    def load_and_split_text(self, text: str) -> List[DocumentChunk]:
        """Load raw text and split it into chunks with metadata.

        This helper was created only for the RAGAS evaluation flow.
        """
        page = Document(page_content=text, metadata={"page": 1})
        chunks = []
        char_count = 0

        text = page.page_content
        # Splits the single-page Document into a list of chunks, i.e. pieces
        # smaller than a page.
        page_chunks = self.text_splitter.split_text(text)
        print("\n\n\npage_chunks: ", page_chunks)

        for chunk in page_chunks:
            chunk_id = str(uuid.uuid4())
            # Position of the chunk inside the full page text.
            start_char = text.find(chunk)
            end_char = start_char + len(chunk)

            # Builds the chunk object with extra information, such as the
            # chunk's position and id.
            doc_chunk = DocumentChunk(
                content=chunk,
                page_number=cast(int, page.metadata.get("page"))
                + 1,  # 1-based page numbering
                chunk_id=chunk_id,
                start_char=char_count + start_char,
                end_char=char_count + end_char,
            )
            chunks.append(doc_chunk)

            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }
        char_count += len(text)

        return chunks
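
    # Usage sketch (illustrative, values are arbitrary examples): for the RAGAS
    # flow the helper above is typically called directly on text already held in
    # memory, e.g. Splitter(1000, 200).load_and_split_text(raw_text).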

    async def getOCRFromGoogleDocumentAPI(self, pdf_path: str):
        pdf_gcs_uri = upload_to_gcs(pdf_path)
        GCS_OUTPUT_PREFIX = "documentai_output/"
        # GCS_INPUT_URI = f"gs://{GCS_BUCKET_NAME}/gemini_uploads/{pdf_gcs_uri}"
        GCS_INPUT_URI = pdf_gcs_uri
        GCS_OUTPUT_URI = f"gs://{GCS_BUCKET_NAME}/{GCS_OUTPUT_PREFIX}"

        docai_client = documentai.DocumentProcessorServiceClient()
        processor_name = docai_client.processor_path(
            project=GCP_PROJECT, location="us", processor=DOCUMENT_API_ID
        )

        gcs_document = documentai.GcsDocument(
            gcs_uri=GCS_INPUT_URI,
            mime_type="application/pdf",  # Mime type is specified here for GcsDocument
        )
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])

        # Create the BatchDocumentsInputConfig
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
        # Note: if GCS_INPUT_URI were a prefix for multiple files, you'd use GcsPrefix:
        # gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=GCS_INPUT_URI_PREFIX)
        # input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix, mime_type="application/pdf")

        # Create the DocumentOutputConfig.
        # GCS_OUTPUT_URI should be a gs:// URI prefix where the output JSONs will be stored.
        output_config = documentai.DocumentOutputConfig(
            gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=GCS_OUTPUT_URI
            )
        )

        # Construct the BatchProcessRequest
        request = documentai.BatchProcessRequest(
            name=processor_name,
            input_documents=input_config,
            document_output_config=output_config,
        )

        # Submit the batch process request (this is a long-running operation)
        operation = docai_client.batch_process_documents(request)
        print("Batch processing operation started. Waiting for completion...")
        while not operation.done():
            time.sleep(15)  # Wait 15 seconds before checking again
            print("Waiting...")
        print("Batch processing operation finished.")

        # --- Download the results from GCS ---
        storage_client = storage.Client(
            project=GCP_PROJECT
        )  # Uses GOOGLE_APPLICATION_CREDENTIALS/ADC
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        output_blobs = storage_client.list_blobs(
            GCS_BUCKET_NAME, prefix=GCS_OUTPUT_PREFIX
        )

        downloaded_files_texts = []
        try:
            for blob in output_blobs:
                # Document AI adds suffixes and subdirectories. Look for the actual JSON
                # output files. The exact naming depends on the processor and options;
                # the common pattern is a name ending with .json.
                if blob.name.endswith(".json"):
                    # Download to the current directory with the blob's base name.
                    local_download_path = os.path.basename(blob.name)
                    print(f"Downloading {blob.name} to {local_download_path}...")
                    blob.download_to_filename(local_download_path)
                    downloaded_files_texts.append(local_download_path)

                    with open(local_download_path, "r", encoding="utf-8") as f:
                        document_data = json.load(f)

                    # The top-level 'text' field contains the concatenated plain text.
                    if "text" in document_data and document_data["text"] is not None:
                        raw_text = document_data["text"]
                        print(f"\n--- Raw Text Extracted from {blob.name} ---")
                        # Print only a snippet or process as needed
                        print(
                            raw_text[:1000] + "..."
                            if len(raw_text) > 1000
                            else raw_text
                        )
                        print("--------------------------------------------")
                        return raw_text
                        # Note: if a batch of files had been processed, the text would need
                        # to be associated with its original file name (Document AI metadata
                        # links output JSONs back to input files). Since a single input PDF
                        # is sent above, the first JSON holds all of its text.
                    else:
                        print(
                            f"Warning: 'text' field not found in {blob.name} or is empty."
                        )
                    # Optional: read and print a snippet of the JSON content
                    # with open(local_download_path, "r", encoding="utf-8") as f:
                    #     data = json.load(f)
                    # # Print some extracted text, for example (structure varies by processor)
                    # if "text" in data:
                    #     print(f"Extracted text snippet: {data['text'][:500]}...")
                    # elif "entities" in data:
                    #     print(f"Number of entities found: {len(data['entities'])}")
                    # else:
                    #     print("Output JSON structure not immediately recognizable.")
                    # break  # Uncomment if you only expect/need to process the first output file
            if not downloaded_files_texts:
                print("No JSON output files found in the specified output location.")
        except Exception as e:
            print(f"Error listing or downloading output files: {e}")

        print("\nProcess complete.")
        if downloaded_files_texts:
            print(f"Downloaded output file(s): {', '.join(downloaded_files_texts)}")
            print("These files contain the OCR results in JSON format.")
        else:
            print("No output files were successfully downloaded.")