|
import logging |
|
import os |
|
import gradio as gr |
|
from transformers import pipeline |
|
from sentence_transformers import SentenceTransformer, util |
|
import PyPDF2 |
|
|
|
|
|
# Destination of the session log; the UI offers this same file for download.
log_file_path = "/tmp/support_bot_log.txt"

# Route timestamped INFO-and-above records from the root logger to that file.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    level=logging.INFO,
    filename=log_file_path,
)
|
|
|
class SupportBotAgent:
    """Answer questions about a single TXT or PDF document.

    The document text is split into sections on blank lines. For each query
    the most relevant section is chosen (embedding similarity first, keyword
    overlap as a fallback) and handed to a question-answering pipeline as
    context.
    """

    # Returned whenever no section of the document is relevant to the query.
    FALLBACK_RESPONSE = "I don’t have enough information to answer that."

    # Minimum cosine similarity for the embedding match to be trusted.
    SIMILARITY_THRESHOLD = 0.4

    # Words ignored by the keyword fallback.
    STOPWORDS = frozenset({
        "and", "the", "is", "for", "to", "a", "an", "of", "in", "on", "at",
        "with", "by", "it", "as", "so", "what",
    })

    # Stripped from both ends of each token so "password?" matches "password".
    _PUNCTUATION = ".,;:!?\"'()[]{}"

    # Maximum number of context characters quoted back for "too vague".
    _EXCERPT_CHARS = 500

    def __init__(self, document_path):
        """Load the document, then the models, and embed every section.

        Args:
            document_path: Path to a ``.txt`` or ``.pdf`` file.

        Raises:
            ValueError: If the file extension is not supported.
        """
        # Read the document first so an unsupported file fails before the
        # (comparatively expensive) models are loaded.
        self.document_text = self.load_document(document_path)
        # Drop blank chunks: they carry nothing to match or answer from.
        self.sections = [
            section for section in self.document_text.split('\n\n')
            if section.strip()
        ]

        self.qa_model = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad")
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

        # With no sections there is nothing to embed; find_relevant_section
        # short-circuits in that case and never reads this attribute.
        self.section_embeddings = (
            self.embedder.encode(self.sections, convert_to_tensor=True)
            if self.sections else None
        )
        logging.info("Loaded document: %s", document_path)

    def load_document(self, path):
        """Load and extract the text of a TXT or PDF file.

        Args:
            path: File path; the extension is matched case-insensitively.

        Returns:
            The file's text. For PDFs, the text of every page that yields
            any, each followed by a newline.

        Raises:
            ValueError: If the extension is neither ``.txt`` nor ``.pdf``.
        """
        lowered = path.lower()
        if lowered.endswith(".txt"):
            file_type = "Text File"
            with open(path, 'r', encoding='utf-8') as file:
                text = file.read()
        elif lowered.endswith(".pdf"):
            file_type = "PDF File"
            with open(path, "rb") as file:
                pdf_reader = PyPDF2.PdfReader(file)
                page_texts = [page.extract_text() for page in pdf_reader.pages]
            # Pages that yield no text are skipped.
            text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)
        else:
            logging.error("Unsupported file format: %s", path)
            raise ValueError("Unsupported file format. Please provide a TXT or PDF file.")
        logging.info("Loaded %s: %s", file_type, path)
        return text

    def _keywords(self, text):
        """Return the lower-cased, punctuation-stripped non-stopword tokens of *text*."""
        tokens = (word.strip(self._PUNCTUATION) for word in text.lower().split())
        return {token for token in tokens if token and token not in self.STOPWORDS}

    def find_relevant_section(self, query):
        """Return the document section most relevant to *query*.

        Semantic similarity is tried first. If the best score is below
        ``SIMILARITY_THRESHOLD``, the first section sharing at least two
        keywords with the query is used instead.

        Returns:
            The matching section text, or ``FALLBACK_RESPONSE`` when nothing
            matches (including when the document has no sections).
        """
        if not self.sections:
            logging.info("Document has no sections. Returning default fallback response.")
            return self.FALLBACK_RESPONSE

        query_embedding = self.embedder.encode(query, convert_to_tensor=True)
        similarities = util.cos_sim(query_embedding, self.section_embeddings)[0]
        best_idx = similarities.argmax().item()
        similarity_score = similarities[best_idx].item()

        if similarity_score >= self.SIMILARITY_THRESHOLD:
            logging.info("Found relevant section using embeddings for query: %s", query)
            return self.sections[best_idx]

        logging.info("Low similarity (%s). Falling back to keyword search.", similarity_score)
        query_words = self._keywords(query)
        for section in self.sections:
            common_words = query_words & self._keywords(section)
            if len(common_words) >= 2:
                logging.info(
                    "Keyword match found for query: %s with common words: %s",
                    query, common_words,
                )
                return section

        logging.info("No good keyword match found. Returning default fallback response.")
        return self.FALLBACK_RESPONSE

    def answer_query(self, query):
        """Answer *query* from the most relevant section of the document.

        Returns:
            The answer span extracted by the QA pipeline, or
            ``FALLBACK_RESPONSE`` when no relevant section exists.
        """
        context = self.find_relevant_section(query)
        # The fallback sentence is a "no match" marker, not document text;
        # running the QA model on it would return a fragment of the apology.
        if not context or context == self.FALLBACK_RESPONSE:
            answer = self.FALLBACK_RESPONSE
        else:
            result = self.qa_model(question=query, context=context, max_answer_len=50)
            answer = result["answer"]
        logging.info("Answer for query '%s': %s", query, answer)
        return answer

    def adjust_response(self, query, response, feedback):
        """Modify *response* according to user *feedback*.

        Args:
            query: The question that produced *response*.
            response: The answer previously shown to the user.
            feedback: ``"too vague"`` appends an excerpt of the matching
                section; ``"not helpful"`` re-asks with an expanded query;
                anything else leaves the response unchanged.

        Returns:
            The adjusted response text.
        """
        if feedback == "too vague":
            context = self.find_relevant_section(query)
            if not context or context == self.FALLBACK_RESPONSE:
                # No section to quote from; keep the answer as it is.
                adjusted_response = response
            else:
                excerpt = context[:self._EXCERPT_CHARS]
                # Mark the excerpt as cut off only when it really was.
                suffix = "..." if len(context) > self._EXCERPT_CHARS else ""
                adjusted_response = f"{response}\n\n(More details:\n{excerpt}{suffix})"
        elif feedback == "not helpful":
            adjusted_response = self.answer_query(query + " Please provide more detailed information with examples.")
        else:
            adjusted_response = response
        logging.info("Adjusted answer for query '%s': %s", query, adjusted_response)
        return adjusted_response
