import asyncio
from dataclasses import dataclass
import os
from typing import Any, List, Dict, Literal, Tuple, Optional, Union, cast
from pydantic import SecretStr
from _utils.langchain_utils.Chain_class import Chain
from _utils.langchain_utils.LLM_class import LLM
from _utils.langchain_utils.Prompt_class import Prompt
from _utils.langchain_utils.Vector_store_class import VectorStore
from gerar_documento.serializer import (
    GerarDocumentoComPDFProprioSerializerData,
    GerarDocumentoSerializerData,
)
from setup.easy_imports import (
    Chroma,
    ChatOpenAI,
    PromptTemplate,
    BM25Okapi,
    Response,
    HuggingFaceEmbeddings,
)
import logging
from _utils.models.gerar_documento import (
    RetrievalConfig,
)
from cohere import Client
from _utils.langchain_utils.Splitter_class import Splitter
import time
from setup.logging import Axiom


def reciprocal_rank_fusion(result_lists, weights=None):
    """Fuse several scored result lists into a single ranking.

    Despite the name, the fusion is a weighted sum of the *scores* carried by
    each entry (not of reciprocal ranks): for every ``(doc_id, score)`` pair
    in list ``i`` the value ``weights[i] * score`` is added to that document's
    total.

    Args:
        result_lists: Sequence of lists of ``(doc_id, score)`` pairs.
        weights: One weight per result list. Defaults to ``1.0`` for each.

    Returns:
        A list of ``(doc_id, fused_score)`` tuples sorted by fused score in
        descending order.
    """
    if weights is None:
        weights = [1.0] * len(result_lists)

    fused_scores = {}
    for list_index, results in enumerate(result_lists):
        # Index into ``weights`` (rather than zip) so a too-short weight list
        # still fails loudly instead of silently dropping result lists.
        weight = weights[list_index]
        for doc_id, score in results:
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + weight * score

    # Highest fused score first.
    return sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)


@dataclass
class GerarDocumentoUtils:
    """Stateless helpers used by ``GerarDocumento``."""

    def criar_output_estruturado(self, summaries: List[str | Any], sources: Any):
        """Pair each summary paragraph with the source chunk it is attributed to.

        Summary ``i`` is matched with ``sources[i]``; when there are more
        summaries than sources, the extra summaries reuse the last source.

        Args:
            summaries: Paragraphs of generated text.
            sources: Indexable collection of dicts with the keys ``page``,
                ``content``, ``context``, ``relevance_score`` and ``chunk_id``.

        Returns:
            A list of ``{"content": ..., "source": {...}}`` dicts, one per
            summary. The source ``text`` is the first 200 characters of the
            chunk content followed by ``"..."``.

        Raises:
            IndexError: If ``sources`` is an empty list while ``summaries``
                is not (there is no source to attribute the text to).
        """
        structured_output = []
        last_source_idx = len(sources) - 1
        for idx, summary in enumerate(summaries):
            # Clamp to the last available source when summaries outnumber sources.
            source = sources[min(idx, last_source_idx)]
            structured_output.append(
                {
                    "content": summary,
                    "source": {
                        "page": source["page"],
                        "text": source["content"][:200] + "...",
                        "context": source["context"],
                        "relevance_score": source["relevance_score"],
                        "chunk_id": source["chunk_id"],
                    },
                }
            )
        return structured_output

    def ultima_tentativa_requisicao(self, prompt_gerar_documento_formatado):
        """Make one final generation attempt through ``LLM().open_ai()``.

        NOTE(review): the error message refers to "chat-gpt 4o mini"; the
        actual model is whatever ``LLM.open_ai()`` is configured to return.

        Args:
            prompt_gerar_documento_formatado: The fully formatted prompt.

        Returns:
            The stripped text content of the model response.

        Raises:
            Exception: If the response content is empty after stripping.
        """
        resposta = LLM().open_ai().invoke(prompt_gerar_documento_formatado)
        documento_gerado = resposta.content.strip()  # type: ignore
        if documento_gerado:
            return documento_gerado
        raise Exception(
            "Falha ao tentar gerar o documento final por 5 tentativas e também ao tentar na última tentativa com o chat-gpt 4o mini."
        )


