import os
import time
from _utils.bubble_integrations.obter_arquivo import get_pdf_from_bubble
from _utils.handle_files import return_document_list_with_llama_parser
from _utils.langchain_utils.splitter_util import (
    Splitter_Simple,
    SplitterUtils,
    combine_documents_without_losing_pagination,
)
from setup.easy_imports import (
    PyPDFLoader,
    RecursiveCharacterTextSplitter,
    Document,
    Docx2txtLoader,
    TextLoader,
    PyMuPDFLoader,
)
from typing import Any, List, Dict, Tuple, Optional, cast
from _utils.models.gerar_documento import (
    DocumentChunk,
)
import uuid
import json
from _utils.google_integration.google_cloud import (
    DOCUMENT_API_ID,
    GCP_PROJECT,
    GCP_REGION,
    GCS_BUCKET_NAME,
    upload_to_gcs,
)
from google.cloud import documentai
from google.cloud import storage


class Splitter:
    def __init__(
        self,
        chunk_size,
        chunk_overlap,
    ):
        self.splitter_util = SplitterUtils()
        self.splitter_simple = Splitter_Simple(chunk_size, chunk_overlap)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        self.chunk_metadata = {}  # Store chunk metadata for tracing

    async def load_and_split_document(
        self,
        pdf_path: str,
        should_use_llama_parse: bool,
        isBubble: bool,
    ):
        """Load PDF and split into chunks with metadata"""
        # loader = PyPDFLoader(pdf_path)
        # if not pages:
        #     pages = get_pdf_from_bubble(
        #         pdf_path
        #     )  # Produces a list of Document objects, each one covering ONE full page of the PDF.

        chunks_of_string_only: List[str] = []

        if isBubble:
            print("\nPEGANDO PDF DO BUBBLE")
            pages = await get_pdf_from_bubble(pdf_path, should_use_llama_parse)  # type: ignore
            page_boundaries, combined_text = (
                combine_documents_without_losing_pagination(pages)
            )
            chunks_of_string_only = (
                chunks_of_string_only
                + self.splitter_simple.get_chunks_of_string_only_from_list_of_documents(
                    pages
                )
            )
        else:
            if should_use_llama_parse:
                print("\nENVIANDO PDFS PARA LLAMA PARSE")
                pages = await return_document_list_with_llama_parser(pdf_path)
                page_boundaries, combined_text = (
                    combine_documents_without_losing_pagination(pages)
                )
                chunks_of_string_only = (
                    chunks_of_string_only + self.text_splitter.split_text(combined_text)
                )
            else:
                print("\nCOMEÇANDO LEITURA DO PDF")
                file_extension = self.splitter_util.get_file_type(pdf_path)
                print("file_extension: ", file_extension)
                if file_extension == "pdf":
                    try:
                        pages = PyPDFLoader(pdf_path).load()
                    except Exception:
                        pages = PyMuPDFLoader(pdf_path).load()
                elif file_extension == "odt":
                    full_text = self.splitter_util.load_odt_file(pdf_path)
                    pages = self.splitter_simple.load_and_split_text(full_text)
                elif file_extension == "txt":
                    pages = TextLoader(pdf_path).load()
                elif file_extension == "doc":
                    # full_text_binary = textract.process(pdf_path)
                    full_text = self.splitter_util.getTextFromDotDoc(pdf_path)
                    pages = self.splitter_simple.load_and_split_text(full_text)
                else:
                    pages = Docx2txtLoader(pdf_path).load()
                print("TERMINOU LEITURA DO PDF")
                print("pages: ", pages)
                page_boundaries, combined_text = (
                    combine_documents_without_losing_pagination(pages)
                )
                chunks_of_string_only = (
                    chunks_of_string_only + self.text_splitter.split_text(combined_text)
                )

        chunks: List[DocumentChunk] = []
        char_count = 0

        # for page in pages:
        #     text = page.page_content
        #     page_chunks = self.text_splitter.split_text(
        #         text
        #     )  # Splits the single-page Document into a list of chunks, each smaller than a page.
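        # Map each text chunk back to the page it came from: page_boundaries holds
        # (start_char, end_char, page_index) tuples produced by
        # combine_documents_without_losing_pagination, so a chunk's page is the
        # boundary interval that contains its starting character offset.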
        text_char = 0
        print("\nQUEBRANDO PDF EM CHUNKS ORGANIZADOS")
        for chunk in chunks_of_string_only:
            chunk_id = str(uuid.uuid4())
            start_char = text_char + 1
            end_char = start_char + len(chunk)
            text_char = end_char

            if should_use_llama_parse:
                somar_pages = 0
            else:
                somar_pages = 1

            page_number = 0
            for start, end, page in page_boundaries:
                if start <= start_char < end:
                    page_number = page
                    break

            doc_chunk = DocumentChunk(  # Builds the chunk object with extra info such as its position and id
                content=chunk,
                contextual_summary="",
                page_number=page_number + somar_pages,  # 1-based page numbering
                chunk_id=chunk_id,
                start_char=char_count + start_char,
                end_char=char_count + end_char,
            )
            chunks.append(doc_chunk)

            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }

        # char_count += len(text)

        print("TERMINOU DE ORGANIZAR PDFS EM CHUNKS")

        if len(pages) == 0 or len(chunks) == 0:
            # Fall back to Google Document AI OCR when nothing could be read locally.
            text = await self.getOCRFromGoogleDocumentAPI(pdf_path)
            chunks = self.load_and_split_text(text)  # type: ignore
            chunks_of_string_only = [chunk.content for chunk in chunks]

        return chunks, chunks_of_string_only

    def load_and_split_text(self, text: str) -> List[DocumentChunk]:
        """Load text and split into chunks with metadata - created this function only for ragas"""
        page = Document(page_content=text, metadata={"page": 1})
        chunks = []
        char_count = 0

        text = page.page_content
        page_chunks = self.text_splitter.split_text(
            text
        )  # Splits the single-page Document into a list of chunks, each smaller than a page.
        print("\n\n\npage_chunks: ", page_chunks)
        for chunk in page_chunks:
            chunk_id = str(uuid.uuid4())
            start_char = text.find(
                chunk
            )  # Position of the chunk within the full page text
            end_char = start_char + len(chunk)

            doc_chunk = DocumentChunk(  # Builds the chunk object with extra info such as its position and id
                content=chunk,
                page_number=cast(int, page.metadata.get("page"))
                + 1,  # 1-based page numbering
                chunk_id=chunk_id,
                start_char=char_count + start_char,
                end_char=char_count + end_char,
            )
            chunks.append(doc_chunk)

            # Store metadata for later retrieval
            self.chunk_metadata[chunk_id] = {
                "page": doc_chunk.page_number,
                "start_char": doc_chunk.start_char,
                "end_char": doc_chunk.end_char,
            }

        char_count += len(text)

        return chunks

    async def getOCRFromGoogleDocumentAPI(self, pdf_path: str):
        pdf_gcs_uri = upload_to_gcs(pdf_path)
        GCS_OUTPUT_PREFIX = "documentai_output/"
        # GCS_INPUT_URI = f"gs://{GCS_BUCKET_NAME}/{f"gemini_uploads/{pdf_gcs_uri}"}"
        GCS_INPUT_URI = pdf_gcs_uri
        GCS_OUTPUT_URI = f"gs://{GCS_BUCKET_NAME}/{GCS_OUTPUT_PREFIX}"

        docai_client = documentai.DocumentProcessorServiceClient()
        processor_name = docai_client.processor_path(
            project=GCP_PROJECT, location="us", processor=DOCUMENT_API_ID
        )

        gcs_document = documentai.GcsDocument(
            gcs_uri=GCS_INPUT_URI,
            mime_type="application/pdf",  # Mime type is specified here for GcsDocument
        )
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])

        # 3. Create the BatchDocumentsInputConfig
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
        # Note: If GCS_INPUT_URI was a prefix for multiple files, you'd use GcsPrefix:
        # gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=GCS_INPUT_URI_PREFIX)
        # input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix, mime_type="application/pdf")

        # 4. Create the DocumentOutputConfig
        # GCS_OUTPUT_URI should be a gs:// URI prefix where the output JSONs will be stored
        output_config = documentai.DocumentOutputConfig(
            gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=GCS_OUTPUT_URI
            )
        )

        # 5. Construct the BatchProcessRequest
        request = documentai.BatchProcessRequest(
            name=processor_name,
            input_documents=input_config,  # Use 'input_documents'
            document_output_config=output_config,  # Use 'document_output_config'
        )

        # Submit the batch process request (this is a long-running operation)
        operation = docai_client.batch_process_documents(request)

        print("Batch processing operation started. Waiting for completion...")
        while not operation.done():
            time.sleep(15)  # Poll every 15 seconds (note: time.sleep blocks the event loop in this async method)
            print("Waiting...")

        print("Batch processing operation finished.")

        # --- Download the results from GCS ---
        storage_client = storage.Client(
            project=GCP_PROJECT
        )  # Uses GOOGLE_APPLICATION_CREDENTIALS/ADC
        bucket = storage_client.bucket(GCS_BUCKET_NAME)

        output_blobs = storage_client.list_blobs(
            GCS_BUCKET_NAME, prefix=GCS_OUTPUT_PREFIX
        )

        downloaded_files_texts = []
        try:
            for blob in output_blobs:
                # Document AI adds suffixes and subdirectories. Look for the actual JSON output files.
                # The exact naming depends on the processor and options. Common pattern is ending with .json
                if blob.name.endswith(".json"):
                    local_download_path = os.path.basename(
                        blob.name
                    )  # Download to current directory with blob name
                    print(f"Downloading {blob.name} to {local_download_path}...")
                    blob.download_to_filename(local_download_path)

                    with open(local_download_path, "r", encoding="utf-8") as f:
                        document_data = json.load(f)

                    # The top-level 'text' field contains the concatenated plain text.
                    if "text" in document_data and document_data["text"] is not None:
                        raw_text = document_data["text"]
                        print(f"\n--- Raw Text Extracted from {blob.name} ---")
                        # Print only a snippet or process as needed
                        print(
                            raw_text[:1000] + "..."
                            if len(raw_text) > 1000
                            else raw_text
                        )
                        print("--------------------------------------------")
                        return raw_text
                        # Optional: Store the text. If you processed a batch of files,
                        # you might want to associate the text with the original file name.
                        # Document AI metadata might link output JSONs back to input files.
                        # For simplicity here, let's just show the extraction.
                        # If you know it was a single input PDF, this is all the text.
                        # If it was multiple, you'd need a mapping or process each JSON.
                    else:
                        print(
                            f"Warning: 'text' field not found in {blob.name} or is empty."
                        )
                        # Optional: Read and print a snippet of the JSON content
                        # with open(local_download_path, 'r', encoding='utf-8') as f:
                        #     data = json.load(f)
                        #     # Print some extracted text, for example (structure varies by processor)
                        #     if 'text' in data:
                        #         print(f"Extracted text snippet: {data['text'][:500]}...")  # Print first 500 chars
                        #     elif 'entities' in data:
                        #         print(f"Number of entities found: {len(data['entities'])}")
                        #     else:
                        #         print("Output JSON structure not immediately recognizable.")

                    # break  # Uncomment if you only expect/need to process the first output file

            if not downloaded_files_texts:
                print("No JSON output files found in the specified output location.")
        except Exception as e:
            print(f"Error listing or downloading output files: {e}")

        print("\nProcess complete.")
        if downloaded_files_texts:
            print(f"Downloaded output file(s): {', '.join(downloaded_files_texts)}")
            print("These files contain the OCR results in JSON format.")
        else:
            print("No output files were successfully downloaded.")
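

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of how the Splitter above might be driven end to end.
# Assumptions: "exemplo.pdf" is a hypothetical local file, the chunk size and
# overlap values are arbitrary, and both Bubble and LlamaParse are skipped
# (isBubble=False, should_use_llama_parse=False) so the plain PyPDFLoader path runs.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        splitter = Splitter(chunk_size=1000, chunk_overlap=200)
        chunks, raw_chunks = await splitter.load_and_split_document(
            "exemplo.pdf",  # hypothetical input file
            should_use_llama_parse=False,
            isBubble=False,
        )
        print(f"{len(raw_chunks)} chunks produced")
        # Each DocumentChunk carries its id, 1-based page number and character span.
        for chunk in chunks[:3]:
            print(chunk.chunk_id, chunk.page_number, chunk.start_char, chunk.end_char)

    asyncio.run(_demo())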