|
from datetime import datetime, timedelta |
|
from typing import Dict, List |
|
from bson import ObjectId |
|
import pandas as pd |
|
import streamlit as st |
|
import json |
|
import google.generativeai as genai |
|
from dotenv import load_dotenv |
|
import os |
|
from pymongo import MongoClient |
|
import plotly.express as px |
|
|
|
# Load variables from a local .env file into the process environment before
# any of the settings below are read.
load_dotenv()

# Connection settings; either value is None when the variable is not set.
MONGO_URI = os.environ.get('MONGO_URI')
GEMINI_API_KEY = os.environ.get('GEMINI_KEY')

# Shared Gemini model used by the question generator in this module.
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-1.5-flash")

# Shared MongoDB handles used by every function in this module.
client = MongoClient(MONGO_URI)
db = client['novascholar_db']
courses_collection = db['courses']
|
|
|
|
|
def generate_interval_questions(context: str, num_questions: int) -> List[Dict]:
    """Generate all interval questions at once.

    Builds a prompt around ``context`` and asks the module-level Gemini
    ``model`` for ``num_questions`` check-in questions, requesting a JSON
    response.

    Args:
        context: Source material the questions should be based on.
        num_questions: Number of questions requested from the model.

    Returns:
        The parsed list of question dicts. Returns ``None`` (after showing an
        error with ``st.error``) when the model call fails, the response body
        is empty, the body is not valid JSON, or the JSON is not an array of
        objects.
    """
    prompt = f"""
    Generate {num_questions} quick check-in questions based on the following context.
    Return ONLY a JSON array with this exact structure:
    [
        {{
            "question_text": "Your question here",
            "type": "mcq OR short_answer",
            "options": ["option1", "option2", "option3", "option4"],
            "correct_option": "correct answer",
            "explanation": "brief explanation"
        }}
    ]

    Requirements:
    - Each question should be answerable in 30-60 seconds
    - Test understanding of different aspects of the material
    - For MCQ, include exactly 4 options
    - Questions should build in complexity
    - **MAKE SURE THE NUMBER OF SHORT ANSWER QUESTIONS ARE MORE THAN MCQS**
    - **MAKE SURE THE NUMBER OF MCQS DOES NOT EXCEED 2**

    Context: {context}
    """
    try:
        response = model.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                temperature=0.3,
                response_mime_type="application/json"
            )
        )

        if not response.text:
            raise ValueError("Empty response from model")

        questions = json.loads(response.text)

        # The caller iterates the result and indexes each item by key, so
        # reject any other JSON shape here instead of failing later with an
        # unrelated-looking error.
        if not isinstance(questions, list) or not all(
            isinstance(item, dict) for item in questions
        ):
            raise ValueError("Model response is not a JSON array of question objects")

        return questions

    except Exception as e:
        # Top-level boundary for the UI: report the failure and signal it
        # with None rather than crashing the Streamlit script.
        st.error(f"Error generating questions: {e}")
        return None
