from dataclasses import dataclass
import os
from typing import Any, List, Dict, Literal, Tuple, Optional, Union, cast
from pydantic import SecretStr
from _utils.langchain_utils.Chain_class import Chain
from _utils.langchain_utils.LLM_class import LLM
from _utils.langchain_utils.Prompt_class import Prompt
from _utils.langchain_utils.Vector_store_class import VectorStore
from gerar_documento.serializer import (
    GerarDocumentoComPDFProprioSerializerData,
    GerarDocumentoSerializerData,
)
from setup.easy_imports import (
    Chroma,
    ChatOpenAI,
    PromptTemplate,
    BM25Okapi,
    Response,
    HuggingFaceEmbeddings,
)
import logging
from _utils.models.gerar_documento import (
    RetrievalConfig,
)
from cohere import Client
from _utils.langchain_utils.Splitter_class import Splitter
import time
from setup.logging import Axiom


def reciprocal_rank_fusion(result_lists, weights=None):
    """Fuse multiple ranked (doc_id, score) lists into a single ranking.

    Despite the name, this is a weighted score fusion: each document's final
    score is the weighted sum of its scores across the input lists.
    """
    fused_scores = {}
    num_lists = len(result_lists)
    if weights is None:
        weights = [1.0] * num_lists

    for i in range(num_lists):
        for doc_id, score in result_lists[i]:
            if doc_id not in fused_scores:
                fused_scores[doc_id] = 0
            fused_scores[doc_id] += weights[i] * score

    # Sort by fused score in descending order
    sorted_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    return sorted_results
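
# Worked example of the fusion above (a sketch in comments, using made-up chunk
# ids and scores): with embedding_list = [("c1", 0.8), ("c2", 0.5)],
# bm25_list = [("c2", 1.0), ("c3", 0.4)] and weights = [0.6, 0.4], the fused
# scores are
#   c1 = 0.6 * 0.8             = 0.48
#   c2 = 0.6 * 0.5 + 0.4 * 1.0 = 0.70
#   c3 = 0.4 * 0.4             = 0.16
# so the function returns the chunks ordered c2, c1, c3.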


class GerarDocumentoUtils:
    def criar_output_estruturado(self, summaries: List[str | Any], sources: Any):
        structured_output = []
        for idx, summary in enumerate(summaries):
            source_idx = min(idx, len(sources) - 1)
            structured_output.append(
                {
                    "content": summary,
                    "source": {
                        "page": sources[source_idx]["page"],
                        "text": sources[source_idx]["content"][:200] + "...",
                        "context": sources[source_idx]["context"],
                        "relevance_score": sources[source_idx]["relevance_score"],
                        "chunk_id": sources[source_idx]["chunk_id"],
                    },
                }
            )
        return structured_output
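
    # Shape of the output (illustrative values only): for summaries
    # ["Primeiro parágrafo do documento."] and a single source entry, the
    # method returns
    # [
    #     {
    #         "content": "Primeiro parágrafo do documento.",
    #         "source": {
    #             "page": 1,
    #             "text": "<first 200 characters of the chunk>...",
    #             "context": "<contextual header stored with the chunk>",
    #             "relevance_score": 0.72,
    #             "chunk_id": "chunk-0",
    #         },
    #     }
    # ]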

    def ultima_tentativa_requisicao(self, prompt_gerar_documento_formatado):
        llm = LLM()
        resposta = llm.open_ai().invoke(prompt_gerar_documento_formatado)
        documento_gerado = resposta.content.strip()  # type: ignore
        if not documento_gerado:
            raise Exception(
                "Falha ao tentar gerar o documento final por 5 tentativas e também ao tentar na última tentativa com o chat-gpt 4o mini."
            )
        else:
            return documento_gerado


class GerarDocumento:
    openai_api_key = os.environ.get("OPENAI_API_KEY", "")
    cohere_api_key = os.environ.get("COHERE_API_KEY", "")
    resumo_gerado = ""
    gerar_documento_utils = GerarDocumentoUtils()

    def __init__(
        self,
        serializer: Union[
            GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
        ],
        axiom_instance: Axiom,
    ):
        self.config = RetrievalConfig(
            num_chunks=serializer.num_chunks_retrieval,
            embedding_weight=serializer.embedding_weight,
            bm25_weight=serializer.bm25_weight,
            context_window=serializer.context_window,
            chunk_overlap=serializer.chunk_overlap,
        )
        self.logger = logging.getLogger(__name__)
        # self.prompt_auxiliar = prompt_auxiliar
        self.gpt_model = serializer.model
        self.gpt_temperature = serializer.gpt_temperature
        self.prompt_gerar_documento = serializer.prompt_gerar_documento
        self.openai_api_key = self.openai_api_key
        self.cohere_client = Client(self.cohere_api_key)
        self.embeddings = HuggingFaceEmbeddings(model_name=serializer.hf_embedding)
        self.num_k_rerank = serializer.num_k_rerank
        self.model_cohere_rerank = serializer.model_cohere_rerank
        self.splitter = Splitter(serializer.chunk_size, serializer.chunk_overlap)
        self.prompt_gerar_documento_etapa_2 = serializer.prompt_gerar_documento_etapa_2
        self.prompt_gerar_documento_etapa_3 = serializer.prompt_gerar_documento_etapa_3
        self.vector_store = VectorStore(serializer.hf_embedding)
        self.axiom_instance: Axiom = axiom_instance

    def retrieve_with_rank_fusion(
        self, vector_store: Chroma, bm25: BM25Okapi, chunk_ids: List[str], query: str
    ) -> List[Dict]:
        """Combine embedding and BM25 retrieval results"""
        try:
            # Get embedding results
            embedding_results = vector_store.similarity_search_with_score(
                query, k=self.config.num_chunks
            )

            # Convert embedding results to a list of (chunk_id, score),
            # turning the distance returned by Chroma into a similarity
            embedding_list = [
                (doc.metadata["chunk_id"], 1 / (1 + score))
                for doc, score in embedding_results
            ]

            # Get BM25 results
            tokenized_query = query.split()
            bm25_scores = bm25.get_scores(tokenized_query)

            # Convert BM25 scores to a list of (chunk_id, score)
            bm25_list = [
                (chunk_ids[i], float(score)) for i, score in enumerate(bm25_scores)
            ]

            # Sort bm25_list by score in descending order and keep only the top N results
            bm25_list = sorted(bm25_list, key=lambda x: x[1], reverse=True)[
                : self.config.num_chunks
            ]

            # Normalize BM25 scores. The max() guard exists because the scores
            # were sometimes all 0, which caused a division-by-zero error;
            # default=0 also protects against an empty bm25_list.
            calculo_max = max((score for _, score in bm25_list), default=0)
            max_bm25 = calculo_max if bm25_list and calculo_max else 1
            bm25_list = [(doc_id, score / max_bm25) for doc_id, score in bm25_list]

            # Pass both lists to rank fusion
            result_lists = [embedding_list, bm25_list]
            weights = [self.config.embedding_weight, self.config.bm25_weight]
            combined_results = reciprocal_rank_fusion(result_lists, weights=weights)

            return combined_results  # type: ignore
        except Exception as e:
            self.logger.error(f"Error in rank fusion retrieval: {str(e)}")
            raise
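
    # Score scaling, step by step (illustrative numbers): Chroma returns a
    # distance, so a raw distance of 0.25 becomes a similarity of
    # 1 / (1 + 0.25) = 0.8; BM25 scores are divided by the top score, so raw
    # scores [4.0, 2.0, 1.0] normalize to [1.0, 0.5, 0.25] before both lists
    # are handed to reciprocal_rank_fusion with the configured weights.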

    def rank_fusion_get_top_results(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        query: str = "Summarize the main points of this document",
    ):
        # Get combined results using rank fusion
        ranked_results = self.retrieve_with_rank_fusion(
            vector_store, bm25, chunk_ids, query
        )

        # Prepare context and track sources
        contexts = []
        sources = []

        # Get full documents for the top results
        for chunk_id, score in ranked_results[: self.config.num_chunks]:
            results = vector_store.get(
                where={"chunk_id": chunk_id}, include=["documents", "metadatas"]
            )
            if results["documents"]:
                context = results["documents"][0]
                metadata = results["metadatas"][0]
                contexts.append(context)
                sources.append(
                    {
                        "content": context,
                        "page": metadata["page"],
                        "chunk_id": chunk_id,
                        "relevance_score": score,
                        "context": metadata.get("context", ""),
                    }
                )

        return sources, contexts

    def select_model_for_last_requests(
        self,
        llm_ultimas_requests: Literal[
            "gpt-4o-mini", "deepseek-chat", "gemini-2.0-flash", "gemini-2.5-pro"
        ],
    ):
        llm_instance = LLM()
        if llm_ultimas_requests == "gpt-4o-mini":
            llm = ChatOpenAI(
                temperature=self.gpt_temperature,
                model=self.gpt_model,
                api_key=SecretStr(self.openai_api_key),
            )
        elif llm_ultimas_requests == "deepseek-chat":
            llm = llm_instance.deepseek()
        elif llm_ultimas_requests == "gemini-2.0-flash":
            llm = llm_instance.google_gemini("gemini-2.0-flash")
        elif llm_ultimas_requests == "gemini-2.5-pro":
            llm = llm_instance.google_gemini("gemini-2.5-pro-exp-03-25")
        else:
            # Without this branch, an unexpected value would leave llm unbound
            raise ValueError(f"Modelo não suportado: {llm_ultimas_requests}")
        return llm

    async def gerar_documento_final(
        self,
        vector_store: Chroma,
        bm25: BM25Okapi,
        chunk_ids: List[str],
        llm_ultimas_requests: str,
        query: str = "Summarize the main points of this document",
    ) -> List[Dict]:
        try:
            sources, contexts = self.rank_fusion_get_top_results(
                vector_store, bm25, chunk_ids, query
            )

            prompt_gerar_documento = PromptTemplate(
                template=cast(str, self.prompt_gerar_documento),
                input_variables=["context"],
            )

            llm = self.select_model_for_last_requests(llm_ultimas_requests)  # type: ignore
            prompt_instance = Prompt()

            context_do_prompt_primeira_etapa = "\n\n".join(contexts)
            prompt_primeira_etapa = prompt_gerar_documento.format(
                context=context_do_prompt_primeira_etapa,
            )

            documento_gerado = await self.checar_se_resposta_vazia_do_documento_final(
                llm_ultimas_requests, prompt_primeira_etapa
            )

            texto_final_juntando_as_etapas = ""
            resposta_primeira_etapa = documento_gerado
            # Initialized here so etapa 3 does not raise a NameError when etapa 2 is skipped
            resposta_segunda_etapa = ""
            texto_final_juntando_as_etapas += resposta_primeira_etapa
            self.axiom_instance.send_axiom(
                f"RESULTADO ETAPA 1: {resposta_primeira_etapa}"
            )

            if self.prompt_gerar_documento_etapa_2:
                self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 2")
                prompt_etapa_2 = prompt_instance.create_and_invoke_prompt(
                    self.prompt_gerar_documento_etapa_2,
                    dynamic_dict={"context": context_do_prompt_primeira_etapa},
                )
                documento_gerado = llm.invoke(prompt_etapa_2).content
                resposta_segunda_etapa = documento_gerado
                texto_final_juntando_as_etapas += (
                    f"\n\nresposta_segunda_etapa:{resposta_segunda_etapa}"
                )
                self.axiom_instance.send_axiom(f"RESULTADO ETAPA 2: {documento_gerado}")

            if self.prompt_gerar_documento_etapa_3:
                self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 3")
                prompt_etapa_3 = prompt_instance.create_and_invoke_prompt(
                    self.prompt_gerar_documento_etapa_3,
                    dynamic_dict={
                        "context": f"{resposta_primeira_etapa}\n\n{resposta_segunda_etapa}"
                    },
                )
                documento_gerado = llm.invoke(prompt_etapa_3).content
                texto_final_juntando_as_etapas += f"\n\n{documento_gerado}"
                self.axiom_instance.send_axiom(f"RESULTADO ETAPA 3: {documento_gerado}")

            # Split the combined response into paragraphs
            summaries = [
                p.strip() for p in texto_final_juntando_as_etapas.split("\n\n") if p.strip()  # type: ignore
            ]

            structured_output = self.gerar_documento_utils.criar_output_estruturado(
                summaries, sources
            )

            return structured_output
        except Exception as e:
            self.logger.error(f"Error generating enhanced summary: {str(e)}")
            raise

    async def checar_se_resposta_vazia_do_documento_final(
        self, llm_ultimas_requests: str, prompt: str
    ):
        llm = self.select_model_for_last_requests(llm_ultimas_requests)  # type: ignore
        documento_gerado = ""
        tentativas = 0

        while tentativas < 5 and not documento_gerado:
            tentativas += 1
            resposta = llm.invoke(prompt)
            if hasattr(resposta, "content") and resposta.content.strip():  # type: ignore
                documento_gerado = resposta.content.strip()  # type: ignore
            else:
                print(f"Tentativa {tentativas}: resposta vazia ou inexistente.")
                time.sleep(5)

        if not documento_gerado:
            self.axiom_instance.send_axiom(
                "TENTANDO GERAR DOCUMENTO FINAL COM GPT 4o-mini COMO ÚLTIMA TENTATIVA"
            )
            documento_gerado = self.gerar_documento_utils.ultima_tentativa_requisicao(
                prompt
            )

        return documento_gerado
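

# Minimal usage sketch (commented out; assumes a populated Chroma store, a
# BM25Okapi index and matching chunk_ids built elsewhere, e.g. via the Splitter
# and VectorStore helpers, plus serializer data and an Axiom instance):
#
#   gerador = GerarDocumento(serializer=serializer_data, axiom_instance=axiom)
#   resultado = await gerador.gerar_documento_final(
#       vector_store=chroma_store,
#       bm25=bm25_index,
#       chunk_ids=chunk_ids,
#       llm_ultimas_requests="gpt-4o-mini",
#   )
#   # resultado: list of {"content": ..., "source": {...}} dicts produced by
#   # GerarDocumentoUtils.criar_output_estruturado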