Spaces:
Runtime error
Runtime error
# Created by Leandro Carneiro at 19/01/2024 | |
# Description: | |
# ------------------------------------------------ | |
#from langchain.embeddings import OpenAIEmbeddings | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_community.vectorstores import Chroma | |
from langchain_community.document_loaders import DirectoryLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.prompts import PromptTemplate | |
from langchain_openai import ChatOpenAI | |
from langchain.memory import ConversationBufferMemory | |
from langchain.chains import ConversationalRetrievalChain | |
import os | |
import csv | |
def read_csv_to_dict(filename):
    """Load a semicolon-separated ``key;value`` file into a dictionary.

    The file is parsed with ``;`` as the field delimiter, so commas inside a
    value (for example in a URL) are kept intact. Blank lines are skipped.
    If a value itself contains ``;``, everything after the first separator is
    kept as the value. When a key appears more than once, the last occurrence
    wins.

    Args:
        filename: Path of the UTF-8 encoded file to read.

    Returns:
        A dict mapping the first field of each line to the rest of the line.

    Raises:
        ValueError: If a non-blank line contains no ``;`` separator.
        OSError: If the file cannot be opened.
    """
    data_dict = {}
    # newline='' lets the csv module handle line endings itself, as its
    # documentation recommends for files passed to csv.reader.
    with open(filename, mode='r', encoding='utf-8', newline='') as file:
        csv_reader = csv.reader(file, delimiter=';')
        for row in csv_reader:
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            if len(row) < 2:
                raise ValueError(
                    f"{filename}: line {csv_reader.line_num} has no ';' separator: {row[0]!r}"
                )
            key, *value_parts = row
            # Re-join so a value that contains ';' is not truncated.
            data_dict[key] = ';'.join(value_parts)
    return data_dict
def generate_embeddings_and_vectorstore(path, url_map_path='./local_base/filename_url.csv'):
    """Build a Chroma vector store from the ``.txt`` files found under *path*.

    The documents are loaded with ``DirectoryLoader`` (glob ``**/*.txt``),
    split with ``RecursiveCharacterTextSplitter`` (chunk_size=2000,
    chunk_overlap=400), tagged with a ``link`` metadata entry looked up by
    file name in the ``key;value`` file at *url_map_path*, and embedded with
    ``OpenAIEmbeddings`` using the ``OPENAI_KEY`` environment variable.

    Args:
        path: Directory that is searched for ``.txt`` files.
        url_map_path: Semicolon-separated file mapping a file name to its
            source URL. Defaults to the location the original code used.

    Returns:
        The populated Chroma vector store on success. On any failure the
        exception is printed and its message is returned as a ``str``, so
        callers must check the type of the result.
    """
    try:
        loader = DirectoryLoader(path=path, glob="**/*.txt")
        corpus = loader.load()
        print(f' Total de documentos antes do text_split = {len(corpus)}')

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=400)
        docs = text_splitter.split_documents(corpus)
        print(f" Total de chunks depois do text_split = {len(docs)}")

        if not docs:
            # Without this guard the average below divides by zero and the
            # caller only sees "division by zero". Raising here routes the
            # clearer message through the same error path as other failures.
            raise ValueError(f"Nenhum texto encontrado em '{path}' para gerar os embeddings.")

        num_total_characters = sum(len(x.page_content) for x in docs)
        print(f" Média de caracteres por chunk = {num_total_characters / len(docs):,.0f}")

        # Attach the source URL of each chunk; files missing from the mapping
        # get link=None.
        dict_filename_url = read_csv_to_dict(url_map_path)
        for doc in docs:
            filename = os.path.basename(doc.metadata["source"])
            doc.metadata["link"] = dict_filename_url.get(filename)

        fc_embeddings = OpenAIEmbeddings(openai_api_key=os.environ['OPENAI_KEY'])
        vectorstore = Chroma.from_documents(docs, fc_embeddings)
        print('total de docs no vectorstore=',len(vectorstore.get()['documents']))
        return vectorstore
    except Exception as e:
        # Errors are reported as a string rather than raised; callers rely on
        # that return shape.
        print(str(e))
        return str(e)
