import os
import logging
from dataclasses import dataclass
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import Generator, List

import gradio as gr
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain_cerebras import ChatCerebras
from langchain_community.vectorstores import Qdrant
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import ChatOpenAI  # used by the commented-out fallback LLM below
from qdrant_client import QdrantClient, models

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Message:
    role: str
    content: str
    timestamp: str
class ChatHistory:
    def __init__(self):
        self.messages: List[Message] = []

    def add_message(self, role: str, content: str):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.messages.append(Message(role=role, content=content, timestamp=timestamp))

    def get_formatted_history(self, max_messages: int = 10) -> str:
        # Slicing handles both the short and long cases
        recent_messages = self.messages[-max_messages:]
        return "\n".join(f"{msg.role}: {msg.content}" for msg in recent_messages)

    def clear(self):
        self.messages = []
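
# Usage sketch (hypothetical values), illustrating the rolling-window formatting:
#   h = ChatHistory()
#   h.add_message("user", "How do I approve a leave request?")
#   h.add_message("assistant", "1. Open the Requests tab ...")
#   h.get_formatted_history()
#   # -> "user: How do I approve a leave request?\nassistant: 1. Open the Requests tab ..."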
# Load environment variables and setup
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
C_apikey = os.getenv("C_apikey")
OPENAPI_KEY = os.getenv("OPENAPI_KEY")

if not HF_TOKEN:
    logger.error("HF_TOKEN is not set in the environment variables.")
    exit(1)
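
# Expected .env layout (placeholder values; key names taken from the os.getenv calls above):
#   HF_TOKEN=hf_...
#   C_apikey=csk-...
#   OPENAPI_KEY=sk-...
#   QDRANT_URL=https://<cluster>.qdrant.io
#   QDRANT_API_KEY=...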
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

try:
    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("QDRANT_API_KEY"),
        prefer_grpc=False
    )
except Exception as e:
    logger.error(f"Failed to connect to Qdrant: {e}")
    exit(1)
collection_name = "mawared"

try:
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=384,  # embedding dimension of all-MiniLM-L6-v2
            distance=models.Distance.COSINE
        )
    )
except Exception as e:
    if "already exists" not in str(e):
        logger.error(f"Error creating collection: {e}")
        exit(1)
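
# Note: the string match on "already exists" is brittle. Recent qdrant-client
# releases expose client.collection_exists(collection_name), which may be a
# cleaner guard; the original behavior is kept here.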
db = Qdrant(
    client=client,
    collection_name=collection_name,
    embeddings=embeddings,
)

retriever = db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)
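
# Quick sanity check (hypothetical query; assumes the collection is populated):
#   docs = retriever.invoke("How do I submit a timesheet?")  # -> list of up to 5 Documents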
llm = ChatCerebras(
    model="llama-3.3-70b",
    api_key=C_apikey,
    streaming=True
)

# Fallback LLM via the Hugging Face Inference API (OpenAI-compatible endpoint):
# llm = ChatOpenAI(
#     model="meta-llama/Llama-3.3-70B-Instruct",
#     temperature=0,
#     max_tokens=None,
#     timeout=None,
#     max_retries=2,
#     api_key=HF_TOKEN,  # if you prefer to pass the API key directly instead of using env vars
#     base_url="https://api-inference.huggingface.co/v1/",
#     stream=True,
# )
template = """
You are a specialized, friendly AI assistant for the Mawared HR System, designed to provide accurate and contextually relevant support based solely on the provided context and chat history.

Core Principles
- Source of Truth: Use only the information available in the retrieved context and chat history. Do not fabricate details or access external knowledge.
- Clarity and Precision: Communicate clearly, concisely, and professionally, using straightforward language for easy comprehension.
- Actionable Guidance: Deliver practical solutions, step-by-step workflows, and troubleshooting advice directly related to Mawared HR queries.
- Structured Instructions: Provide numbered, easy-to-follow instructions when explaining complex processes.
- Targeted Clarification: If a query lacks detail, ask specific questions to obtain the necessary information, explicitly stating what is missing.
- Exclusive Focus: Address only Mawared HR-related topics and avoid unrelated discussions.
- Professional Tone: Maintain a friendly, approachable, and professional demeanor.

Response Guidelines
1. Analyze the Query Thoughtfully:
   - Start by thoroughly examining the user's question and reviewing the chat history.
   - Consider what the user explicitly asked and infer their intent from the context provided.
   - Mentally identify potential gaps in information before proceeding.
2. Break Down Context Relevance:
   - Isolate and interpret the pieces of context that apply directly to the query.
   - Match the user's needs with the most relevant data from the context or chat history.
3. Develop the Response in a Stepwise Manner:
   - Construct a logical chain of thought: What does the user want to achieve? Which parts of the context can address this? What steps or details are needed for clarity?
   - Provide responses in a structured, easy-to-follow format (e.g., numbered lists, bullet points).
4. Ask for Clarifications Strategically:
   - If the query lacks sufficient detail, identify the precise information that is missing.
   - Frame your clarification politely and explicitly (e.g., “Could you confirm [specific detail] to proceed with [action/task]?”).
5. Ensure Directness and Professionalism:
   - Avoid unnecessary elaboration or irrelevant information.
   - Maintain a friendly, professional tone throughout the response.
6. Double-Check for Exclusivity:
   - Ensure all guidance is strictly based on the retrieved context and chat history.
   - Avoid speculating or introducing external knowledge about Mawared HR.

Handling Information Gaps
If the provided context is insufficient to answer the query:
- State explicitly that additional information is required to proceed.
- Clearly outline what details are missing.
- Avoid fabricating details or making assumptions.

Critical Constraint
STRICTLY rely on the provided context and chat history for all responses. Do not generate information about Mawared HR beyond these sources.
Note: Do not mention a human support contact unless explicitly asked.

Previous Conversation: {chat_history}
Retrieved Context: {context}
Current Question: {question}

Answer:
"""
prompt = ChatPromptTemplate.from_template(template)
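
# {context}, {question}, and {chat_history} are filled by the runnable map in
# create_rag_chain below; {context} receives the formatted retriever output.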
def format_docs(docs) -> str:
    # Join retrieved documents into plain text; without this, the prompt would
    # receive the repr of a Document list rather than readable context.
    return "\n\n".join(doc.page_content for doc in docs)

def create_rag_chain(chat_history: str):
    chain = (
        {
            "context": retriever | format_docs,
            "question": RunnablePassthrough(),
            "chat_history": lambda x: chat_history
        }
        | prompt
        | llm
        | StrOutputParser()
    )
    return chain
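
# Usage sketch (hypothetical question):
#   chain = create_rag_chain(chat_history="")
#   chain.invoke("How do I reset an employee password?")      # full answer string
#   for chunk in chain.stream("..."): print(chunk, end="")    # incremental tokens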

chat_history = ChatHistory()
def process_stream(stream_queue: Queue, history: List[List[str]]) -> Generator[List[List[str]], None, None]:
    """Process the streaming response and update the chat interface."""
    current_response = ""
    while True:
        chunk = stream_queue.get()
        if chunk is None:  # Signal that streaming is complete
            break
        current_response += chunk
        new_history = history.copy()
        new_history[-1][1] = current_response  # Update the assistant's message
        yield new_history
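
# Producer/consumer handoff: the worker thread in ask_question_gradio puts LLM
# chunks on the queue followed by a None sentinel; this generator drains the
# queue and yields a fresh history snapshot per chunk for Gradio to render.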
def ask_question_gradio(question: str, history: List[List[str]]) -> Generator[tuple, None, None]:
    try:
        if history is None:
            history = []
        chat_history.add_message("user", question)
        formatted_history = chat_history.get_formatted_history()
        rag_chain = create_rag_chain(formatted_history)

        # Update history with the user message and an empty assistant message
        history.append([question, ""])

        # Create a queue for streaming responses
        stream_queue = Queue()

        # Process the stream in a separate thread
        def stream_processor():
            try:
                for chunk in rag_chain.stream(question):
                    stream_queue.put(chunk)
                stream_queue.put(None)  # Signal completion
            except Exception as e:
                logger.error(f"Streaming error: {e}")
                stream_queue.put(None)

        # Start streaming in a daemon thread so it cannot block interpreter exit
        Thread(target=stream_processor, daemon=True).start()

        # Yield updates to the chat interface
        response = ""
        for updated_history in process_stream(stream_queue, history):
            response = updated_history[-1][1]
            yield "", updated_history

        # Add the final response to the chat history
        chat_history.add_message("assistant", response)
    except Exception as e:
        logger.error(f"Error during question processing: {e}")
        if not history:
            history = []
        history.append([question, "An error occurred. Please try again later."])
        yield "", history
def clear_chat():
    chat_history.clear()
    return [], ""
# Gradio Interface
with gr.Blocks() as iface:
    gr.Image("Image.jpg", width=750, height=300, show_label=False, show_download_button=False)
    gr.Markdown("# Mawared HR Assistant 3.0.0")
    gr.Markdown("### Instructions")
    gr.Markdown("Ask a question about Mawared HR and get a detailed answer.")

    chatbot = gr.Chatbot(
        height=750,
        show_label=False,
        bubble_full_width=False,
    )
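    # Note: the list-of-lists history format and bubble_full_width assume Gradio 4.x;
    # newer Gradio releases deprecate both in favor of gr.Chatbot(type="messages").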
    with gr.Row():
        with gr.Column(scale=20):
            question_input = gr.Textbox(
                label="Ask a question:",
                placeholder="Type your question here...",
                show_label=False
            )
        with gr.Column(scale=4):
            with gr.Row():
                with gr.Column():
                    send_button = gr.Button("Send", variant="primary", size="sm")
                    clear_button = gr.Button("Clear Chat", size="sm")

    # Handle both submit events (Enter key and Send button)
    submit_events = [question_input.submit, send_button.click]
    for submit_event in submit_events:
        submit_event(
            ask_question_gradio,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot]
        )

    clear_button.click(
        clear_chat,
        outputs=[chatbot, question_input]
    )

if __name__ == "__main__":
    iface.launch()