vella-backend / _utils /gerar_documento.py
luanpoppe
refactor: melhorando a função gerar_documento e a classe GerarDocumento
f9a1a18
raw
history blame
3.11 kB
import os
from langchain_core.messages import HumanMessage
from typing import Any, Union, cast
from _utils.Utils_Class import UtilsClass
from _utils.axiom_logs import AxiomLogs
from _utils.langchain_utils.LLM_class import LLM
from _utils.bubble_integrations.enviar_resposta_final import enviar_resposta_final
from _utils.custom_exception_handler import custom_exception_handler_without_api_handler
from rest_framework.response import Response
from _utils.gerar_documento_utils.GerarDocumento import (
GerarDocumento,
)
from _utils.gerar_documento_utils.contextual_retriever import (
ContextualRetriever,
)
from _utils.gerar_documento_utils.utils import (
generate_document_title,
gerar_resposta_compilada,
get_response_from_auxiliar_contextual_prompt,
)
from _utils.models.gerar_documento import (
RetrievalConfig,
)
import markdown
from _utils.langchain_utils.Prompt_class import Prompt
from _utils.utils import convert_markdown_to_HTML
from gerar_documento.serializer import (
GerarDocumentoComPDFProprioSerializer,
GerarDocumentoComPDFProprioSerializerData,
GerarDocumentoSerializerData,
)
from setup.logging import Axiom
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ.get("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "VELLA"
async def gerar_documento(
serializer: Union[
GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
],
listaPDFs,
axiom_instance: Axiom,
isBubble=False,
):
try:
axiom = axiom_instance.send_axiom
ax = AxiomLogs(axiom_instance)
utils = UtilsClass()
summarizer = GerarDocumento(serializer, isBubble, axiom_instance)
all_PDFs_chunks, full_text_as_array = await summarizer.get_text_and_pdf_chunks()
is_contextualized_chunk = serializer.should_have_contextual_chunks
response_auxiliar_summary = await get_response_from_auxiliar_contextual_prompt(
full_text_as_array
)
summarizer.resumo_auxiliar = response_auxiliar_summary
ax.resumo_inicial_processo(response_auxiliar_summary)
await summarizer.generate_chunks_processados()
await summarizer.generate_query_for_vector_store()
await summarizer.create_enhanced_vector_store()
structured_summaries = await summarizer.do_last_requests()
if not isinstance(structured_summaries, list):
return Response({"erro": structured_summaries})
await summarizer.generate_complete_text()
await summarizer.get_document_title()
if isBubble:
await summarizer.send_to_bubble()
return {
"texto_completo": summarizer.texto_completo_como_html,
"titulo_do_documento": summarizer.titulo_do_documento,
"resultado": structured_summaries,
"parametros-utilizados": gerar_resposta_compilada(serializer),
}
except Exception as e:
custom_exception_handler_without_api_handler(e, serializer, axiom_instance)
raise