luanpoppe
feat: renomeando arquivos
588b95c
raw
history blame
12.5 kB
from dataclasses import dataclass
import os
from typing import Any, List, Dict, Literal, Tuple, Optional, Union, cast
from pydantic import SecretStr
from _utils.langchain_utils.Chain_class import Chain
from _utils.langchain_utils.LLM_class import LLM
from _utils.langchain_utils.Prompt_class import Prompt
from _utils.langchain_utils.Vector_store_class import VectorStore
from gerar_documento.serializer import (
GerarDocumentoComPDFProprioSerializerData,
GerarDocumentoSerializerData,
)
from setup.easy_imports import (
Chroma,
ChatOpenAI,
PromptTemplate,
BM25Okapi,
Response,
HuggingFaceEmbeddings,
)
import logging
from _utils.models.gerar_documento import (
RetrievalConfig,
)
from cohere import Client
from _utils.langchain_utils.Splitter_class import Splitter
import time
from setup.logging import Axiom
def reciprocal_rank_fusion(result_lists, weights=None):
"""Combine multiple ranked lists using reciprocal rank fusion"""
fused_scores = {}
num_lists = len(result_lists)
if weights is None:
weights = [1.0] * num_lists
for i in range(num_lists):
for doc_id, score in result_lists[i]:
if doc_id not in fused_scores:
fused_scores[doc_id] = 0
fused_scores[doc_id] += weights[i] * score
# Sort by score in descending order
sorted_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_results
@dataclass
class GerarDocumentoUtils:
def criar_output_estruturado(self, summaries: List[str | Any], sources: Any):
structured_output = []
for idx, summary in enumerate(summaries):
source_idx = min(idx, len(sources) - 1)
structured_output.append(
{
"content": summary,
"source": {
"page": sources[source_idx]["page"],
"text": sources[source_idx]["content"][:200] + "...",
"context": sources[source_idx]["context"],
"relevance_score": sources[source_idx]["relevance_score"],
"chunk_id": sources[source_idx]["chunk_id"],
},
}
)
return structured_output
def ultima_tentativa_requisicao(self, prompt_gerar_documento_formatado):
llm = LLM()
resposta = llm.open_ai().invoke(prompt_gerar_documento_formatado)
documento_gerado = resposta.content.strip() # type: ignore
if not documento_gerado:
raise Exception(
"Falha ao tentar gerar o documento final por 5 tentativas e também ao tentar na última tentativa com o chat-gpt 4o mini."
)
else:
return documento_gerado
class GerarDocumento:
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
cohere_api_key = os.environ.get("COHERE_API_KEY", "")
resumo_gerado = ""
gerar_documento_utils = GerarDocumentoUtils()
def __init__(
self,
serializer: Union[
GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
],
axiom_instance: Axiom,
):
self.config = RetrievalConfig(
num_chunks=serializer.num_chunks_retrieval,
embedding_weight=serializer.embedding_weight,
bm25_weight=serializer.bm25_weight,
context_window=serializer.context_window,
chunk_overlap=serializer.chunk_overlap,
)
self.logger = logging.getLogger(__name__)
# self.prompt_auxiliar = prompt_auxiliar
self.gpt_model = serializer.model
self.gpt_temperature = serializer.gpt_temperature
self.prompt_gerar_documento = serializer.prompt_gerar_documento
self.openai_api_key = self.openai_api_key
self.cohere_client = Client(self.cohere_api_key)
self.embeddings = HuggingFaceEmbeddings(model_name=serializer.hf_embedding)
self.num_k_rerank = serializer.num_k_rerank
self.model_cohere_rerank = serializer.model_cohere_rerank
self.splitter = Splitter(serializer.chunk_size, serializer.chunk_overlap)
self.prompt_gerar_documento_etapa_2 = serializer.prompt_gerar_documento_etapa_2
self.prompt_gerar_documento_etapa_3 = serializer.prompt_gerar_documento_etapa_3
self.vector_store = VectorStore(serializer.hf_embedding)
self.axiom_instance: Axiom = axiom_instance
def retrieve_with_rank_fusion(
self, vector_store: Chroma, bm25: BM25Okapi, chunk_ids: List[str], query: str
) -> List[Dict]:
"""Combine embedding and BM25 retrieval results"""
try:
# Get embedding results
embedding_results = vector_store.similarity_search_with_score(
query, k=self.config.num_chunks
)
# Convert embedding results to list of (chunk_id, score)
embedding_list = [
(doc.metadata["chunk_id"], 1 / (1 + score))
for doc, score in embedding_results
]
# Get BM25 results
tokenized_query = query.split()
bm25_scores = bm25.get_scores(tokenized_query)
# Convert BM25 scores to list of (chunk_id, score)
bm25_list = [
(chunk_ids[i], float(score)) for i, score in enumerate(bm25_scores)
]
# Sort bm25_list by score in descending order and limit to top N results
bm25_list = sorted(bm25_list, key=lambda x: x[1], reverse=True)[
: self.config.num_chunks
]
# Normalize BM25 scores
calculo_max = max(
[score for _, score in bm25_list]
) # Criei este max() pois em alguns momentos estava vindo valores 0, e reclamava que não podia dividir por 0
max_bm25 = calculo_max if bm25_list and calculo_max else 1
bm25_list = [(doc_id, score / max_bm25) for doc_id, score in bm25_list]
# Pass the lists to rank fusion
result_lists = [embedding_list, bm25_list]
weights = [self.config.embedding_weight, self.config.bm25_weight]
combined_results = reciprocal_rank_fusion(result_lists, weights=weights)
return combined_results # type: ignore
except Exception as e:
self.logger.error(f"Error in rank fusion retrieval: {str(e)}")
raise
def rank_fusion_get_top_results(
self,
vector_store: Chroma,
bm25: BM25Okapi,
chunk_ids: List[str],
query: str = "Summarize the main points of this document",
):
# Get combined results using rank fusion
ranked_results = self.retrieve_with_rank_fusion(
vector_store, bm25, chunk_ids, query
)
# Prepare context and track sources
contexts = []
sources = []
# Get full documents for top results
for chunk_id, score in ranked_results[: self.config.num_chunks]:
results = vector_store.get(
where={"chunk_id": chunk_id}, include=["documents", "metadatas"]
)
if results["documents"]:
context = results["documents"][0]
metadata = results["metadatas"][0]
contexts.append(context)
sources.append(
{
"content": context,
"page": metadata["page"],
"chunk_id": chunk_id,
"relevance_score": score,
"context": metadata.get("context", ""),
}
)
return sources, contexts
def select_model_for_last_requests(
self,
llm_ultimas_requests: Literal[
"gpt-4o-mini", "deepseek-chat", "gemini-2.0-flash"
],
):
llm_instance = LLM()
if llm_ultimas_requests == "gpt-4o-mini":
llm = ChatOpenAI(
temperature=self.gpt_temperature,
model=self.gpt_model,
api_key=SecretStr(self.openai_api_key),
)
elif llm_ultimas_requests == "deepseek-chat":
llm = llm_instance.deepseek()
elif llm_ultimas_requests == "gemini-2.0-flash":
llm = llm_instance.google_gemini("gemini-2.0-flash")
return llm
async def gerar_documento_final(
self,
vector_store: Chroma,
bm25: BM25Okapi,
chunk_ids: List[str],
llm_ultimas_requests: str,
query: str = "Summarize the main points of this document",
) -> List[Dict]:
try:
sources, contexts = self.rank_fusion_get_top_results(
vector_store, bm25, chunk_ids, query
)
prompt_gerar_documento = PromptTemplate(
template=cast(str, self.prompt_gerar_documento),
input_variables=["context"],
)
llm = self.select_model_for_last_requests(llm_ultimas_requests) # type: ignore
prompt_instance = Prompt()
documento_gerado = ""
tentativas = 0
context_do_prompt_primeira_etapa = "\n\n".join(contexts)
prompt_primeira_etapa = prompt_gerar_documento.format(
context=context_do_prompt_primeira_etapa,
)
while tentativas < 5 and not documento_gerado:
tentativas += 1
resposta = llm.invoke(prompt_primeira_etapa)
if hasattr(resposta, "content") and resposta.content.strip(): # type: ignore
documento_gerado = resposta.content.strip() # type: ignore
else:
print(f"Tentativa {tentativas}: resposta vazia ou inexistente.")
time.sleep(5)
if not documento_gerado:
self.axiom_instance.send_axiom(
"TENTANDO GERAR DOCUMENTO FINAL COM GPT 4o-mini COMO ÚLTIMA TENTATIVA"
)
documento_gerado = (
self.gerar_documento_utils.ultima_tentativa_requisicao(
prompt_primeira_etapa
)
)
texto_final_juntando_as_etapas = ""
resposta_primeira_etapa = documento_gerado
texto_final_juntando_as_etapas += resposta_primeira_etapa
self.axiom_instance.send_axiom(
f"RESULTADO ETAPA 1: {resposta_primeira_etapa}"
)
if self.prompt_gerar_documento_etapa_2:
self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 2")
prompt_etapa_2 = prompt_instance.create_and_invoke_prompt(
self.prompt_gerar_documento_etapa_2,
dynamic_dict={"context": context_do_prompt_primeira_etapa},
)
documento_gerado = llm.invoke(prompt_etapa_2).content
resposta_segunda_etapa = documento_gerado
texto_final_juntando_as_etapas += (
f"\n\nresposta_segunda_etapa:{resposta_segunda_etapa}"
)
self.axiom_instance.send_axiom(f"RESULTADO ETAPA 2: {documento_gerado}")
if self.prompt_gerar_documento_etapa_3:
self.axiom_instance.send_axiom("GERANDO DOCUMENTO - COMEÇANDO ETAPA 3")
prompt_etapa_3 = prompt_instance.create_and_invoke_prompt(
self.prompt_gerar_documento_etapa_3,
dynamic_dict={
"context": f"{resposta_primeira_etapa}\n\n{resposta_segunda_etapa}"
},
)
documento_gerado = llm.invoke(prompt_etapa_3).content
texto_final_juntando_as_etapas += f"\n\n{documento_gerado}"
self.axiom_instance.send_axiom(f"RESULTADO ETAPA 3: {documento_gerado}")
# Split the response into paragraphs
summaries = [
p.strip() for p in texto_final_juntando_as_etapas.split("\n\n") if p.strip() # type: ignore
]
structured_output = self.gerar_documento_utils.criar_output_estruturado(
summaries, sources
)
return structured_output
except Exception as e:
self.logger.error(f"Error generating enhanced summary: {str(e)}")
raise