class GerarDocumento:
    """Retrieve relevant chunks (embeddings + BM25) and generate a document.

    The API keys are read from the environment once, when the class body is
    executed (i.e. at import time).
    """

    openai_api_key = os.environ.get("OPENAI_API_KEY", "")
    cohere_api_key = os.environ.get("COHERE_API_KEY", "")
    resumo_gerado = ""
    gerar_documento_utils = GerarDocumentoUtils()

    # Retry policy for the first-stage generation request.
    MAX_TENTATIVAS = 5
    ESPERA_ENTRE_TENTATIVAS_SEGUNDOS = 5

    def __init__(
        self,
        serializer: Union[
            GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
        ],
        axiom_instance: Axiom,
    ):
        """Copy retrieval/generation settings from the serializer.

        Args:
            serializer: Request data object exposing the retrieval, model and
                prompt attributes read below.
            axiom_instance: Logger-like object whose ``send_axiom`` method
                receives progress messages.
        """
        self.config = RetrievalConfig(
            num_chunks=serializer.num_chunks_retrieval,
            embedding_weight=serializer.embedding_weight,
            bm25_weight=serializer.bm25_weight,
            context_window=serializer.context_window,
            chunk_overlap=serializer.chunk_overlap,
        )
        self.logger = logging.getLogger(__name__)
        self.gpt_model = serializer.model
        self.gpt_temperature = serializer.gpt_temperature
        self.prompt_gerar_documento = serializer.prompt_gerar_documento
        # Pins the class-level key onto the instance.
        self.openai_api_key = self.openai_api_key
        self.cohere_client = Client(self.cohere_api_key)
        self.embeddings = HuggingFaceEmbeddings(model_name=serializer.hf_embedding)
        self.num_k_rerank = serializer.num_k_rerank
        self.model_cohere_rerank = serializer.model_cohere_rerank
        self.splitter = Splitter(serializer.chunk_size, serializer.chunk_overlap)
        self.prompt_gerar_documento_etapa_2 = serializer.prompt_gerar_documento_etapa_2
        self.prompt_gerar_documento_etapa_3 = serializer.prompt_gerar_documento_etapa_3
        self.vector_store = VectorStore(serializer.hf_embedding)
        self.axiom_instance: Axiom = axiom_instance

    def retrieve_with_rank_fusion(
        self, vector_store: Chroma, bm25: BM25Okapi, chunk_ids: List[str], query: str
    ) -> List[Tuple[str, float]]:
        """Combine embedding and BM25 retrieval results.

        Args:
            vector_store: Store queried with ``similarity_search_with_score``.
            bm25: BM25 index queried with the whitespace-tokenized query.
            chunk_ids: Chunk ids, positionally aligned with the BM25 scores.
            query: The search query.

        Returns:
            ``(chunk_id, fused_score)`` tuples sorted by fused score,
            highest first.

        Raises:
            Exception: Any error raised during retrieval is logged and
                re-raised unchanged.
        """
        try:
            top_n = self.config.num_chunks

            # Embedding side: map the returned score through 1 / (1 + score),
            # so a smaller raw score yields a larger fused contribution.
            embedding_results = vector_store.similarity_search_with_score(
                query, k=top_n
            )
            embedding_list = [
                (doc.metadata["chunk_id"], 1 / (1 + score))
                for doc, score in embedding_results
            ]

            # BM25 side: score every chunk, keep the top N.
            bm25_scores = bm25.get_scores(query.split())
            bm25_list = sorted(
                ((chunk_ids[i], float(score)) for i, score in enumerate(bm25_scores)),
                key=lambda item: item[1],
                reverse=True,
            )[:top_n]

            # Normalize BM25 scores by the best score. ``default=0`` covers an
            # empty result list, and ``or 1`` covers an all-zero list, which
            # would otherwise cause a division by zero.
            max_bm25 = max((score for _, score in bm25_list), default=0) or 1
            bm25_list = [(doc_id, score / max_bm25) for doc_id, score in bm25_list]

            return reciprocal_rank_fusion(
                [embedding_list, bm25_list],
                weights=[self.config.embedding_weight, self.config.bm25_weight],
            )
        except Exception as e:
            self.logger.error(f"Error in rank fusion retrieval: {str(e)}")
            raise

    def rank_fusion_get_top_results(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        query: str = "Summarize the main points of this document",
    ) -> Tuple[List[Dict], List[str]]:
        """Fetch the full text and metadata of the top fused results.

        Args:
            vector_store: Store used both for retrieval and for fetching the
                chunk documents by ``chunk_id``.
            bm25: BM25 index used for retrieval.
            chunk_ids: Chunk ids aligned with the BM25 index.
            query: The search query.

        Returns:
            ``(sources, contexts)``: ``sources`` is a list of dicts with
            ``content``, ``page``, ``chunk_id``, ``relevance_score`` and
            ``context``; ``contexts`` is the list of chunk texts in the same
            order. Chunks the store returns no document for are skipped.
        """
        ranked_results = self.retrieve_with_rank_fusion(
            vector_store, bm25, chunk_ids, query
        )

        contexts = []
        sources = []
        for chunk_id, score in ranked_results[: self.config.num_chunks]:
            results = vector_store.get(
                where={"chunk_id": chunk_id}, include=["documents", "metadatas"]
            )
            if not results["documents"]:
                continue

            context = results["documents"][0]
            metadata = results["metadatas"][0]
            contexts.append(context)
            sources.append(
                {
                    "content": context,
                    "page": metadata["page"],
                    "chunk_id": chunk_id,
                    "relevance_score": score,
                    "context": metadata.get("context", ""),
                }
            )

        return sources, contexts

    def select_model_for_last_requests(
        self,
        llm_ultimas_requests: Literal[
            "gpt-4o-mini", "deepseek-chat", "gemini-2.0-flash", "gemini-2.5-pro"
        ],
    ):
        """Build the chat model used for the final generation requests.

        For ``"gpt-4o-mini"`` a ``ChatOpenAI`` client is built from this
        instance's ``gpt_model`` and ``gpt_temperature``; the other options
        are delegated to the ``LLM`` helper.

        Args:
            llm_ultimas_requests: Name of the model family to use.

        Returns:
            The chat model instance.

        Raises:
            ValueError: If ``llm_ultimas_requests`` is not a supported name.
        """
        llm_instance = LLM()

        if llm_ultimas_requests == "gpt-4o-mini":
            return ChatOpenAI(
                temperature=self.gpt_temperature,
                model=self.gpt_model,
                api_key=SecretStr(self.openai_api_key),
            )
        if llm_ultimas_requests == "deepseek-chat":
            return llm_instance.deepseek()
        if llm_ultimas_requests == "gemini-2.0-flash":
            return llm_instance.google_gemini("gemini-2.0-flash")
        if llm_ultimas_requests == "gemini-2.5-pro":
            return llm_instance.google_gemini("gemini-2.5-pro-exp-03-25")

        # Fail with a clear message instead of an UnboundLocalError.
        raise ValueError(
            f"Unsupported model for final requests: {llm_ultimas_requests!r}"
        )

    async def gerar_documento_final(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        llm_ultimas_requests: str,
        query: str = "Summarize the main points of this document",
    ) -> List[Dict]:
        """Generate the final document in up to three prompt stages.

        Stage 1 always runs (with retries) over the retrieved context.
        Stage 2 runs when ``prompt_gerar_documento_etapa_2`` is set and sees
        the same retrieved context. Stage 3 runs when
        ``prompt_gerar_documento_etapa_3`` is set and sees the stage 1 and
        stage 2 answers (stage 2 is an empty string when it did not run).

        Args:
            vector_store: Store holding the document chunks.
            bm25: BM25 index over the same chunks.
            chunk_ids: Chunk ids aligned with the BM25 index.
            llm_ultimas_requests: Name of the model to use (see
                ``select_model_for_last_requests``).
            query: Retrieval query.

        Returns:
            The structured output built by
            ``GerarDocumentoUtils.criar_output_estruturado`` from the
            paragraphs of the concatenated stage answers.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            sources, contexts = self.rank_fusion_get_top_results(
                vector_store, bm25, chunk_ids, query
            )

            prompt_gerar_documento = PromptTemplate(
                template=cast(str, self.prompt_gerar_documento),
                input_variables=["context"],
            )
            llm = self.select_model_for_last_requests(llm_ultimas_requests)  # type: ignore
            prompt_instance = Prompt()

            # --- Stage 1 ---
            context_do_prompt_primeira_etapa = "\n\n".join(contexts)
            prompt_primeira_etapa = prompt_gerar_documento.format(
                context=context_do_prompt_primeira_etapa,
            )
            resposta_primeira_etapa = (
                await self.checar_se_resposta_vazia_do_documento_final(
                    llm_ultimas_requests, prompt_primeira_etapa
                )
            )
            partes_do_texto_final = [resposta_primeira_etapa]
            self.axiom_instance.send_axiom(
                f"RESULTADO ETAPA 1: {resposta_primeira_etapa}"
            )

            # --- Stage 2 (optional) ---
            # Initialised up front so stage 3 can run even when stage 2 is
            # disabled.
            resposta_segunda_etapa = ""
            if self.prompt_gerar_documento_etapa_2:
                self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 2")
                prompt_etapa_2 = prompt_instance.create_and_invoke_prompt(
                    self.prompt_gerar_documento_etapa_2,
                    dynamic_dict={"context": context_do_prompt_primeira_etapa},
                )
                resposta_segunda_etapa = llm.invoke(prompt_etapa_2).content
                partes_do_texto_final.append(
                    f"\n\nresposta_segunda_etapa:{resposta_segunda_etapa}"
                )
                self.axiom_instance.send_axiom(
                    f"RESULTADO ETAPA 2: {resposta_segunda_etapa}"
                )

            # --- Stage 3 (optional) ---
            if self.prompt_gerar_documento_etapa_3:
                self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 3")
                prompt_etapa_3 = prompt_instance.create_and_invoke_prompt(
                    self.prompt_gerar_documento_etapa_3,
                    dynamic_dict={
                        "context": f"{resposta_primeira_etapa}\n\n{resposta_segunda_etapa}"
                    },
                )
                resposta_terceira_etapa = llm.invoke(prompt_etapa_3).content
                partes_do_texto_final.append(f"\n\n{resposta_terceira_etapa}")
                self.axiom_instance.send_axiom(
                    f"RESULTADO ETAPA 3: {resposta_terceira_etapa}"
                )

            texto_final_juntando_as_etapas = "".join(partes_do_texto_final)

            # Split the combined response into non-empty paragraphs.
            summaries = [
                p.strip()
                for p in texto_final_juntando_as_etapas.split("\n\n")
                if p.strip()  # type: ignore
            ]

            return self.gerar_documento_utils.criar_output_estruturado(
                summaries, sources
            )
        except Exception as e:
            self.logger.error(f"Error generating enhanced summary: {str(e)}")
            raise

    async def checar_se_resposta_vazia_do_documento_final(
        self, llm_ultimas_requests: str, prompt: str
    ):
        """Invoke the selected model, retrying while the answer is empty.

        Up to ``MAX_TENTATIVAS`` attempts are made, waiting
        ``ESPERA_ENTRE_TENTATIVAS_SEGUNDOS`` seconds between attempts. If all
        of them come back empty, one last attempt is made through
        ``GerarDocumentoUtils.ultima_tentativa_requisicao``.

        Args:
            llm_ultimas_requests: Name of the model to use (see
                ``select_model_for_last_requests``).
            prompt: The fully formatted prompt.

        Returns:
            The stripped, non-empty text of the model response.

        Raises:
            ValueError: If the model name is not supported.
            Exception: If the last-resort attempt also returns empty content.
        """
        llm = self.select_model_for_last_requests(llm_ultimas_requests)  # type: ignore

        for tentativa in range(1, self.MAX_TENTATIVAS + 1):
            resposta = llm.invoke(prompt)
            if hasattr(resposta, "content") and resposta.content.strip():  # type: ignore
                return resposta.content.strip()  # type: ignore

            self.logger.warning(
                f"Tentativa {tentativa}: resposta vazia ou inexistente."
            )
            # Non-blocking wait so the event loop keeps serving other tasks;
            # there is nothing to wait for after the final attempt.
            if tentativa < self.MAX_TENTATIVAS:
                await asyncio.sleep(self.ESPERA_ENTRE_TENTATIVAS_SEGUNDOS)

        self.axiom_instance.send_axiom(
            "TENTANDO GERAR DOCUMENTO FINAL COM GPT 4o-mini COMO ÚLTIMA TENTATIVA"
        )
        return self.gerar_documento_utils.ultima_tentativa_requisicao(prompt)