class Rag:
    """Retrieval-augmented news generator backed by a vector store.

    Wraps a ``ConversationalRetrievalChain`` that answers with a
    Portuguese-language news article built from the retrieved chunks.
    """

    def __init__(self, vectorstore, min_words, max_words):
        """Set up the prompt, conversation memory and retrieval chain.

        Args:
            vectorstore: Store that provides ``as_retriever()`` and
                ``delete_collection()``.
            min_words: Lower bound on the article length, inserted into the
                prompt.
            max_words: Upper bound on the article length, inserted into the
                prompt. Must be convertible with ``int()`` because it also
                sizes the model's ``max_tokens``.

        Raises:
            KeyError: If the ``OPENAI_KEY`` environment variable is not set.
        """
        self.text = None
        self.vectorstore = vectorstore
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key="answer")

        # The prompt is runtime text sent to the model and is kept verbatim.
        prompt_template = (
            "Your task is to create news to a newspaper based on pieces of texts delimited by <> and a question delimited by <>.\n"
            "Do not make up any information, create the news just based on the given information on the pieces of texts delimited by <>.\n"
            "Do not use only your knowledge to make the news. Make the news based on the pieces of text. If the pieces of text doesn't have any relevant information, just say that you need more information to make the news.\n"
            "The news should have a tittle.\n"
            "The news should be written in a formal language.\n"
            "The news should have between {min_words} and {max_words} words and it should be in portuguese language.\n"
            "The news should be about the following context: <{context}>\n"
            "Question: <{question}>\n"
            "Answer here:"
        )
        self.prompt = PromptTemplate(template=prompt_template,
                                     input_variables=["context", "question"],
                                     partial_variables={"min_words": min_words, "max_words": max_words})

        # Maximum number of tokens for the answer: 1.5 tokens per requested word.
        word_limit = int(max_words)
        answer_token_limit = int(word_limit + (word_limit / 2))

        self.qa = ConversationalRetrievalChain.from_llm(
            llm=ChatOpenAI(model_name="gpt-3.5-turbo-0125",
                           temperature=0.1,
                           openai_api_key=os.environ['OPENAI_KEY'],
                           max_tokens=answer_token_limit),
            memory=self.memory,
            # NOTE: an alternative retriever was left disabled in the original:
            # vectorstore.as_retriever(search_type='similarity_score_threshold',
            #                          search_kwargs={'k': 4, 'score_threshold': 0.5})
            retriever=vectorstore.as_retriever(),
            combine_docs_chain_kwargs={"prompt": self.prompt},
            chain_type="stuff",  # other options: map_reduce, refine, map_rerank
            return_source_documents=True,
        )

    def generate_text(self, subject):
        """Generate a news article about *subject* and list its sources.

        The vector store collection is deleted after every call, whether it
        succeeds or fails. Presumably this means the instance cannot serve a
        second request; confirm against the caller.

        Args:
            subject: Topic inserted into the Portuguese query sent to the chain.

        Returns:
            On success, a tuple ``(answer, sources)`` where ``sources`` is a
            numbered list of the distinct source links, one per line, in
            retrieval order. Documents without a link are omitted. On failure,
            the exception message as a plain ``str``.
        """
        try:
            query = f"Elabore uma nova notícia sobre {subject}."
            result_text = self.qa.invoke({"question": query})
            print('##### result', result_text)

            # dict.fromkeys removes duplicates while keeping retrieval order,
            # so the numbering is stable between runs (a set is not ordered).
            links = (doc.metadata.get('link') for doc in result_text["source_documents"])
            result_sources = list(dict.fromkeys(link for link in links if link))

            str_result_sources = ''.join(
                f'{position}) {link}\n'
                for position, link in enumerate(result_sources, start=1)
            )
            return (result_text["answer"], str_result_sources)
        except Exception as e:
            # Callers receive the error text instead of an exception.
            return str(e)
        finally:
            # Single cleanup point for both the success and the failure path.
            self.vectorstore.delete_collection()