import logging
import os
import shutil

import gradio as gr
import PyPDF2
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline

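# Logging setup: write to a file under /tmp (exposed for download in the UI
# below) and echo to the console; clear any handlers left from a previous run.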
logger = logging.getLogger('SupportBot')
logger.setLevel(logging.INFO)
if logger.handlers:
    logger.handlers.clear()

log_file_path = '/tmp/support_bot_log.txt'

file_handler = logging.FileHandler(log_file_path, mode='a')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

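# Models: an extractive QA pipeline for answering, and a MiniLM sentence
# embedder for retrieving the most relevant document section.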
qa_model = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad")
embedder = SentenceTransformer('all-MiniLM-L6-v2')

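# PyPDF2's extract_text() can come back empty for image-only or oddly encoded
# pages, so such pages are skipped rather than concatenated.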
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page in the PDF."""
    text = ""
    with open(file_path, "rb") as file:
        pdf_reader = PyPDF2.PdfReader(file)
        for page in pdf_reader.pages:
            extracted_text = page.extract_text()
            if extracted_text:
                text += extracted_text + "\n"
    return text

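# Retrieval is two-stage: cosine similarity over section embeddings first, then
# a keyword-overlap fallback when the best embedding score is below threshold.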
def find_relevant_section(query, sections, section_embeddings):
    """Return the document section most relevant to the query."""
    stopwords = {"and", "the", "is", "for", "to", "a", "an", "of", "in", "on", "at", "with", "by", "it", "as", "so", "what"}

    logger.info(f"Searching for relevant section for query: {query}")
    query_embedding = embedder.encode(query, convert_to_tensor=True)
    similarities = util.cos_sim(query_embedding, section_embeddings)[0]
    best_idx = similarities.argmax().item()
    best_section = sections[best_idx]
    similarity_score = similarities[best_idx].item()

    SIMILARITY_THRESHOLD = 0.4
    if similarity_score >= SIMILARITY_THRESHOLD:
        logger.info(f"Found relevant section using embeddings (score: {similarity_score:.3f})")
        file_handler.flush()
        return best_section

    # Embedding match was weak; fall back to counting shared non-stopword keywords.
    logger.info(f"Low similarity ({similarity_score:.3f}). Falling back to keyword search.")
    query_words = {word for word in query.lower().split() if word not in stopwords}
    for section in sections:
        section_words = {word for word in section.lower().split() if word not in stopwords}
        common_words = query_words.intersection(section_words)
        if len(common_words) >= 2:
            logger.info(f"Keyword match found with common words: {common_words}")
            file_handler.flush()
            return section

    logger.info("No good match found. Returning default response.")
    file_handler.flush()
    return "I don’t have enough information to answer that."

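# Copy the upload into /tmp, extract its text, split it into paragraph-level
# sections, and precompute their embeddings in the per-session state.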
def process_file(file, state):
    logger.info("Received file upload request")
    if file is None:
        logger.info("No file uploaded")
        file_handler.flush()
        return [("Bot", "Please upload a file.")], state

    # Gradio's File component already saves the upload to disk; file.name is the
    # path of that temp file, so copy it instead of re-reading the file object.
    file_path = file.name
    temp_file_path = os.path.join("/tmp", os.path.basename(file_path))
    if os.path.abspath(file_path) != os.path.abspath(temp_file_path):
        shutil.copyfile(file_path, temp_file_path)

    if temp_file_path.lower().endswith(".pdf"):
        logger.info(f"Processing PDF file: {temp_file_path}")
        text = extract_text_from_pdf(temp_file_path)
    elif temp_file_path.lower().endswith(".txt"):
        logger.info(f"Processing TXT file: {temp_file_path}")
        with open(temp_file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    else:
        logger.error(f"Unsupported file format: {temp_file_path}")
        file_handler.flush()
        return [("Bot", "Unsupported file format. Please upload a PDF or TXT file.")], state

    # Split on blank lines and drop empty chunks so they never reach the embedder.
    sections = [s for s in text.split('\n\n') if s.strip()]
    section_embeddings = embedder.encode(sections, convert_to_tensor=True)
    state['document_text'] = text
    state['sections'] = sections
    state['section_embeddings'] = section_embeddings
    state['current_query'] = None
    state['feedback_count'] = 0
    state['mode'] = 'waiting_for_query'
    state['chat_history'] = [("Bot", "File processed. You can now ask questions.")]
    logger.info(f"File processed successfully: {temp_file_path}")
    file_handler.flush()
    return state['chat_history'], state

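# handle_input is a three-state machine: 'waiting_for_upload' rejects queries,
# 'waiting_for_query' answers with the QA model, and 'waiting_for_feedback'
# refines the last answer for up to two rounds of feedback.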
def handle_input(user_input, state):
    if state['mode'] == 'waiting_for_upload':
        logger.info("User input received before file upload")
        state['chat_history'].append(("Bot", "Please upload a file first."))
        file_handler.flush()
    elif state['mode'] == 'waiting_for_query':
        query = user_input
        logger.info(f"User query: {query}")
        state['current_query'] = query
        state['feedback_count'] = 0
        context = find_relevant_section(query, state['sections'], state['section_embeddings'])
        # find_relevant_section signals "no match" with this exact sentinel string.
        if context == "I don’t have enough information to answer that.":
            answer = context
        else:
            result = qa_model(question=query, context=context)
            answer = result["answer"]
        state['last_answer'] = answer
        state['mode'] = 'waiting_for_feedback'
        state['chat_history'].append(("User", query))
        state['chat_history'].append(("Bot", f"Answer: {answer}\nPlease provide feedback: good, too vague, not helpful."))
        logger.info(f"Generated answer: {answer}")
        file_handler.flush()
    elif state['mode'] == 'waiting_for_feedback':
        feedback = user_input.lower()
        logger.info(f"User feedback: {feedback}")
        state['chat_history'].append(("User", feedback))
        if feedback == "good" or state['feedback_count'] >= 2:
            state['mode'] = 'waiting_for_query'
            if feedback == "good":
                state['chat_history'].append(("Bot", "Thank you for your feedback. You can ask another question."))
                logger.info("Feedback 'good' received. Ready for next query.")
            else:
                state['chat_history'].append(("Bot", "Maximum feedback iterations reached. You can ask another question."))
                logger.info("Max feedback iterations (2) reached. Ready for next query.")
            file_handler.flush()
        else:
            query = state['current_query']
            context = find_relevant_section(query, state['sections'], state['section_embeddings'])
            if feedback == "too vague":
                # Append a slice of the retrieved context to the previous answer.
                adjusted_answer = f"{state['last_answer']}\n\n(More details:\n{context[:500]}...)"
                logger.info("Feedback 'too vague'. Providing context.")
            elif feedback == "not helpful":
                # Re-ask the QA model with a prompt nudged toward more detail.
                adjusted_answer = qa_model(question=query + " Please provide more detailed information with examples.", context=context)['answer']
                logger.info("Feedback 'not helpful'. Re-searching with modified query.")
            else:
                state['chat_history'].append(("Bot", "Please provide valid feedback: good, too vague, not helpful."))
                logger.info(f"Invalid feedback received: {feedback}")
                file_handler.flush()
                return state['chat_history'], state
            state['last_answer'] = adjusted_answer
            state['feedback_count'] += 1
            state['chat_history'].append(("Bot", f"Updated answer: {adjusted_answer}\nPlease provide feedback: good, too vague, not helpful."))
            logger.info(f"Updated answer: {adjusted_answer}")
            file_handler.flush()
    return state['chat_history'], state

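# Initial per-session conversation state; gr.State below should give each
# browser session its own copy of this dict.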
initial_state = {
    'document_text': None,
    'sections': None,
    'section_embeddings': None,
    'current_query': None,
    'feedback_count': 0,
    'mode': 'waiting_for_upload',
    'chat_history': [("Bot", "Please upload a PDF or TXT file to start.")],
    'last_answer': None
}

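# UI wiring: uploads run process_file, the Submit button routes text through
# handle_input, and .then(...) clears the textbox after each submission.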
with gr.Blocks() as demo:
    state = gr.State(initial_state)
    file_upload = gr.File(label="Upload PDF or TXT file")
    chat = gr.Chatbot()
    user_input = gr.Textbox(label="Your query or feedback")
    submit_btn = gr.Button("Submit")

    log_file = gr.File(label="Download Log File", value=log_file_path)

    file_upload.upload(process_file, inputs=[file_upload, state], outputs=[chat, state])
    submit_btn.click(handle_input, inputs=[user_input, state], outputs=[chat, state]).then(lambda: "", None, user_input)

demo.launch(share=True)