Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| from transformers import T5ForConditionalGeneration, T5Tokenizer | |
| import spacy | |
| import nltk | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from rake_nltk import Rake | |
| import pandas as pd | |
| from fpdf import FPDF | |
| import wikipediaapi | |
| from functools import lru_cache | |
| nltk.download('punkt') | |
| nltk.download('stopwords') | |
| nltk.download('brown') | |
| from nltk.tokenize import sent_tokenize | |
| nltk.download('wordnet') | |
| from nltk.corpus import wordnet | |
| import random | |
| import sense2vec | |
| from wordcloud import WordCloud | |
| import matplotlib.pyplot as plt | |
| import json | |
| import os | |
| from sentence_transformers import SentenceTransformer, util | |
| import textstat | |
| from spellchecker import SpellChecker | |
| from transformers import pipeline | |
| import re | |
| import pymupdf | |
| import uuid | |
| import time | |
| import asyncio | |
| import aiohttp | |
| # '-----------------' | |
| import smtplib | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from email.mime.base import MIMEBase | |
| from email import encoders | |
| # '------------------' | |
| print("***************************************************************") | |
| st.set_page_config( | |
| page_icon='cyclone', | |
| page_title="Question Generator", | |
| initial_sidebar_state="auto", | |
| menu_items={ | |
| "About" : "Hi this our project." | |
| } | |
| ) | |
| st.set_option('deprecation.showPyplotGlobalUse',False) | |
| class QuestionGenerationError(Exception): | |
| """Custom exception for question generation errors.""" | |
| pass | |
| # Initialize Wikipedia API with a user agent | |
| user_agent = 'QGen/1.2' | |
| wiki_wiki = wikipediaapi.Wikipedia(user_agent= user_agent,language='en') | |
| def get_session_id(): | |
| if 'session_id' not in st.session_state: | |
| st.session_state.session_id = str(uuid.uuid4()) | |
| return st.session_state.session_id | |
| def initialize_state(session_id): | |
| if 'session_states' not in st.session_state: | |
| st.session_state.session_states = {} | |
| if session_id not in st.session_state.session_states: | |
| st.session_state.session_states[session_id] = { | |
| 'generated_questions': [], | |
| # add other state variables as needed | |
| } | |
| return st.session_state.session_states[session_id] | |
| def get_state(session_id): | |
| return st.session_state.session_states[session_id] | |
| def set_state(session_id, key, value): | |
| st.session_state.session_states[session_id][key] = value | |
| def load_model(modelname): | |
| model_name = modelname | |
| model = T5ForConditionalGeneration.from_pretrained(model_name) | |
| tokenizer = T5Tokenizer.from_pretrained(model_name) | |
| return model, tokenizer | |
| # Load Spacy Model | |
| def load_nlp_models(): | |
| nlp = spacy.load("en_core_web_md") | |
| s2v = sense2vec.Sense2Vec().from_disk('s2v_old') | |
| return nlp, s2v | |
| # Load Quality Assurance Models | |
| def load_qa_models(): | |
| # Initialize BERT model for sentence similarity | |
| similarity_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| spell = SpellChecker() | |
| return similarity_model, spell | |
| with st.sidebar: | |
| select_model = st.selectbox("Select Model", ("T5-large","T5-small")) | |
| if select_model == "T5-large": | |
| modelname = "DevBM/t5-large-squad" | |
| elif select_model == "T5-small": | |
| modelname = "AneriThakkar/flan-t5-small-finetuned" | |
| nlp, s2v = load_nlp_models() | |
| similarity_model, spell = load_qa_models() | |
| context_model = similarity_model | |
| model, tokenizer = load_model(modelname) | |
| # Info Section | |
| def display_info(): | |
| st.sidebar.title("Information") | |
| st.sidebar.markdown(""" | |
| ### Question Generator System | |
| This system is designed to generate questions based on the provided context. It uses various NLP techniques and models to: | |
| - Extract keywords from the text | |
| - Map keywords to sentences | |
| - Generate questions | |
| - Provide multiple choice options | |
| - Assess the quality of generated questions | |
| #### Key Features: | |
| - **Keyword Extraction:** Combines RAKE, TF-IDF, and spaCy for comprehensive keyword extraction. | |
| - **Question Generation:** Utilizes a pre-trained T5 model for generating questions. | |
| - **Options Generation:** Creates contextually relevant multiple-choice options. | |
| - **Question Assessment:** Scores questions based on relevance, complexity, and spelling correctness. | |
| - **Feedback Collection:** Allows users to rate the generated questions and provides statistics on feedback. | |
| #### Customization Options: | |
| - Number of beams for question generation | |
| - Context window size for mapping keywords to sentences | |
| - Number of questions to generate | |
| - Additional display elements (context, answer, options, entity link, QA scores) | |
| #### Outputs: | |
| - Generated questions with multiple-choice options | |
| - Download options for CSV and PDF formats | |
| - Visualization of overall scores | |
| """) | |
| def get_pdf_text(pdf_file): | |
| doc = pymupdf.open(stream=pdf_file.read(), filetype="pdf") | |
| text = "" | |
| for page_num in range(doc.page_count): | |
| page = doc.load_page(page_num) | |
| text += page.get_text() | |
| return text | |
| def save_feedback(question, answer, rating, options, context): | |
| feedback_file = 'question_feedback.json' | |
| if os.path.exists(feedback_file): | |
| with open(feedback_file, 'r') as f: | |
| feedback_data = json.load(f) | |
| else: | |
| feedback_data = [] | |
| tpl = { | |
| 'question' : question, | |
| 'answer' : answer, | |
| 'context' : context, | |
| 'options' : options, | |
| 'rating' : rating, | |
| } | |
| # feedback_data[question] = rating | |
| feedback_data.append(tpl) | |
| print(feedback_data) | |
| with open(feedback_file, 'w') as f: | |
| json.dump(feedback_data, f) | |
| return feedback_file | |
| # ----------------------------------------------------------------------------------------- | |
| def send_email_with_attachment(email_subject, email_body, recipient_emails, sender_email, sender_password, attachment_path): | |
| msg = MIMEMultipart() | |
| msg['From'] = sender_email | |
| msg['To'] = ", ".join(recipient_emails) # Join the list of recipients with commas | |
| msg['Subject'] = email_subject | |
| msg.attach(MIMEText(email_body, 'plain')) | |
| attachment = open(attachment_path, 'rb') | |
| part = MIMEBase('application', 'octet-stream') | |
| part.set_payload(attachment.read()) | |
| encoders.encode_base64(part) | |
| part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(attachment_path)}') | |
| msg.attach(part) | |
| attachment.close() | |
| with smtplib.SMTP('smtp.gmail.com', 587) as server: | |
| server.starttls() | |
| print(sender_email) | |
| print(sender_password) | |
| server.login(sender_email, sender_password) | |
| text = msg.as_string() | |
| server.sendmail(sender_email, recipient_emails, text) | |
| # ---------------------------------------------------------------------------------- | |
| # Function to clean text | |
| def clean_text(text): | |
| text = re.sub(r"[^\x00-\x7F]", " ", text) | |
| text = re.sub(f"[\n]"," ", text) | |
| return text | |
| # Function to create text chunks | |
| def segment_text(text, max_segment_length=700, batch_size=7): | |
| sentences = sent_tokenize(text) | |
| segments = [] | |
| current_segment = "" | |
| for sentence in sentences: | |
| if len(current_segment) + len(sentence) <= max_segment_length: | |
| current_segment += sentence + " " | |
| else: | |
| segments.append(current_segment.strip()) | |
| current_segment = sentence + " " | |
| if current_segment: | |
| segments.append(current_segment.strip()) | |
| # Create batches | |
| batches = [segments[i:i + batch_size] for i in range(0, len(segments), batch_size)] | |
| return batches | |
| # Function to extract keywords using combined techniques | |
| def extract_keywords(text, extract_all): | |
| try: | |
| doc = nlp(text) | |
| spacy_keywords = set([ent.text for ent in doc.ents]) | |
| spacy_entities = spacy_keywords | |
| print(f"\n\nSpacy Entities: {spacy_entities} \n\n") | |
| # Use Only Spacy Entities | |
| if extract_all is False: | |
| return list(spacy_entities) | |
| # Use RAKE | |
| rake = Rake() | |
| rake.extract_keywords_from_text(text) | |
| rake_keywords = set(rake.get_ranked_phrases()) | |
| print(f"\n\nRake Keywords: {rake_keywords} \n\n") | |
| # Use spaCy for NER and POS tagging | |
| spacy_keywords.update([token.text for token in doc if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"]]) | |
| print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n") | |
| # Use TF-IDF | |
| vectorizer = TfidfVectorizer(stop_words='english') | |
| X = vectorizer.fit_transform([text]) | |
| tfidf_keywords = set(vectorizer.get_feature_names_out()) | |
| print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n") | |
| # Combine all keywords | |
| combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords) | |
| return list(combined_keywords) | |
| except Exception as e: | |
| raise QuestionGenerationError(f"Error in keyword extraction: {str(e)}") | |
| def get_similar_words_sense2vec(word, n=3): | |
| # Try to find the word with its most likely part-of-speech | |
| word_with_pos = word + "|NOUN" | |
| if word_with_pos in s2v: | |
| similar_words = s2v.most_similar(word_with_pos, n=n) | |
| return [word.split("|")[0] for word, _ in similar_words] | |
| # If not found, try without POS | |
| if word in s2v: | |
| similar_words = s2v.most_similar(word, n=n) | |
| return [word.split("|")[0] for word, _ in similar_words] | |
| return [] | |
| def get_synonyms(word, n=3): | |
| synonyms = [] | |
| for syn in wordnet.synsets(word): | |
| for lemma in syn.lemmas(): | |
| if lemma.name() != word and lemma.name() not in synonyms: | |
| synonyms.append(lemma.name()) | |
| if len(synonyms) == n: | |
| return synonyms | |
| return synonyms | |
| def generate_options(answer, context, n=3): | |
| options = [answer] | |
| # Add contextually relevant words using a pre-trained model | |
| context_embedding = context_model.encode(context) | |
| answer_embedding = context_model.encode(answer) | |
| context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()] | |
| # Compute similarity scores and sort context words | |
| similarity_scores = [util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item() for word in context_words] | |
| sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)] | |
| options.extend(sorted_context_words[:n]) | |
| # Try to get similar words based on sense2vec | |
| similar_words = get_similar_words_sense2vec(answer, n) | |
| options.extend(similar_words) | |
| # If we don't have enough options, try synonyms | |
| if len(options) < n + 1: | |
| synonyms = get_synonyms(answer, n - len(options) + 1) | |
| options.extend(synonyms) | |
| # If we still don't have enough options, extract other entities from the context | |
| if len(options) < n + 1: | |
| doc = nlp(context) | |
| entities = [ent.text for ent in doc.ents if ent.text.lower() != answer.lower()] | |
| options.extend(entities[:n - len(options) + 1]) | |
| # If we still need more options, add some random words from the context | |
| if len(options) < n + 1: | |
| context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()] | |
| options.extend(random.sample(context_words, min(n - len(options) + 1, len(context_words)))) | |
| print(f"\n\nAll Possible Options: {options}\n\n") | |
| # Ensure we have the correct number of unique options | |
| options = list(dict.fromkeys(options))[:n+1] | |
| # Shuffle the options | |
| random.shuffle(options) | |
| return options | |
| # Function to map keywords to sentences with customizable context window size | |
| def map_keywords_to_sentences(text, keywords, context_window_size): | |
| sentences = sent_tokenize(text) | |
| keyword_sentence_mapping = {} | |
| print(f"\n\nSentences: {sentences}\n\n") | |
| for keyword in keywords: | |
| for i, sentence in enumerate(sentences): | |
| if keyword in sentence: | |
| # Combine current sentence with surrounding sentences for context | |
| start = max(0, i - context_window_size) | |
| end = min(len(sentences), i + context_window_size + 1) | |
| context = ' '.join(sentences[start:end]) | |
| if keyword not in keyword_sentence_mapping: | |
| keyword_sentence_mapping[keyword] = context | |
| else: | |
| keyword_sentence_mapping[keyword] += ' ' + context | |
| return keyword_sentence_mapping | |
| # Function to perform entity linking using Wikipedia API | |
| def entity_linking(keyword): | |
| page = wiki_wiki.page(keyword) | |
| if page.exists(): | |
| return page.fullurl | |
| return None | |
| async def generate_question_async(context, answer, num_beams): | |
| try: | |
| input_text = f"<context> {context} <answer> {answer}" | |
| print(f"\n{input_text}\n") | |
| input_ids = tokenizer.encode(input_text, return_tensors='pt') | |
| outputs = await asyncio.to_thread(model.generate, input_ids, num_beams=num_beams, early_stopping=True, max_length=250) | |
| question = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| print(f"\n{question}\n") | |
| return question | |
| except Exception as e: | |
| raise QuestionGenerationError(f"Error in question generation: {str(e)}") | |
| async def generate_options_async(answer, context, n=3): | |
| try: | |
| options = [answer] | |
| # Add contextually relevant words using a pre-trained model | |
| context_embedding = await asyncio.to_thread(context_model.encode, context) | |
| answer_embedding = await asyncio.to_thread(context_model.encode, answer) | |
| context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()] | |
| # Compute similarity scores and sort context words | |
| similarity_scores = [util.pytorch_cos_sim(await asyncio.to_thread(context_model.encode, word), answer_embedding).item() for word in context_words] | |
| sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)] | |
| options.extend(sorted_context_words[:n]) | |
| # Try to get similar words based on sense2vec | |
| similar_words = await asyncio.to_thread(get_similar_words_sense2vec, answer, n) | |
| options.extend(similar_words) | |
| # If we don't have enough options, try synonyms | |
| if len(options) < n + 1: | |
| synonyms = await asyncio.to_thread(get_synonyms, answer, n - len(options) + 1) | |
| options.extend(synonyms) | |
| # Ensure we have the correct number of unique options | |
| options = list(dict.fromkeys(options))[:n+1] | |
| # Shuffle the options | |
| random.shuffle(options) | |
| return options | |
| except Exception as e: | |
| raise QuestionGenerationError(f"Error in generating options: {str(e)}") | |
| # Function to generate questions using beam search | |
| async def generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords): | |
| try: | |
| batches = segment_text(text) | |
| keywords = extract_keywords(text, extract_all_keywords) | |
| all_questions = [] | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| for i, batch in enumerate(batches): | |
| status_text.text(f"Processing batch {i+1} of {len(batches)}...") | |
| batch_questions = await process_batch(batch, keywords, context_window_size, num_beams) | |
| all_questions.extend(batch_questions) | |
| progress_bar.progress((i + 1) / len(batches)) | |
| if len(all_questions) >= num_questions: | |
| break | |
| progress_bar.empty() | |
| status_text.empty() | |
| return all_questions[:num_questions] | |
| except QuestionGenerationError as e: | |
| st.error(f"An error occurred during question generation: {str(e)}") | |
| return [] | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)}") | |
| return [] | |
| async def process_batch(batch, keywords, context_window_size, num_beams): | |
| questions = [] | |
| for text in batch: | |
| keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size) | |
| for keyword, context in keyword_sentence_mapping.items(): | |
| question = await generate_question_async(context, keyword, num_beams) | |
| options = await generate_options_async(keyword, context) | |
| overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context, question, keyword) | |
| if overall_score >= 0.5: | |
| questions.append({ | |
| "question": question, | |
| "context": context, | |
| "answer": keyword, | |
| "options": options, | |
| "overall_score": overall_score, | |
| "relevance_score": relevance_score, | |
| "complexity_score": complexity_score, | |
| "spelling_correctness": spelling_correctness, | |
| }) | |
| return questions | |
| # Function to export questions to CSV | |
| def export_to_csv(data): | |
| # df = pd.DataFrame(data, columns=["Context", "Answer", "Question", "Options"]) | |
| df = pd.DataFrame(data) | |
| # csv = df.to_csv(index=False,encoding='utf-8') | |
| csv = df.to_csv(index=False) | |
| return csv | |
| # Function to export questions to PDF | |
| def export_to_pdf(data): | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_font("Arial", size=12) | |
| for item in data: | |
| pdf.multi_cell(0, 10, f"Context: {item['context']}") | |
| pdf.multi_cell(0, 10, f"Question: {item['question']}") | |
| pdf.multi_cell(0, 10, f"Answer: {item['answer']}") | |
| pdf.multi_cell(0, 10, f"Options: {', '.join(item['options'])}") | |
| pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}") | |
| pdf.ln(10) | |
| return pdf.output(dest='S').encode('latin-1') | |
| def display_word_cloud(generated_questions): | |
| word_frequency = {} | |
| for question in generated_questions: | |
| words = question.split() | |
| for word in words: | |
| word_frequency[word] = word_frequency.get(word, 0) + 1 | |
| wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency) | |
| plt.figure(figsize=(10, 5)) | |
| plt.imshow(wordcloud, interpolation='bilinear') | |
| plt.axis('off') | |
| st.pyplot() | |
| def assess_question_quality(context, question, answer): | |
| # Assess relevance using cosine similarity | |
| context_doc = nlp(context) | |
| question_doc = nlp(question) | |
| relevance_score = context_doc.similarity(question_doc) | |
| # Assess complexity using token length (as a simple metric) | |
| complexity_score = min(len(question_doc) / 20, 1) # Normalize to 0-1 | |
| # Assess Spelling correctness | |
| misspelled = spell.unknown(question.split()) | |
| spelling_correctness = 1 - (len(misspelled) / len(question.split())) # Normalize to 0-1 | |
| # Calculate overall score (you can adjust weights as needed) | |
| overall_score = ( | |
| 0.4 * relevance_score + | |
| 0.4 * complexity_score + | |
| 0.2 * spelling_correctness | |
| ) | |
| return overall_score, relevance_score, complexity_score, spelling_correctness | |
| def main(): | |
| # Streamlit interface | |
| st.title(":blue[Question Generator System]") | |
| session_id = get_session_id() | |
| state = initialize_state(session_id) | |
| with st.sidebar: | |
| show_info = st.toggle('Show Info',True) | |
| if show_info: | |
| display_info() | |
| st.subheader("Customization Options") | |
| # Customization options | |
| input_type = st.radio("Select Input Preference", ("Text Input","Upload PDF")) | |
| with st.expander("Choose the Additional Elements to show"): | |
| show_context = st.checkbox("Context",True) | |
| show_answer = st.checkbox("Answer",True) | |
| show_options = st.checkbox("Options",False) | |
| show_entity_link = st.checkbox("Entity Link For Wikipedia",True) | |
| show_qa_scores = st.checkbox("QA Score",False) | |
| num_beams = st.slider("Select number of beams for question generation", min_value=2, max_value=10, value=2) | |
| context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1) | |
| num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| extract_all_keywords = st.toggle("Extract Max Keywords",value=False) | |
| with col2: | |
| enable_feedback_mode = st.toggle("Enable Feedback Mode",False) | |
| text = None | |
| if input_type == "Text Input": | |
| text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.", help="Enter or paste your text here") | |
| elif input_type == "Upload PDF": | |
| file = st.file_uploader("Upload PDF Files") | |
| if file is not None: | |
| try: | |
| text = get_pdf_text(file) | |
| except Exception as e: | |
| st.error(f"Error reading PDF file: {str(e)}") | |
| text = None | |
| if text: | |
| text = clean_text(text) | |
| generate_questions_button = st.button("Generate Questions") | |
| st.markdown('<span aria-label="Generate questions button">Above is the generate questions button</span>', unsafe_allow_html=True) | |
| # if generate_questions_button: | |
| if generate_questions_button and text: | |
| start_time = time.time() | |
| with st.spinner("Generating questions..."): | |
| try: | |
| state['generated_questions'] = asyncio.run(generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords)) | |
| if not state['generated_questions']: | |
| st.warning("No questions were generated. The text might be too short or lack suitable content.") | |
| else: | |
| st.success(f"Successfully generated {len(state['generated_questions'])} questions!") | |
| except QuestionGenerationError as e: | |
| st.error(f"An error occurred during question generation: {str(e)}") | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)}") | |
| print("\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n") | |
| data = get_state(session_id) | |
| print(data) | |
| end_time = time.time() | |
| print(f"Time Taken to generate: {end_time-start_time}") | |
| set_state(session_id, 'generated_questions', state['generated_questions']) | |
| # sort question based on their quality score | |
| state['generated_questions'] = sorted(state['generated_questions'],key = lambda x: x['overall_score'], reverse=True) | |
| # Display generated questions | |
| if state['generated_questions']: | |
| st.header("Generated Questions:",divider='blue') | |
| for i, q in enumerate(state['generated_questions']): | |
| st.subheader(body=f":orange[Q{i+1}:] {q['question']}") | |
| if show_context is True: | |
| st.write(f"**Context:** {q['context']}") | |
| if show_answer is True: | |
| st.write(f"**Answer:** {q['answer']}") | |
| if show_options is True: | |
| st.write(f"**Options:**") | |
| for j, option in enumerate(q['options']): | |
| st.write(f"{chr(65+j)}. {option}") | |
| if show_entity_link is True: | |
| linked_entity = entity_linking(q['answer']) | |
| if linked_entity: | |
| st.write(f"**Entity Link:** {linked_entity}") | |
| if show_qa_scores is True: | |
| m1,m2,m3,m4 = st.columns([1.7,1,1,1]) | |
| m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}") | |
| m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}") | |
| m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}") | |
| m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}") | |
| # q['context'] = st.text_area(f"Edit Context {i+1}:", value=q['context'], key=f"context_{i}") | |
| if enable_feedback_mode: | |
| q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}") | |
| q['rating'] = st.select_slider(f"Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}") | |
| if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"): | |
| feedback_file=save_feedback(q['question'], q['answer'], q['rating'], q['options'], q['context']) | |
| st.success(f"Feedback submitted for Question {i+1}") | |
| pswd = st.secrets['EMAIL_PASSWORD'] | |
| send_email_with_attachment( | |
| email_subject='feedback from QGen', | |
| email_body='Please find the attached feedback JSON file.', | |
| recipient_emails=['[email protected]', '[email protected]'], | |
| sender_email='[email protected]', | |
| sender_password=pswd, | |
| attachment_path=feedback_file) | |
| st.write("Feedback sent to admin") | |
| st.write("---") | |
| # Export buttons | |
| # if st.session_state.generated_questions: | |
| if state['generated_questions']: | |
| with st.sidebar: | |
| csv_data = export_to_csv(state['generated_questions']) | |
| st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv') | |
| pdf_data = export_to_pdf(state['generated_questions']) | |
| st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf') | |
| with st.expander("View Visualizations"): | |
| questions = [tpl['question'] for tpl in state['generated_questions']] | |
| overall_scores = [tpl['overall_score'] for tpl in state['generated_questions']] | |
| st.subheader('WordCloud of Questions',divider='rainbow') | |
| display_word_cloud(questions) | |
| st.subheader('Overall Scores',divider='violet') | |
| overall_scores = pd.DataFrame(overall_scores,columns=['Overall Scores']) | |
| st.line_chart(overall_scores) | |
| # View Feedback Statistics | |
| with st.expander("View Feedback Statistics"): | |
| feedback_file = 'question_feedback.json' | |
| if os.path.exists(feedback_file): | |
| with open(feedback_file, 'r') as f: | |
| feedback_data = json.load(f) | |
| st.subheader("Feedback Statistics") | |
| # Calculate average rating | |
| ratings = [feedback['rating'] for feedback in feedback_data] | |
| avg_rating = sum(ratings) / len(ratings) if ratings else 0 | |
| st.write(f"Average Question Rating: {avg_rating:.2f}") | |
| # Show distribution of ratings | |
| rating_counts = {i: ratings.count(i) for i in range(1, 6)} | |
| st.bar_chart(rating_counts) | |
| # Show some highly rated questions | |
| st.subheader("Highly Rated Questions") | |
| sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True) | |
| top_questions = sorted_feedback[:5] | |
| for feedback in top_questions: | |
| st.write(f"Question: {feedback['question']}") | |
| st.write(f"Answer: {feedback['answer']}") | |
| st.write(f"Rating: {feedback['rating']}") | |
| st.write("---") | |
| else: | |
| st.write("No feedback data available yet.") | |
| print("********************************************************************************") | |
| if __name__ == '__main__': | |
| try: | |
| main() | |
| except Exception as e: | |
| st.error(f"An unexpected error occurred: {str(e)}") | |
| st.error("Please try refreshing the page. If the problem persists, contact support.") |