|
import os |
|
import logging |
|
import gradio as gr |
|
from transformers import pipeline |
|
from sentence_transformers import SentenceTransformer, util |
|
import PyPDF2 |
|
|
|
|
|
logging.basicConfig( |
|
filename='support_bot_log.txt', |
|
level=logging.INFO, |
|
format='%(asctime)s - %(message)s', |
|
force=True |
|
) |
|
logger = logging.getLogger() |
|
|
|
|
|
qa_model = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad") |
|
embedder = SentenceTransformer('all-MiniLM-L6-v2') |
|
|
|
|
|
def extract_text_from_pdf(file_path): |
|
text = "" |
|
with open(file_path, "rb") as file: |
|
pdf_reader = PyPDF2.PdfReader(file) |
|
for page in pdf_reader.pages: |
|
text += page.extract_text() + "\n" |
|
return text |
|
|
|
|
|
|
|
def find_relevant_section(query, sections, section_embeddings): |
|
""" |
|
1. First, it performs a semantic search using cosine similarity. |
|
2. If the similarity score is below a threshold, it falls back to a keyword-based search. |
|
""" |
|
|
|
stopwords = {"and", "the", "is", "for", "to", "a", "an", "of", "in", "on", "at", "with", "by", "it", "as", "so", "what"} |
|
|
|
|
|
query_embedding = embedder.encode(query, convert_to_tensor=True) |
|
similarities = util.cos_sim(query_embedding, section_embeddings)[0] |
|
best_idx = similarities.argmax().item() |
|
best_section = sections[best_idx] |
|
similarity_score = similarities[best_idx].item() |
|
|
|
|
|
|
|
SIMILARITY_THRESHOLD = 0.4 |
|
if similarity_score >= SIMILARITY_THRESHOLD: |
|
logger.info(f"Found relevant section using embeddings for query: {query}") |
|
return best_section |
|
|
|
logger.info(f"Low similarity ({similarity_score}). Falling back to keyword search.") |
|
|
|
|
|
|
|
query_words = {word for word in query.lower().split() if word not in stopwords} |
|
for section in sections: |
|
section_words = {word for word in section.lower().split() if word not in stopwords} |
|
common_words = query_words.intersection(section_words) |
|
|
|
|
|
|
|
if len(common_words) >= 2: |
|
logger.info(f"Keyword match found for query: {query} with common words: {common_words}") |
|
return section |
|
|
|
logger.info(f"No good keyword match found. Returning default fallback response.") |
|
return "I don’t have enough information to answer that." |
|
|
|
def process_file(file, state): |
|
"""Handles the uploaded file, processes its text, and prepares it for querying.""" |
|
|
|
if file is None: |
|
logger.info("No file uploaded.") |
|
return [("Bot", "Please upload a file.")], state |
|
|
|
|
|
|
|
file_path = file.name |
|
if file_path.lower().endswith(".pdf"): |
|
logger.info(f"Uploaded PDF file: {file_path}") |
|
text = extract_text_from_pdf(file_path) |
|
elif file_path.lower().endswith(".txt"): |
|
logger.info(f"Uploaded TXT file: {file_path}") |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
text = f.read() |
|
else: |
|
logger.error(f"Unsupported file format: {file_path}") |
|
return [("Bot", "Unsupported file format. Please upload a PDF or TXT file.")], state |
|
|
|
|
|
sections = text.split('\n\n') |
|
section_embeddings = embedder.encode(sections, convert_to_tensor=True) |
|
|
|
|
|
state['document_text'] = text |
|
state['sections'] = sections |
|
state['section_embeddings'] = section_embeddings |
|
state['current_query'] = None |
|
state['feedback_count'] = 0 |
|
state['mode'] = 'waiting_for_query' |
|
state['chat_history'] = [("Bot", "File processed. You can now ask questions.")] |
|
logger.info(f"Processed file: {file_path}") |
|
return state['chat_history'], state |
|
|
|
|
|
def handle_input(user_input, state): |
|
"""Processes user queries, fetches answers, and handles feedback loops.""" |
|
|
|
if state['mode'] == 'waiting_for_upload': |
|
state['chat_history'].append(("Bot", "Please upload a file first.")) |
|
logger.info("User attempted to interact without uploading a file.") |
|
|
|
elif state['mode'] == 'waiting_for_query': |
|
query = user_input |
|
state['current_query'] = query |
|
state['feedback_count'] = 0 |
|
|
|
|
|
context = find_relevant_section(query, state['sections'], state['section_embeddings']) |
|
|
|
|
|
if context == "I don’t have enough information to answer that.": |
|
answer = context |
|
else: |
|
result = qa_model(question=query, context=context) |
|
answer = result["answer"] |
|
|
|
state['last_answer'] = answer |
|
state['mode'] = 'waiting_for_feedback' |
|
state['chat_history'].append(("User", query)) |
|
state['chat_history'].append(("Bot", f"Answer: {answer}\nPlease provide feedback: good, too vague, not helpful.")) |
|
logger.info(f"Query: {query}, Answer: {answer}") |
|
|
|
elif state['mode'] == 'waiting_for_feedback': |
|
feedback = user_input.lower() |
|
state['chat_history'].append(("User", feedback)) |
|
logger.info(f"Feedback: {feedback}") |
|
|
|
|
|
|
|
if feedback == "good" or state['feedback_count'] >= 2: |
|
state['mode'] = 'waiting_for_query' |
|
|
|
if feedback == "good": |
|
state['chat_history'].append(("Bot", "Thank you for your feedback. You can ask another question.")) |
|
logger.info("Feedback accepted as 'good'. Waiting for next query.") |
|
|
|
else: |
|
state['chat_history'].append(("Bot", "Maximum feedback iterations reached. You can ask another question.")) |
|
logger.info("Max feedback iterations reached. Waiting for next query.") |
|
|
|
else: |
|
query = state['current_query'] |
|
context = find_relevant_section(query, state['sections'], state['section_embeddings']) |
|
|
|
if feedback == "too vague": |
|
adjusted_answer = f"{state['last_answer']}\n\n(More details:\n{context[:500]}...)" |
|
|
|
elif feedback == "not helpful": |
|
adjusted_answer = qa_model(question=query + " Please provide more detailed information with examples.", context=context)['answer'] |
|
|
|
else: |
|
state['chat_history'].append(("Bot", "Please provide valid feedback: good, too vague, not helpful.")) |
|
logger.info(f"Invalid feedback received: {feedback}") |
|
return state['chat_history'], state |
|
|
|
state['last_answer'] = adjusted_answer |
|
state['feedback_count'] += 1 |
|
state['chat_history'].append(("Bot", f"Updated answer: {adjusted_answer}\nPlease provide feedback: good, too vague, not helpful.")) |
|
logger.info(f"Adjusted answer: {adjusted_answer}") |
|
|
|
return state['chat_history'], state |
|
|
|
|
|
|
|
def get_log_file(): |
|
|
|
for handler in logger.handlers: |
|
handler.flush() |
|
|
|
if not os.path.exists("support_bot_log.txt"): |
|
with open("support_bot_log.txt", "w", encoding="utf-8") as f: |
|
f.write("") |
|
logger.info("Log file downloaded by user.") |
|
return "support_bot_log.txt" |
|
|
|
|
|
initial_state = { |
|
'document_text': None, |
|
'sections': None, |
|
'section_embeddings': None, |
|
'current_query': None, |
|
'feedback_count': 0, |
|
'mode': 'waiting_for_upload', |
|
'chat_history': [("Bot", "Please upload a PDF or TXT file to start.")], |
|
'last_answer': None |
|
} |
|
|
|
|
|
with gr.Blocks() as demo: |
|
state = gr.State(initial_state) |
|
|
|
with gr.Row(): |
|
file_upload = gr.File(label="Upload PDF or TXT file") |
|
download_btn = gr.Button("Download Log") |
|
download_file = gr.File(label="Log File", interactive=False) |
|
|
|
chat = gr.Chatbot() |
|
user_input = gr.Textbox(label="Your query or feedback") |
|
submit_btn = gr.Button("Submit") |
|
|
|
|
|
file_upload.upload(process_file, inputs=[file_upload, state], outputs=[chat, state]) |
|
|
|
|
|
submit_btn.click(handle_input, inputs=[user_input, state], outputs=[chat, state]).then(lambda: "", None, user_input) |
|
|
|
|
|
download_btn.click(fn=get_log_file, inputs=[], outputs=download_file) |
|
|
|
demo.launch(share=True) |