|
import os
|
|
import pinecone
|
|
import openai
|
|
import gradio as gr
|
|
import torch
|
|
from dotenv import load_dotenv
|
|
from pinecone import Pinecone
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings
|
|
from rapidfuzz import fuzz
|
|
import logging
|
|
import re
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
print(f"Running on device: {device}")
|
|
|
|
|
|
import warnings
|
|
warnings.filterwarnings("ignore", category=FutureWarning, message="clean_up_tokenization_spaces was not set")
|
|
|
|
|
|
load_dotenv()
|
|
|
|
|
|
pinecone_api_key = os.getenv("PINECONE_API_KEY")
|
|
openai.api_key = os.getenv("OPENAI_API_KEY")
|
|
index_name = "amtrak-acela-ai-demo"
|
|
|
|
|
|
pc = Pinecone(api_key=pinecone_api_key)
|
|
|
|
|
|
def initialize_pinecone_index(index_name):
|
|
available_indexes = pc.list_indexes().names()
|
|
if index_name not in available_indexes:
|
|
print(f"Index '{index_name}' does not exist.")
|
|
|
|
return pc.Index(index_name)
|
|
|
|
index = initialize_pinecone_index(index_name)
|
|
|
|
|
|
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/msmarco-distilbert-base-v4")
|
|
|
|
|
|
chat_history = []
|
|
|
|
|
|
def preprocess_text(text):
|
|
|
|
text = re.sub(r'[^\w\s]', '', text.lower())
|
|
return text.strip()
|
|
|
|
|
|
def flatten_to_string(data):
|
|
if isinstance(data, list):
|
|
return " ".join([flatten_to_string(item) for item in data])
|
|
if data is None:
|
|
return ""
|
|
return str(data)
|
|
|
|
|
|
def get_model_response(human_input):
|
|
try:
|
|
|
|
processed_input = preprocess_text(human_input)
|
|
|
|
|
|
query_embedding = torch.tensor(embedding_model.embed_query(human_input)).to(device)
|
|
query_embedding = query_embedding.cpu().numpy().tolist()
|
|
|
|
|
|
search_results = index.query(vector=query_embedding, top_k=5, include_metadata=True)
|
|
|
|
context_list, images = [], []
|
|
for ind, result in enumerate(search_results['matches']):
|
|
document_content = flatten_to_string(result.get('metadata', {}).get('content', 'No content found'))
|
|
image_url = flatten_to_string(result.get('metadata', {}).get('image_path', None))
|
|
figure_desc = flatten_to_string(result.get('metadata', {}).get('figure_description', ''))
|
|
|
|
|
|
processed_figure_desc = preprocess_text(figure_desc)
|
|
similarity_score = fuzz.token_set_ratio(processed_input, processed_figure_desc)
|
|
logging.info(f"Matching '{processed_input}' with '{processed_figure_desc}', similarity score: {similarity_score}")
|
|
|
|
if similarity_score >= 80:
|
|
context_list.append(f"Relevant information: {document_content}")
|
|
if image_url and figure_desc:
|
|
images.append((figure_desc, image_url))
|
|
|
|
context_string = '\n\n'.join(context_list)
|
|
|
|
|
|
chat_history.append({"role": "user", "content": human_input})
|
|
|
|
|
|
messages = [{"role": "system", "content": "You are a helpful assistant."}] + chat_history + [
|
|
{"role": "system", "content": f"Here is some context:\n{context_string}"},
|
|
{"role": "user", "content": human_input}
|
|
]
|
|
|
|
|
|
for message in messages:
|
|
if not isinstance(message, dict) or "role" not in message or "content" not in message:
|
|
raise ValueError(f"Invalid message format: {message}")
|
|
|
|
|
|
response = openai.ChatCompletion.create(
|
|
model="gpt-3.5-turbo",
|
|
messages=messages,
|
|
max_tokens=500,
|
|
temperature=0.5
|
|
)
|
|
|
|
output_text = response['choices'][0]['message']['content'].strip()
|
|
|
|
|
|
chat_history.append({"role": "assistant", "content": output_text})
|
|
|
|
return output_text, images
|
|
|
|
except Exception as e:
|
|
return f"Error invoking model: {str(e)}", []
|
|
|
|
|
|
def get_model_response_with_images(human_input, history=None):
|
|
output_text, images = get_model_response(human_input)
|
|
if images:
|
|
|
|
image_output = "".join([f"\n\n**{figure_desc}**\n" for figure_desc, image_path in images])
|
|
return output_text + image_output
|
|
return output_text
|
|
|
|
|
|
gr_interface = gr.ChatInterface(
|
|
fn=get_model_response_with_images,
|
|
title="Maintenance Assistant",
|
|
description="Ask questions related to the RMMM documents."
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
gr_interface.launch() |