Spaces: Build error
# app.py
import spaces
from torch.nn import DataParallel
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import InferenceClient
from openai import OpenAI
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain_chroma import Chroma
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.config import Settings
import chromadb  # import HttpClient
from typing import List, Tuple, Dict, Any
import os
import re
import uuid
import gradio as gr
import torch
import torch.nn.functional as F
from dotenv import load_dotenv
from utils import load_env_variables, parse_and_route, escape_special_characters
from globalvars import API_BASE, intention_prompt, tasks, system_message, model_name, metadata_prompt
# import time
# import httpx
from langchain_community.chat_models import ChatOpenAI
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import ContextualCompressionRetriever
from langchain.prompts.chat import ChatPromptTemplate, HumanMessagePromptTemplate
# from langchain.vectorstores import Chroma

load_dotenv()

os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:50'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ['CUDA_CACHE_DISABLE'] = '1'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
### Utils
hf_token, yi_token = load_env_variables()

# tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token, trust_remote_code=True)

# Lazy load model
model = None

def load_model():
    global model
    if model is None:
        from transformers import AutoModel
        model = AutoModel.from_pretrained(model_name, token=hf_token, trust_remote_code=True).to(device)
    return model

# Load model
nvidiamodel = load_model()
# nvidiamodel.set_pooling_include_prompt(include_prompt=False)
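# Note: load_model() is invoked eagerly just above, so the model is loaded at
# import time despite the lazy-load guard inside load_model(); later calls simply
# reuse the cached global. Deferring this call to first use would keep startup lighter.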
def clear_cuda_cache():
    torch.cuda.empty_cache()

client = OpenAI(api_key=yi_token, base_url=API_BASE)

chroma_client = chromadb.Client(Settings())
# Create a collection
chroma_collection = chroma_client.create_collection("all-my-documents")
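# Note: the raw collection above is created without an embedding_function, so any
# call that passes only documents or query_texts would fall back to Chroma's default
# embedder; the helpers below therefore pass precomputed embeddings explicitly.
# get_or_create_collection would also avoid an error if the collection already exists.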
class MyEmbeddingFunction(EmbeddingFunction):
    def __init__(self, model_name: str, token: str, intention_client):
        self.model_name = model_name
        self.token = token
        self.intention_client = intention_client
        self.hf_embeddings = HuggingFaceInstructEmbeddings(
            model_name=model_name,
            model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

    def create_embedding_generator(self):
        return self.hf_embeddings

    def __call__(self, input: Documents) -> Embeddings:
        # chromadb passes Documents as a list of strings and expects a list of
        # embedding vectors back, so compute one embedding per text and drop the
        # per-document metadata here.
        return [self.compute_embeddings(text)[0] for text in input]
    def compute_embeddings(self, input_text: str):
        escaped_input_text = escape_special_characters(input_text)

        # Get the intention
        intention_completion = self.intention_client.chat.completions.create(
            model="yi-large",
            messages=[
                {"role": "system", "content": escape_special_characters(intention_prompt)},
                {"role": "user", "content": escaped_input_text}
            ]
        )
        intention_output = intention_completion.choices[0].message.content
        parsed_task = parse_and_route(intention_output)
        selected_task = parsed_task if parsed_task in tasks else "DEFAULT"
        task_description = tasks[selected_task]

        # query_prefix = "Instruct: " + tasks[selected_task] + "\nQuery: "
        # Construct the embed_instruction and query_instruction dynamically
        embed_instruction = f"Instruct: {task_description}\nQuery:"
        # query_instruction = f""

        # Update the hf_embeddings object with the new instructions
        self.hf_embeddings.embed_instruction = embed_instruction
        # self.hf_embeddings.query_instruction = query_instruction

        # Get the metadata
        metadata_completion = self.intention_client.chat.completions.create(
            model="yi-large",
            messages=[
                {"role": "system", "content": escape_special_characters(metadata_prompt)},
                {"role": "user", "content": escaped_input_text}
            ]
        )
        metadata_output = metadata_completion.choices[0].message.content
        metadata = self.extract_metadata(metadata_output)

        # Get the embeddings
        embeddings = self.hf_embeddings.embed_documents([escaped_input_text])
        return embeddings[0], metadata

    def extract_metadata(self, metadata_output: str) -> Dict[str, str]:
        pattern = re.compile(r'\"(\w+)\": \"([^\"]+)\"')
        matches = pattern.findall(metadata_output)
        metadata = {key: value for key, value in matches}
        return metadata
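# Expected shape (as the code stands): compute_embeddings("some text") returns
# (vector, metadata_dict), where vector is a single List[float] from embed_documents
# and metadata_dict comes from extract_metadata; the helpers below rely on that
# one-vector/one-dict shape per document.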
def load_documents(file_path: str, mode: str = "elements"):
    loader = UnstructuredFileLoader(file_path, mode=mode)
    docs = loader.load()
    return [doc.page_content for doc in docs]

def initialize_chroma(collection_name: str, embedding_function: MyEmbeddingFunction):
    db = Chroma(client=chroma_client, collection_name=collection_name, embedding_function=embedding_function)
    return db

def add_documents_to_chroma(documents: list, embedding_function: MyEmbeddingFunction):
    for doc in documents:
        # compute_embeddings returns one embedding vector and one metadata dict
        # per document, so add a single record rather than zipping over them.
        embedding, meta = embedding_function.compute_embeddings(doc)
        chroma_collection.add(
            ids=[str(uuid.uuid1())],
            documents=[doc],
            embeddings=[embedding],
            metadatas=[meta]
        )
def query_chroma(query_text: str, embedding_function: MyEmbeddingFunction):
    load_model()
    # Use the custom embedding for the query instead of the collection's default embedder.
    query_embedding, query_metadata = embedding_function.compute_embeddings(query_text)
    result_docs = chroma_collection.query(
        query_embeddings=[query_embedding],
        n_results=3
    )
    return result_docs
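# Note: chroma_collection.query(...) returns a dict with keys such as "ids",
# "documents", "metadatas" and "distances" (each a list per query), not a list of
# document objects; query_documents below unpacks it accordingly.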
def answer_query(message: str, chat_history: List[Tuple[str, str]],
                 system_prompt: str = "", max_new_tokens: int = 512,
                 temperature: float = 0.7, top_p: float = 0.95):
    # The extra parameters mirror the ChatInterface additional_inputs below; they are
    # accepted so the callback signature matches, even though the retrieval chain does
    # not use them yet.
    base_compressor = LLMChainExtractor.from_llm(langchain_llm)
    db = Chroma(persist_directory="output/general_knowledge", embedding_function=embedding_function)
    base_retriever = db.as_retriever()
    mq_retriever = MultiQueryRetriever.from_llm(retriever=base_retriever, llm=langchain_llm)
    compression_retriever = ContextualCompressionRetriever(base_compressor=base_compressor, base_retriever=mq_retriever)
    matched_docs = compression_retriever.get_relevant_documents(query=message)

    context = ""
    for doc in matched_docs:
        context += doc.page_content + "\n\n"

    template = """
Answer the following question only by using the context given below in the triple backticks, do not use any other information to answer the question.
If you can't answer the given question with the given context, you can return an empty string ('')

Context: ```{context}```
----------------------------
Question: {query}
----------------------------
Answer: """

    human_message_prompt = HumanMessagePromptTemplate.from_template(template=template)
    chat_prompt = ChatPromptTemplate.from_messages([human_message_prompt])
    prompt = chat_prompt.format_prompt(query=message, context=context)

    # The raw OpenAI client has no .chat(...) callable; route the formatted messages
    # through the LangChain chat model instead.
    response = langchain_llm.invoke(prompt.to_messages()).content
    return response
# Initialize clients
intention_client = OpenAI(api_key=yi_token, base_url=API_BASE)
# LangChain-compatible chat model for the retriever chains in answer_query(); the raw
# OpenAI client cannot be passed to LLMChainExtractor / MultiQueryRetriever.
langchain_llm = ChatOpenAI(openai_api_key=yi_token, openai_api_base=API_BASE, model_name="yi-large")
embedding_function = MyEmbeddingFunction(model_name=model_name, token=hf_token, intention_client=intention_client)
chroma_db = initialize_chroma(collection_name="Tonic-instruct", embedding_function=embedding_function)
def upload_documents(files):
    for file in files:
        loader = UnstructuredFileLoader(file.name)
        # loader.load() yields Document objects; pass plain text on to the embedding
        # pipeline, mirroring load_documents() above.
        documents = [doc.page_content for doc in loader.load()]
        add_documents_to_chroma(documents, embedding_function)
    return "Documents uploaded and processed successfully!"

def query_documents(query):
    load_model()
    results = query_chroma(query, embedding_function)
    # query() returns a dict of lists; the matched texts for the first (and only)
    # query sit under results["documents"][0].
    return "\n\n".join(results["documents"][0])
with gr.Blocks() as demo:
    with gr.Tab("Upload Documents"):
        document_upload = gr.File(file_count="multiple", file_types=["document"])
        upload_button = gr.Button("Upload and Process")
        upload_button.click(upload_documents, inputs=document_upload, outputs=gr.Text())

    with gr.Tab("Ask Questions"):
        with gr.Row():
            chat_interface = gr.ChatInterface(
                answer_query,
                additional_inputs=[
                    gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
                    gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
                    gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
                    gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
                ],
            )
        query_input = gr.Textbox(label="Query")
        query_button = gr.Button("Query")
        query_output = gr.Textbox()
        query_button.click(query_documents, inputs=query_input, outputs=query_output)

if __name__ == "__main__":
    # os.system("chroma run --host localhost --port 8000 &")
    demo.launch()