|
|
|
|
|
|
|
def process_file(file, state):
    """Handle a file upload and initialise the SupportBotAgent for the session.

    Args:
        file: Value delivered by the upload component: ``None`` when nothing
            was uploaded, a filesystem path, or a file-like object whose
            ``name`` attribute is the upload's path.
        state: Per-session dict. On success it is reset in place with the
            keys ``agent``, ``chat_history``, ``mode``, ``last_query``,
            ``last_answer`` and ``feedback_count``.

    Returns:
        A ``(chat_history, state)`` tuple for the chat and state outputs.
    """
    if file is None:
        logging.info("No file uploaded")
        return [("Bot", "Please upload a TXT or PDF file.")], state

    if isinstance(file, (str, os.PathLike)):
        document_path = os.fspath(file)
    elif os.path.exists(file.name):
        # The upload is already on disk; read it where it is. Re-opening
        # "/tmp" + name for writing truncated the upload itself whenever
        # name was an absolute path.
        document_path = file.name
    else:
        # No on-disk copy: persist the contents under /tmp, using only the
        # base name so the upload's name cannot point outside that directory.
        document_path = os.path.join("/tmp", os.path.basename(file.name))
        with open(document_path, "wb") as f:
            f.write(file.read())

    try:
        state["agent"] = SupportBotAgent(document_path)
    except Exception as e:
        # UI boundary: report the failure to the user, but keep the traceback
        # in the log instead of discarding it.
        logging.exception("Error processing file: %s", document_path)
        return [("Bot", f"Error processing file: {str(e)}")], state

    state["chat_history"] = [("Bot", "File loaded successfully. Enter your query (or type 'exit' to end):")]
    state["mode"] = "query"
    state["last_query"] = ""
    state["last_answer"] = ""
    state["feedback_count"] = 0
    return state["chat_history"], state
