|
import gradio as gr |
|
from transformers import pipeline |
|
from sentence_transformers import SentenceTransformer, util |
|
import PyPDF2 |
|
import datetime |
|
import os |
|
|
|
|
|
# Model identifiers are named so the two checkpoints are easy to spot and swap.
QA_MODEL_NAME = "distilbert-base-uncased-distilled-squad"
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# Extractive question-answering pipeline used to pull an answer span out of a
# document section.
qa_model = pipeline(task="question-answering", model=QA_MODEL_NAME)

# Sentence encoder used to embed both document sections and user queries.
embedder = SentenceTransformer(EMBEDDING_MODEL_NAME)
|
|
|
|
|
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page of the PDF at ``file_path``.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        A single string in which each page's extracted text is followed by a
        newline. An empty string is returned for a PDF with no pages.
    """
    with open(file_path, "rb") as file:
        pdf_reader = PyPDF2.PdfReader(file)
        # Join once instead of repeated ``+=``. The ``or ""`` is a defensive
        # guard so a page that yields no text cannot raise TypeError on the
        # concatenation with "\n".
        return "".join(
            (page.extract_text() or "") + "\n" for page in pdf_reader.pages
        )
|
|
|
|
|
def find_relevant_section(query, sections, section_embeddings, log_messages):
    """Pick the document section that best matches ``query``.

    The query is embedded and compared against ``section_embeddings`` with
    cosine similarity. If the best score reaches the threshold, that section
    is returned. Otherwise the function falls back to a keyword search and
    returns the first section sharing at least two non-stopword words with
    the query. If neither strategy succeeds, a fixed fallback sentence is
    returned in place of a section.

    Args:
        query: The user's question.
        sections: List of section strings of the uploaded document.
        section_embeddings: Embeddings of ``sections``, in the same order.
        log_messages: Running list of log entries; extended via ``log_message``.

    Returns:
        A ``(section_or_fallback_text, log_messages)`` tuple.
    """
    import string  # local import: only needed for punctuation stripping here

    stopwords = {"and", "the", "is", "for", "to", "a", "an", "of", "in", "on", "at", "with", "by", "it", "as", "so", "what"}
    SIMILARITY_THRESHOLD = 0.4
    # Callers compare against this exact sentence, so it must not change.
    fallback_response = "I don’t have enough information to answer that."

    # Without sections there is nothing to rank; argmax over an empty
    # similarity tensor would raise.
    if not sections:
        log_messages = log_message("No document sections available. Returning default fallback response.", log_messages)
        return fallback_response, log_messages

    query_embedding = embedder.encode(query, convert_to_tensor=True)
    similarities = util.cos_sim(query_embedding, section_embeddings)[0]
    best_idx = similarities.argmax().item()
    similarity_score = similarities[best_idx].item()

    if similarity_score >= SIMILARITY_THRESHOLD:
        log_messages = log_message(f"Found relevant section using embeddings for query: {query}", log_messages)
        return sections[best_idx], log_messages

    log_messages = log_message(f"Low similarity ({similarity_score}). Falling back to keyword search.", log_messages)

    def _content_words(text):
        # Strip surrounding punctuation so "policy?" matches "policy".
        stripped = (raw.strip(string.punctuation) for raw in text.lower().split())
        return {word for word in stripped if word and word not in stopwords}

    query_words = _content_words(query)
    for section in sections:
        common_words = query_words & _content_words(section)
        if len(common_words) >= 2:
            log_messages = log_message(f"Keyword match found for query: {query} with common words: {common_words}", log_messages)
            return section, log_messages

    log_messages = log_message("No good keyword match found. Returning default fallback response.", log_messages)
    return fallback_response, log_messages
|
|
|
|
|
def process_file(file, state, log_messages):
    """Load an uploaded PDF/TXT file and prepare the session for questions.

    The file's text is split into sections on blank lines, the sections are
    embedded, and the per-session ``state`` dict is reset so the bot waits for
    a query.

    Args:
        file: Uploaded file object exposing its path as ``.name``, or ``None``
            when nothing was uploaded.
        state: Per-session state dict; updated in place on success.
        log_messages: Running list of log entries; extended via ``log_message``.

    Returns:
        A ``(chat_history, state, log_messages)`` tuple. On rejected input
        (no file, unsupported extension, no readable text) ``state`` is
        returned unchanged and the chat history holds a single bot message.
    """
    if file is None:
        log_messages = log_message("No file uploaded.", log_messages)
        return [("Bot", "Please upload a file.")], state, log_messages

    file_path = file.name
    lowered_path = file_path.lower()
    if lowered_path.endswith(".pdf"):
        log_messages = log_message(f"Uploaded PDF file: {file_path}", log_messages)
        text = extract_text_from_pdf(file_path)
    elif lowered_path.endswith(".txt"):
        log_messages = log_message(f"Uploaded TXT file: {file_path}", log_messages)
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    else:
        log_messages = log_message(f"Unsupported file format: {file_path}", log_messages)
        return [("Bot", "Unsupported file format. Please upload a PDF or TXT file.")], state, log_messages

    # Runs of blank lines produce empty chunks; embedding those only adds
    # meaningless candidates to the similarity search, so drop them.
    sections = [chunk for chunk in text.split('\n\n') if chunk.strip()]
    if not sections:
        log_messages = log_message(f"No extractable text found in file: {file_path}", log_messages)
        return [("Bot", "No readable text was found in the file. Please upload a different PDF or TXT file.")], state, log_messages

    section_embeddings = embedder.encode(sections, convert_to_tensor=True)

    state['document_text'] = text
    state['sections'] = sections
    state['section_embeddings'] = section_embeddings
    state['current_query'] = None
    state['feedback_count'] = 0
    # A new document invalidates any answer produced for the previous one.
    state['last_answer'] = None
    state['mode'] = 'waiting_for_query'
    state['chat_history'] = [("Bot", "File processed. You can now ask questions.")]
    log_messages = log_message(f"Processed file: {file_path}", log_messages)
    return state['chat_history'], state, log_messages
|
|
|
|
|
# Sentinel returned by find_relevant_section when no section matches; it is
# compared by equality, so the text must stay byte-identical.
_NO_INFO_RESPONSE = "I don’t have enough information to answer that."


def _end_session(state, log_messages):
    """Switch the session to 'exited' and record the closing exchange."""
    log_messages = log_message("User entered 'exit'. Ending session.", log_messages)
    state['mode'] = 'exited'
    state['chat_history'].append(("User", "exit"))
    state['chat_history'].append(("Bot", "Session ended. You can download the log file."))
    return state['chat_history'], state, log_messages


def _answer_query(query, state, log_messages):
    """Answer a new question and move the session to the feedback step."""
    if not query:
        # An empty question cannot be answered; stay in query mode.
        state['chat_history'].append(("Bot", "Please enter a question."))
        log_messages = log_message("Empty query received.", log_messages)
        return state['chat_history'], state, log_messages

    state['current_query'] = query
    state['feedback_count'] = 0
    context, log_messages = find_relevant_section(query, state['sections'], state['section_embeddings'], log_messages)
    if context == _NO_INFO_RESPONSE:
        answer = context
    else:
        answer = qa_model(question=query, context=context)["answer"]

    state['last_answer'] = answer
    state['mode'] = 'waiting_for_feedback'
    state['chat_history'].append(("User", query))
    state['chat_history'].append(("Bot", f"Answer: {answer}\nPlease provide feedback: good, too vague, not helpful."))
    log_messages = log_message(f"Query: {query}, Answer: {answer}", log_messages)
    return state['chat_history'], state, log_messages


def _apply_feedback(feedback, state, log_messages):
    """Process feedback on the last answer, refining it when requested."""
    state['chat_history'].append(("User", feedback))
    log_messages = log_message(f"Feedback: {feedback}", log_messages)

    if feedback == "good":
        state['mode'] = 'waiting_for_query'
        state['chat_history'].append(("Bot", "Thank you for your feedback. You can ask another question."))
        log_messages = log_message("Feedback accepted as 'good'. Waiting for next query.", log_messages)
        return state['chat_history'], state, log_messages

    if state['feedback_count'] >= 2:
        state['mode'] = 'waiting_for_query'
        state['chat_history'].append(("Bot", "Maximum feedback iterations reached. You can ask another question."))
        log_messages = log_message("Max feedback iterations reached. Waiting for next query.", log_messages)
        return state['chat_history'], state, log_messages

    # Validate before searching so invalid input does not pay for an
    # embedding lookup whose result would be thrown away.
    if feedback not in ("too vague", "not helpful"):
        state['chat_history'].append(("Bot", "Please provide valid feedback: good, too vague, not helpful."))
        log_messages = log_message(f"Invalid feedback received: {feedback}", log_messages)
        return state['chat_history'], state, log_messages

    query = state['current_query']
    context, log_messages = find_relevant_section(query, state['sections'], state['section_embeddings'], log_messages)
    if context == _NO_INFO_RESPONSE:
        # No section matched: the sentinel is not document text, so it must
        # not be quoted as "details" or fed to the QA model as context.
        adjusted_answer = context
    elif feedback == "too vague":
        adjusted_answer = f"{state['last_answer']}\n\n(More details:\n{context[:500]}...)"
    else:
        adjusted_answer = qa_model(question=query + " Please provide more detailed information with examples.", context=context)['answer']

    state['last_answer'] = adjusted_answer
    state['feedback_count'] += 1
    state['chat_history'].append(("Bot", f"Updated answer: {adjusted_answer}\nPlease provide feedback: good, too vague, not helpful."))
    log_messages = log_message(f"Adjusted answer: {adjusted_answer}", log_messages)
    return state['chat_history'], state, log_messages


def handle_input(user_input, state, log_messages):
    """Route one line of user text according to the session's current mode.

    Modes (``state['mode']``):
        * ``'waiting_for_upload'`` - ask the user to upload a file first.
        * ``'waiting_for_query'`` - treat the text as a question and answer it.
        * ``'waiting_for_feedback'`` - treat the text as feedback
          (``good`` / ``too vague`` / ``not helpful``) on the last answer.
        * ``'exited'`` - remind the user that the session is over.

    Typing ``exit`` in query or feedback mode ends the session. Input is
    stripped of surrounding whitespace before it is interpreted.

    Args:
        user_input: Raw text from the input box (may be empty or ``None``).
        state: Per-session state dict; updated in place.
        log_messages: Running list of log entries; extended via ``log_message``.

    Returns:
        A ``(chat_history, state, log_messages)`` tuple.
    """
    text = (user_input or "").strip()
    mode = state['mode']

    if mode == 'waiting_for_upload':
        state['chat_history'].append(("Bot", "Please upload a file first."))
        log_messages = log_message("User attempted to interact without uploading a file.", log_messages)
        return state['chat_history'], state, log_messages

    if mode == 'exited':
        state['chat_history'].append(("Bot", "Session is over. Please download the log."))
        log_messages = log_message("User interacted after exiting.", log_messages)
        return state['chat_history'], state, log_messages

    if mode not in ('waiting_for_query', 'waiting_for_feedback'):
        # Unknown mode: leave everything untouched.
        return state['chat_history'], state, log_messages

    if text.lower() == "exit":
        return _end_session(state, log_messages)

    if mode == 'waiting_for_query':
        return _answer_query(text, state, log_messages)
    return _apply_feedback(text.lower(), state, log_messages)
|
|
|
|
|
# Template for the per-session state: nothing is loaded yet, so the bot starts
# in 'waiting_for_upload' with a single greeting in the chat history.
initial_state = dict(
    document_text=None,
    sections=None,
    section_embeddings=None,
    current_query=None,
    feedback_count=0,
    mode='waiting_for_upload',
    chat_history=[("Bot", "Please upload a PDF or TXT file to start.")],
    last_answer=None,
)

# Initial (empty) list of timestamped log entries.
log_messages = list()
|
|
|
|
|
def log_message(message, log_messages):
    """Append ``message`` to ``log_messages`` with a timestamp prefix.

    The entry has the form ``"YYYY-MM-DD HH:MM:SS - <message>"`` using the
    current local time. The list is modified in place and also returned.
    """
    now = datetime.datetime.now()
    log_messages.append(f"{now:%Y-%m-%d %H:%M:%S} - {message}")
    return log_messages
|
|
|
|
|
def save_logs_to_file(log_messages):
    """Write all log entries to ``support_bot_log.txt`` in the working directory.

    Any existing file is overwritten. Each entry is written on its own line.

    Args:
        log_messages: Iterable of log entries.

    Returns:
        The path of the written log file, so callers can hand it to a
        download component. Earlier callers that ignored the return value
        are unaffected.
    """
    log_path = "support_bot_log.txt"
    # Explicit UTF-8: answers quoted from documents may contain characters the
    # platform's default encoding cannot represent.
    with open(log_path, "w", encoding="utf-8") as log_file:
        log_file.writelines(f"{entry}\n" for entry in log_messages)
    return log_path
|
|
|
|
|
def _export_logs(log_messages):
    """Write the session's log entries to disk and return the file's path."""
    save_logs_to_file(log_messages)
    return "support_bot_log.txt"


def _save_logs_on_exit(state, log_messages):
    """Write the log file once the session has been ended with 'exit'."""
    if state['mode'] == 'exited':
        save_logs_to_file(log_messages)


with gr.Blocks() as demo:
    state = gr.State(initial_state)
    # A single shared log state. Each gr.State(...) call builds a separate
    # component, so creating a fresh one per event (as inputs AND outputs)
    # means entries written by one handler are never seen by the next.
    log_state = gr.State(log_messages)

    file_upload = gr.File(label="Upload PDF or TXT file")
    chat = gr.Chatbot()
    user_input = gr.Textbox(label="Your query or feedback")
    submit_btn = gr.Button("Submit")
    download_log_btn = gr.Button("Download Log File")
    log_file = gr.File(label="Log File")

    file_upload.upload(
        process_file,
        inputs=[file_upload, state, log_state],
        outputs=[chat, state, log_state],
    )

    # The Submit button and the Enter key behave identically: handle the
    # text, clear the box, then write the log file if the session just ended.
    for trigger in (submit_btn.click, user_input.submit):
        trigger(
            handle_input,
            inputs=[user_input, state, log_state],
            outputs=[chat, state, log_state],
        ).then(
            lambda: "", None, user_input
        ).then(
            _save_logs_on_exit, inputs=[state, log_state], outputs=None
        )

    # Write the log before handing out its path; otherwise the file may not
    # exist (or may be stale) when the download is requested.
    download_log_btn.click(
        _export_logs,
        inputs=[log_state],
        outputs=[log_file],
    )

# NOTE: share=True requests a publicly reachable link for the app.
demo.launch(share=True)