|
|
|
class IntervalQuestionManager:
    """Manage the timed check-in questions of one course session.

    Questions, the interval between them and the start time are stored under
    ``sessions.$.in_class.interval_questions`` of the matching course
    document in ``courses_collection``; this class mirrors that state in
    memory and selects the question that is due based on elapsed time.
    """

    def __init__(self, session_id: str, course_id: str):
        """Create a manager and load any previously saved state.

        Args:
            session_id: Identifier of the session inside the course document.
            course_id: Identifier of the course document.
        """
        self.session_id = session_id
        self.course_id = course_id
        self.questions = []          # question documents, in display order
        self.current_index = 0
        self.interval_minutes = 10   # default until loaded or initialized

        self.start_time = None       # set by start_questions() or loaded from the DB

        self._initialize_from_db()

    def _initialize_from_db(self):
        """Load existing questions and settings from database.

        Leaves the defaults untouched when the session has no stored
        interval-question data. Errors are reported with ``st.error``.
        """
        try:
            session_data = courses_collection.find_one(
                {
                    "course_id": self.course_id,
                    "sessions.session_id": self.session_id
                },
                {"sessions.$": 1}
            )

            if session_data and session_data.get('sessions'):
                session = session_data['sessions'][0]
                interval_data = session.get('in_class', {}).get('interval_questions', {})

                if interval_data:
                    self.questions = interval_data.get('questions', [])
                    self.interval_minutes = interval_data.get('interval_minutes', 10)
                    self.start_time = interval_data.get('start_time')
                    print(f"Loaded {len(self.questions)} questions, interval: {self.interval_minutes} mins")

        except Exception as e:
            st.error(f"Error initializing from database: {e}")

    def initialize_questions(self, context: str, num_questions: int, interval_minutes: int):
        """Initialize all questions and save to database.

        Does nothing when questions are already loaded. Otherwise generates
        them with ``generate_interval_questions``, gives each one an
        ``_id`` and a ``display_after`` offset of ``index * interval_minutes``,
        and stores the set on the session document.

        Args:
            context: Material passed to the question generator.
            num_questions: Number of questions to request.
            interval_minutes: Minutes between consecutive questions.

        Returns:
            ``True`` when questions already existed or the database document
            was modified; ``False`` when generation failed, the generated
            data was malformed, or the update failed or modified nothing.
        """
        if self.questions:
            return True

        questions = generate_interval_questions(context, num_questions)
        if not questions:
            return False

        try:
            prepared = [
                {
                    "_id": ObjectId(),
                    "question_text": q["question_text"],
                    "type": q["type"],
                    "options": q.get("options", []),
                    "correct_option": q.get("correct_option", ""),
                    "explanation": q.get("explanation", ""),
                    "display_after": i * interval_minutes,
                    "active": False,
                    "submissions": []
                }
                for i, q in enumerate(questions)
            ]
        except (KeyError, TypeError, AttributeError) as e:
            # The generator output comes from a language model; an entry that
            # is missing a required key or is not an object must not leave a
            # half-built question list behind.
            st.error(f"Error initializing questions: malformed question data ({e})")
            return False

        self.questions = prepared
        self.interval_minutes = interval_minutes

        try:
            result = courses_collection.update_one(
                {
                    "course_id": self.course_id,
                    "sessions.session_id": self.session_id
                },
                {
                    "$set": {
                        "sessions.$.in_class.interval_questions": {
                            "questions": self.questions,
                            "interval_minutes": self.interval_minutes,
                            "start_time": None
                        }
                    }
                }
            )
        except Exception as e:
            # Same reporting style as the other methods of this class.
            st.error(f"Error saving questions: {e}")
            return False

        return result.modified_count > 0

    def start_questions(self):
        """Start the interval questions.

        Records the current local time as the start time, both on the
        instance and on the session document, and reports the outcome in
        the Streamlit UI.
        """
        try:
            self.start_time = datetime.now()

            result = courses_collection.update_one(
                {
                    "course_id": self.course_id,
                    "sessions.session_id": self.session_id
                },
                {
                    "$set": {
                        "sessions.$.in_class.interval_questions.start_time": self.start_time
                    }
                }
            )

            if result.modified_count > 0:
                st.success("Questions started successfully!")
            else:
                st.error("Could not start questions. Please try again.")

        except Exception as e:
            st.error(f"Error starting questions: {str(e)}")

    def get_current_question(self) -> Dict:
        """Get current question based on elapsed time.

        The index is the number of whole intervals elapsed since
        ``start_time``.

        Returns:
            The question dict that is due, or ``None`` when the questions
            have not been started, none are loaded, the computed index falls
            outside the list, or an error occurred (reported with
            ``st.error``).
        """
        try:
            if not self.start_time or not self.questions:
                return None

            elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60
            question_index = int(elapsed_minutes // self.interval_minutes)

            print(f"Elapsed minutes: {elapsed_minutes}")
            print(f"Question index: {question_index}")
            print(f"Total questions: {len(self.questions)}")
            print(f"Interval minutes: {self.interval_minutes}")

            # A start time in the future gives a negative index; treat it
            # like "nothing due" instead of wrapping around the list.
            if 0 <= question_index < len(self.questions):
                return self.questions[question_index]
            return None

        except Exception as e:
            st.error(f"Error getting current question: {e}")
            return None
|
|
|
def display_response_distribution(course_id: str, session_id: str, question_id: str):
    """Display real-time distribution of student responses.

    Looks up the interval question with ``question_id`` in the given course
    session. For an MCQ it shows the total response count, a bar chart of
    answers with percentages and the underlying table; for any other type it
    shows only the total response count.

    Args:
        course_id: Identifier of the course document.
        session_id: Identifier of the session inside the course.
        question_id: String form of the question's ``_id``.

    Returns:
        None. All output, including warnings and errors, goes to the
        Streamlit UI.
    """
    # Function-scope import keeps this block self-contained.
    from collections import Counter

    try:
        session_data = courses_collection.find_one(
            {
                "course_id": course_id,
                "sessions.session_id": session_id
            },
            {"sessions.$": 1}
        )

        if not session_data or not session_data.get('sessions'):
            st.warning("No responses found.")
            return

        session = session_data['sessions'][0]
        # Tolerate sessions without interval-question data, like the other
        # readers of this sub-document, so they produce the "not found"
        # warning below instead of a raw KeyError message.
        questions = (
            session.get('in_class', {})
            .get('interval_questions', {})
            .get('questions', [])
        )
        question = next(
            (q for q in questions if str(q['_id']) == question_id),
            None
        )

        if not question:
            st.warning("Question not found.")
            return

        submissions = question.get('submissions', [])

        if not submissions:
            st.info("No responses received yet.")
            return

        total_responses = len(submissions)

        if question['type'] == 'mcq':
            # Counter keeps first-seen order, so rows appear in the order the
            # distinct answers were first submitted.
            responses = Counter(submission['answer'] for submission in submissions)

            df = pd.DataFrame(list(responses.items()), columns=['Option', 'Count'])
            df['Percentage'] = df['Count'] / total_responses * 100

            st.metric("Total Responses", total_responses)

            fig = px.bar(df,
                         x='Option',
                         y='Count',
                         title='Response Distribution',
                         text='Percentage')
            fig.update_traces(texttemplate='%{text:.1f}%', textposition='outside')
            st.plotly_chart(fig)

            st.dataframe(df)

        else:
            st.markdown("##### Responses Received")

            st.metric("Total Responses", total_responses)

    except Exception as e:
        st.error(f"Error displaying responses: {str(e)}")
|
|
|
def _render_faculty_question_view(course_id, session, question):
    """Render the faculty view: the current question plus an overview of all questions."""
    session_id = session['session_id']
    question_id = str(question['_id'])

    with st.container():
        st.markdown("##### Current Active Question")
        st.markdown(f"**Question:** {question['question_text']}")
        if question['type'] == 'mcq':
            display_response_distribution(course_id, session_id, question_id)

        # Narrow right-hand column holds the button that ends the question.
        _, button_column = st.columns([3,1])
        with button_column:
            if st.button("End Question"):
                end_question_period(course_id, session_id, question_id)
                st.rerun()

        st.markdown("##### All Questions Overview")

        # Re-read the session so the overview shows the stored questions and
        # their submission counts.
        course_doc = courses_collection.find_one(
            {
                "course_id": course_id,
                "sessions.session_id": session['session_id']
            },
            {"sessions.$": 1}
        )
        if not (course_doc and course_doc.get('sessions')):
            return

        stored = course_doc['sessions'][0].get('in_class', {}).get('interval_questions', {})
        for number, item in enumerate(stored.get('questions', []), 1):
            with st.expander(f"Question {number}", expanded=False):
                st.markdown(f"**Type:** {item['type'].upper()}")
                st.markdown(f"**Question:** {item['question_text']}")

                if item['type'] == 'mcq':
                    st.markdown("**Options:**")
                    for option in item['options']:
                        # The correct option gets a check mark, the rest a bullet.
                        line = f"✅ {option}" if option == item['correct_option'] else f"- {option}"
                        st.markdown(line)

                st.markdown(f"**Explanation:** {item['explanation']}")

                st.info(f"Will be displayed after: {timedelta(minutes=item['display_after'])}")

                received = len(item.get('submissions', []))
                if received > 0:
                    st.metric("Responses received", received)


def _render_student_question_view(course_id, session, question):
    """Render the student view: the stored answer, or a form to submit one."""
    session_id = session['session_id']
    question_id = str(question['_id'])

    prior = _check_existing_submission(
        course_id,
        session_id,
        question_id,
        st.session_state.user_id
    )
    if prior:
        st.success("Response submitted!")
        st.markdown(f"Your answer: **{prior['answer']}**")
        return

    with st.form(key=f"question_form_{question['_id']}"):
        st.markdown(f"**Question:** {question['question_text']}")

        if question['type'] == 'mcq':
            response = st.radio(
                "Select your answer:",
                options=question['options'],
                key=f"mcq_{question['_id']}"
            )
        else:
            response = st.text_area(
                "Your answer:",
                key=f"text_{question['_id']}"
            )

        if st.form_submit_button("Submit"):
            if not response:
                st.error("Please provide an answer before submitting.")
            else:
                submit_response(
                    course_id,
                    session_id,
                    question_id,
                    st.session_state.user_id,
                    response
                )
                st.rerun()


def display_interval_question(course_id, session, question: Dict, user_type: str):
    """Display the interval question with different views for faculty/students.

    Args:
        course_id: Identifier of the course document.
        session: Session dict; its ``'session_id'`` key is read.
        question: The question to show. Nothing is rendered when it is falsy.
        user_type: ``"faculty"`` selects the faculty view; any other value
            selects the student view.

    Returns:
        None. Errors raised while rendering are reported with ``st.error``.
    """
    if not question:
        return

    st.markdown("#### Quick Questions ⚡")

    try:
        render = (
            _render_faculty_question_view
            if user_type == "faculty"
            else _render_student_question_view
        )
        render(course_id, session, question)
    except Exception as e:
        st.error(f"Error displaying question: {str(e)}")
|
|
|
def _check_existing_submission(course_id: str, session_id: str, question_id: str, student_id: str) -> Dict:
    """Check if student has already submitted an answer.

    Reads the interval questions of the given session (stored under
    ``in_class.interval_questions.questions``, the same location the
    submission writer uses) and looks for a submission by ``student_id`` on
    the question with ``question_id``.

    Args:
        course_id: Identifier of the course document.
        session_id: Identifier of the session inside the course.
        question_id: String form of the question's ``_id``.
        student_id: Identifier of the student to look for.

    Returns:
        The matching submission dict, or ``None`` when the session, the
        question or the submission does not exist, or when an error occurred
        (reported with ``st.error``).
    """
    try:
        # Only one condition on the ``sessions`` array: the ``sessions.$``
        # projection returns the element matched by the query, and a second
        # array condition can make it pick the wrong session.
        session_data = courses_collection.find_one(
            {
                "course_id": course_id,
                "sessions.session_id": session_id
            },
            {"sessions.$": 1}
        )

        if not session_data or not session_data.get('sessions'):
            return None

        session = session_data['sessions'][0]
        questions = (
            session.get('in_class', {})
            .get('interval_questions', {})
            .get('questions', [])
        )
        question = next(
            (q for q in questions if str(q['_id']) == question_id),
            None
        )
        if not question:
            return None

        return next(
            (sub for sub in question.get('submissions', [])
             if sub.get('student_id') == student_id),
            None
        )

    except Exception as e:
        st.error(f"Error checking submission: {e}")
        return None
|
|
|
def end_question_period(course_id: str, session_id: str, question_id: str):
    """End the question period and mark the question as inactive.

    Sets ``active`` to ``False`` and ``ended_at`` to the current UTC time on
    the interval question with ``question_id`` in the given session, then
    reruns the Streamlit script on success.

    Args:
        course_id: Identifier of the course document.
        session_id: Identifier of the session inside the course.
        question_id: String form of the question's ``_id``.

    Returns:
        None. The outcome is reported in the Streamlit UI.
    """
    try:
        question_oid = ObjectId(question_id)

        # The question sits inside two nested arrays (sessions -> questions).
        # MongoDB allows only one positional ``$`` per update path, so both
        # levels are addressed with filtered positional operators, exactly
        # as the submission writer does.
        result = courses_collection.update_one(
            {
                "course_id": course_id,
                "sessions.session_id": session_id,
                "sessions.in_class.interval_questions.questions._id": question_oid
            },
            {
                "$set": {
                    "sessions.$[ses].in_class.interval_questions.questions.$[q].active": False,
                    "sessions.$[ses].in_class.interval_questions.questions.$[q].ended_at": datetime.utcnow()
                }
            },
            array_filters=[
                {"ses.session_id": session_id},
                {"q._id": question_oid}
            ]
        )

        if result.modified_count > 0:
            st.success("Question period ended successfully!")
            st.rerun()
        else:
            st.warning("Could not end question period. Question may not exist.")

    except Exception as e:
        st.error(f"Error ending question period: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def submit_response(course_id: str, session_id: str, question_id: str, student_id: str, response: str):
    """Submit a student's response to a question.

    Verifies that the session and the interval question exist and that the
    student has not answered yet, then appends a submission record
    (student id, answer, UTC timestamp) to the question and reruns the
    Streamlit script on success.

    Args:
        course_id: Identifier of the course document.
        session_id: Identifier of the session inside the course.
        question_id: String form of the question's ``_id``.
        student_id: Identifier of the submitting student.
        response: The student's answer.

    Returns:
        None. The outcome is reported in the Streamlit UI.
    """
    try:
        print(f"Submitting response for question {question_id}")
        print(f"Course: {course_id}, Session: {session_id}")

        session_filter = {
            "course_id": course_id,
            "sessions.session_id": session_id
        }
        course_doc = courses_collection.find_one(session_filter, {"sessions.$": 1})
        if not course_doc:
            print("Session document not found")
            st.error("Session not found")
            return

        matched_session = course_doc['sessions'][0]
        interval_block = matched_session.get('in_class', {}).get('interval_questions', {})
        if not interval_block:
            print("No interval questions found")
            st.error("No questions found")
            return

        # Locate the target question by the string form of its _id.
        target = None
        for candidate in interval_block.get('questions', []):
            if str(candidate['_id']) == question_id:
                target = candidate
                break
        if not target:
            print(f"Question {question_id} not found in questions list")
            st.error("Question not found")
            return

        # Refuse a second answer from the same student.
        for earlier in target.get('submissions', []):
            if earlier['student_id'] == student_id:
                st.warning("You have already submitted an answer for this question.")
                return

        question_oid = ObjectId(question_id)
        submission_record = {
            "student_id": student_id,
            "answer": response,
            "submitted_at": datetime.utcnow()
        }
        result = courses_collection.update_one(
            {
                **session_filter,
                "sessions.in_class.interval_questions.questions._id": question_oid
            },
            {
                "$push": {
                    "sessions.$[ses].in_class.interval_questions.questions.$[q].submissions": submission_record
                }
            },
            array_filters=[
                {"ses.session_id": session_id},
                {"q._id": question_oid}
            ]
        )
        print(f"Update result: {result.modified_count}")

        if result.modified_count <= 0:
            st.error("Could not submit response. Please try again.")
        else:
            st.success("Response submitted successfully!")
            st.rerun()

    except Exception as e:
        print(f"Error details: {str(e)}")
        st.error(f"Error submitting response: {str(e)}")