# NOTE: removed non-Python scrape artifacts that preceded this module
# ("Spaces:" / "Running" status text from a hosting page capture).
# Standard library
import os
from typing import Any, Union, cast

# Third-party
import markdown
from langchain_core.messages import HumanMessage
from rest_framework.response import Response

# Local application
from _utils.Utils_Class import UtilsClass
from _utils.axiom_logs import AxiomLogs
from _utils.bubble_integrations.enviar_resposta_final import enviar_resposta_final
from _utils.custom_exception_handler import custom_exception_handler_without_api_handler
from _utils.gerar_documento_utils.GerarDocumento import GerarDocumento
from _utils.gerar_documento_utils.contextual_retriever import ContextualRetriever
from _utils.gerar_documento_utils.utils import (
    generate_document_title,
    gerar_resposta_compilada,
    get_response_from_auxiliar_contextual_prompt,
)
from _utils.langchain_utils.LLM_class import LLM
from _utils.langchain_utils.Prompt_class import Prompt
from _utils.models.gerar_documento import RetrievalConfig
from _utils.utils import convert_markdown_to_HTML
from gerar_documento.serializer import (
    GerarDocumentoComPDFProprioSerializer,
    GerarDocumentoComPDFProprioSerializerData,
    GerarDocumentoSerializerData,
)
from setup.logging import Axiom

# LangSmith tracing configuration — applied as a module-import side effect.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
# NOTE(review): this call discards its result — it neither sets nor validates
# LANGCHAIN_API_KEY. Kept for behavioral fidelity; consider checking presence
# explicitly and failing fast if the key is missing.
os.environ.get("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "VELLA"
async def gerar_documento(
    serializer: Union[
        GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
    ],
    listaPDFs,
    axiom_instance: Axiom,
    isBubble=False,
):
    """Run the full document-generation pipeline and return the result payload.

    Drives a ``GerarDocumento`` instance through its stages: chunk extraction,
    auxiliary summary, vector-store construction, final LLM requests, and HTML
    rendering of the complete text.

    Args:
        serializer: Validated request data (with or without a user-supplied
            PDF); also read for ``gerar_resposta_compilada``.
        listaPDFs: PDF inputs. NOTE(review): not referenced in this body —
            presumably consumed via ``serializer`` by ``GerarDocumento``;
            kept for caller compatibility.
        axiom_instance: Axiom logger used for pipeline telemetry and passed
            to the exception handler.
        isBubble: When True, the finished document is also pushed to Bubble.

    Returns:
        dict with the generated HTML text, document title, structured
        summaries, and the compiled request parameters — or a DRF
        ``Response`` wrapping an error payload when the summary step does
        not yield a list.

    Raises:
        Exception: Any pipeline failure is reported through
            ``custom_exception_handler_without_api_handler`` and re-raised.
    """
    try:
        ax = AxiomLogs(axiom_instance)
        summarizer = GerarDocumento(serializer, isBubble, axiom_instance)

        # all_PDFs_chunks is produced alongside the full text but not used here.
        all_PDFs_chunks, full_text_as_array = (
            await summarizer.get_text_and_pdf_chunks()
        )

        # Build the auxiliary contextual summary and feed it back into the
        # summarizer before the chunk-processing stages.
        response_auxiliar_summary = await get_response_from_auxiliar_contextual_prompt(
            full_text_as_array
        )
        summarizer.resumo_auxiliar = response_auxiliar_summary
        ax.resumo_inicial_processo(response_auxiliar_summary)

        await summarizer.generate_chunks_processados()
        await summarizer.generate_query_for_vector_store()
        await summarizer.create_enhanced_vector_store()

        structured_summaries = await summarizer.do_last_requests()
        # A non-list result is treated as an error message from the model.
        if not isinstance(structured_summaries, list):
            return Response({"erro": structured_summaries})

        await summarizer.generate_complete_text()
        await summarizer.get_document_title()

        if isBubble:
            await summarizer.send_to_bubble()

        return {
            "texto_completo": summarizer.texto_completo_como_html,
            "titulo_do_documento": summarizer.titulo_do_documento,
            "resultado": structured_summaries,
            "parametros-utilizados": gerar_resposta_compilada(serializer),
        }
    except Exception as e:
        # Report through the project's handler, then propagate to the caller.
        custom_exception_handler_without_api_handler(e, serializer, axiom_instance)
        raise