|
|
|
def process_input(user_input, state):
    """Process user input as a query or as feedback, depending on the mode.

    Typing 'exit' ends the session. Surrounding whitespace is ignored and
    blank input is a no-op.

    Args:
        user_input: Text typed by the user.
        state: Per-session dict; it needs the keys ``agent`` and
            ``chat_history`` before queries can be handled.

    Returns:
        A ``(chat_history, state)`` tuple for the chat and state outputs.
    """
    if state.get("mode", "query") == "ended":
        return state["chat_history"], state

    # Submitting before a document has been loaded used to raise KeyError
    # on state["chat_history"]; prompt for an upload instead.
    if "agent" not in state or "chat_history" not in state:
        history = list(state.get("chat_history", []))
        history.append(("Bot", "Please upload a TXT or PDF file."))
        return history, state

    text = (user_input or "").strip()
    if not text:
        # Nothing to answer or to record as feedback.
        return state["chat_history"], state

    if text.lower() == "exit":
        state["chat_history"].append(("Bot", "Session ended. You may now download the log file."))
        state["mode"] = "ended"
        return state["chat_history"], state

    mode = state.get("mode", "query")
    if mode == "query":
        state["last_query"] = text
        answer = state["agent"].answer_query(text)
        state["last_answer"] = answer
        state["feedback_count"] = 0
        state["chat_history"].append(("User", text))
        state["chat_history"].append(("Bot", f"Answer: {answer}\nPlease provide feedback (good, too vague, not helpful):"))
        state["mode"] = "feedback"
    elif mode == "feedback":
        feedback = text.lower()
        state["chat_history"].append(("User", feedback))
        # Only one adjustment round per query: a second piece of feedback
        # closes the round whatever it says.
        if feedback == "good" or state["feedback_count"] >= 1:
            state["chat_history"].append(("Bot", "Thank you for your feedback. Enter your next query (or type 'exit' to end):"))
            state["mode"] = "query"
        else:
            new_answer = state["agent"].adjust_response(state["last_query"], state["last_answer"], feedback)
            state["last_answer"] = new_answer
            state["feedback_count"] += 1
            state["chat_history"].append(("Bot", f"Updated Answer: {new_answer}\nPlease provide feedback (good, too vague, not helpful):"))
    return state["chat_history"], state
|
|
|
|
|
|
|
def _current_log_file():
    """Return the log path so the download component picks up the file again."""
    return log_file_path


# Single-page UI: upload a document, chat with the bot, download the log.
with gr.Blocks() as demo:
    # Per-session state; process_file fills in the agent and chat history.
    state = gr.State({"mode": "idle"})
    gr.Markdown("## Customer Support Bot with Document Training")
    file_upload = gr.File(label="Upload TXT or PDF file")
    chat = gr.Chatbot()
    user_input = gr.Textbox(label="Enter your query or feedback")
    submit_btn = gr.Button("Submit")
    log_file = gr.File(label="Download Log File", file_count="single", interactive=False, value=log_file_path)

    file_upload.upload(process_file, inputs=[file_upload, state], outputs=[chat, state])

    # After each submit: clear the textbox, then hand the log path to the
    # download component again.
    # NOTE(review): the component appears to capture the file when it is
    # built, so without this refresh the download would not include the
    # current session — confirm against the installed Gradio version.
    submit_btn.click(
        process_input, inputs=[user_input, state], outputs=[chat, state]
    ).then(
        lambda: "", None, user_input
    ).then(
        _current_log_file, None, log_file
    )

# NOTE(review): share=True publishes the app (and the log download) through a
# public link — confirm this is intended outside of demos.
demo.launch(share=True)