# import ast | |
# from pymongo import MongoClient | |
# from datetime import datetime | |
# import openai | |
# import google.generativeai as genai | |
# from google.generativeai import GenerativeModel | |
# from dotenv import load_dotenv | |
# import os | |
# from file_upload_vectorize import resources_collection, vectors_collection, courses_collection2, faculty_collection | |
# # Load environment variables | |
# load_dotenv() | |
# MONGO_URI = os.getenv('MONGO_URI') | |
# OPENAI_KEY = os.getenv('OPENAI_KEY') | |
# GEMINI_KEY = os.getenv('GEMINI_KEY') | |
# # Configure APIs | |
# openai.api_key = OPENAI_KEY | |
# genai.configure(api_key=GEMINI_KEY) | |
# model = genai.GenerativeModel('gemini-pro') | |
# # Connect to MongoDB | |
# client = MongoClient(MONGO_URI) | |
# db = client['novascholar_db'] | |
# quizzes_collection = db["quizzes"] | |
# def strip_code_markers(response_text): | |
# """Strip off the markers ``` and python from a LLM model's response""" | |
# if response_text.startswith("```python"): | |
# response_text = response_text[len("```python"):].strip() | |
# if response_text.startswith("```"): | |
# response_text = response_text[len("```"):].strip() | |
# if response_text.endswith("```"): | |
# response_text = response_text[:-len("```")].strip() | |
# return response_text | |
# # New function to generate MCQs using Gemini | |
# def generate_mcqs(context, num_questions, session_title, session_description): | |
# """Generate MCQs either from context or session details""" | |
# try: | |
# # Initialize Gemini model | |
# if context: | |
# prompt = f""" | |
# Based on the following content, generate {num_questions} multiple choice questions. | |
# Format each question as a Python dictionary with the following structure: | |
# {{ | |
# "question": "Question text here", | |
# "options": ["A) option1", "B) option2", "C) option3", "D) option4"], | |
# "correct_option": "A) option1" or "B) option2" or "C) option3" or "D) option4" | |
# }} | |
# Content: | |
# {context} | |
# Generate challenging but clear questions that test understanding of key concepts. | |
# Return only the Python list of dictionaries. | |
# """ | |
# else: | |
# prompt = f""" | |
# Generate {num_questions} multiple choice questions about the topic: | |
# Title: {session_title} | |
# Description: {session_description} | |
# Format each question as a Python dictionary with the following structure: | |
# {{ | |
# "question": "Question text here", | |
# "options": ["A) option1", "B) option2", "C) option3", "D) option4"], | |
# "correct_option": "A" or "B" or "C" or "D" | |
# }} | |
# Generate challenging but clear questions. | |
# Return only the Python list of dictionaries without any additional formatting or markers | |
# Do not write any other text, do not start the response with (```python), do not end the response with backticks(```) | |
# A Sample response should look like this: Response Text: [ | |
# { | |
# "question": "Which of the following is NOT a valid data type in C++?", | |
# "options": ["int", "double", "boolean", "char"], | |
# "correct_option": "C" | |
# } | |
# ] (Notice that there are no backticks(```) around the response and no (```python)) | |
# . | |
# """ | |
# response = model.generate_content(prompt) | |
# response_text = response.text.strip() | |
# print("Response Text:", response_text) | |
# modified_response_text = strip_code_markers(response_text) | |
# print("Response Text Modified to:", modified_response_text) | |
# # Extract and parse the response to get the list of MCQs | |
# mcqs = ast.literal_eval(modified_response_text) # Be careful with eval, consider using ast.literal_eval for production | |
# print(mcqs) | |
# if not mcqs: | |
# raise ValueError("No questions generated") | |
# return mcqs | |
# except Exception as e: | |
# print(f"Error generating MCQs: , error: {e}") | |
# return None | |
# # New function to save quiz to database | |
# def save_quiz(course_id, session_id, title, questions, user_id): | |
# """Save quiz to database""" | |
# try: | |
# quiz_data = { | |
# "user_id": user_id, | |
# "course_id": course_id, | |
# "session_id": session_id, | |
# "title": title, | |
# "questions": questions, | |
# "created_at": datetime.utcnow(), | |
# "status": "active", | |
# "submissions": [] | |
# } | |
# result = quizzes_collection.insert_one(quiz_data) | |
# return result.inserted_id | |
# except Exception as e: | |
# print(f"Error saving quiz: {e}") | |
# return None | |
# def get_student_quiz_score(quiz_id, student_id): | |
# """Get student's score for a specific quiz""" | |
# quiz = quizzes_collection.find_one( | |
# { | |
# "_id": quiz_id, | |
# "submissions.student_id": student_id | |
# }, | |
# {"submissions.$": 1} | |
# ) | |
# if quiz and quiz.get('submissions'): | |
# return quiz['submissions'][0].get('score') | |
# return None | |
# # def submit_quiz_answers(quiz_id, student_id, student_answers): | |
# # """Submit and score student's quiz answers""" | |
# # quiz = quizzes_collection.find_one({"_id": quiz_id}) | |
# # if not quiz: | |
# # return None | |
# # # Calculate score | |
# # correct_answers = 0 | |
# # total_questions = len(quiz['questions']) | |
# # for q_idx, question in enumerate(quiz['questions']): | |
# # if student_answers.get(str(q_idx)) == question['correct_option']: | |
# # correct_answers += 1 | |
# # score = (correct_answers / total_questions) * 100 | |
# # # Store submission | |
# # submission_data = { | |
# # "student_id": student_id, | |
# # "answers": student_answers, | |
# # "score": score, | |
# # "submitted_at": datetime.utcnow() | |
# # } | |
# # # Update quiz with submission | |
# # quizzes_collection.update_one( | |
# # {"_id": quiz_id}, | |
# # { | |
# # "$push": {"submissions": submission_data} | |
# # } | |
# # ) | |
# # return score | |
# def submit_quiz_answers(quiz_id, student_id, student_answers): | |
# """Submit and score student's quiz answers""" | |
# try: | |
# quiz = quizzes_collection.find_one({"_id": quiz_id}) | |
# if not quiz: | |
# return None | |
# # Calculate score | |
# correct_answers = 0 | |
# total_questions = len(quiz['questions']) | |
# for q_idx, question in enumerate(quiz['questions']): | |
# student_answer = student_answers.get(str(q_idx)) | |
# if student_answer: # Only check if answer was provided | |
# # Extract the option letter (A, B, C, D) from the full answer string | |
# answer_letter = student_answer.split(')')[0].strip() | |
# if answer_letter == question['correct_option']: | |
# correct_answers += 1 | |
# score = (correct_answers / total_questions) * 100 | |
# # Store submission | |
# submission_data = { | |
# "student_id": student_id, | |
# "answers": student_answers, | |
# "score": score, | |
# "submitted_at": datetime.utcnow() | |
# } | |
# # Update quiz with submission | |
# result = quizzes_collection.update_one( | |
# {"_id": quiz_id}, | |
# {"$push": {"submissions": submission_data}} | |
# ) | |
# return score if result.modified_count > 0 else None | |
# except Exception as e: | |
# print(f"Error submitting quiz: {e}") | |
# return None | |
import ast | |
from pymongo import MongoClient | |
from datetime import datetime | |
import openai | |
import google.generativeai as genai | |
from google.generativeai import GenerativeModel | |
from dotenv import load_dotenv | |
import os | |
from file_upload_vectorize import resources_collection, vectors_collection, courses_collection2, faculty_collection | |
# Load environment variables | |
load_dotenv() | |
MONGO_URI = os.getenv('MONGO_URI') | |
OPENAI_KEY = os.getenv('OPENAI_KEY') | |
GEMINI_KEY = os.getenv('GEMINI_KEY') | |
# Configure APIs | |
openai.api_key = OPENAI_KEY | |
genai.configure(api_key=GEMINI_KEY) | |
model = genai.GenerativeModel('gemini-pro') | |
# Connect to MongoDB | |
client = MongoClient(MONGO_URI) | |
db = client['novascholar_db'] | |
quizzes_collection = db["quizzes"] | |
surprise_quizzes_collection = db["surprise_quizzes"] | |
def strip_code_markers(response_text): | |
"""Strip off the markers ``` and python from a LLM model's response""" | |
if response_text.startswith("```python"): | |
response_text = response_text[len("```python"):].strip() | |
if response_text.startswith("```"): | |
response_text = response_text[len("```"):].strip() | |
if response_text.endswith("```"): | |
response_text = response_text[:-len("```")].strip() | |
return response_text | |
# New function to generate MCQs using Gemini | |
def generate_mcqs(context, num_questions, session_title, session_description): | |
"""Generate MCQs either from context or session details""" | |
try: | |
# Initialize Gemini model | |
if context: | |
prompt = f""" | |
Based on the following content, generate {num_questions} multiple choice questions. | |
Format each question as a Python dictionary with the following structure: | |
{{ | |
"question": "Question text here", | |
"options": ["A) option1", "B) option2", "C) option3", "D) option4"], | |
"correct_option": "A) option1" or "B) option2" or "C) option3" or "D) option4" | |
}} | |
Content: | |
{context} | |
Generate challenging but clear questions that test understanding of key concepts. | |
Return only the Python list of dictionaries. | |
""" | |
else: | |
prompt = f""" | |
Generate {num_questions} multiple choice questions about the topic: | |
Title: {session_title} | |
Description: {session_description} | |
Format each question as a Python dictionary with the following structure: | |
{{ | |
"question": "Question text here", | |
"options": ["A) option1", "B) option2", "C) option3", "D) option4"], | |
"correct_option": "A" or "B" or "C" or "D" | |
}} | |
Generate challenging but clear questions. | |
Return only the Python list of dictionaries without any additional formatting or markers | |
Do not write any other text, do not start the response with (```python), do not end the response with backticks(```) | |
A Sample response should look like this: Response Text: [ | |
{ | |
"question": "Which of the following is NOT a valid data type in C++?", | |
"options": ["int", "double", "boolean", "char"], | |
"correct_option": "C" | |
} | |
] (Notice that there are no backticks(```) around the response and no (```python)) | |
. | |
""" | |
response = model.generate_content(prompt) | |
response_text = response.text.strip() | |
print("Response Text:", response_text) | |
modified_response_text = strip_code_markers(response_text) | |
print("Response Text Modified to:", modified_response_text) | |
# Extract and parse the response to get the list of MCQs | |
mcqs = ast.literal_eval(modified_response_text) # Be careful with eval, consider using ast.literal_eval for production | |
print(mcqs) | |
if not mcqs: | |
raise ValueError("No questions generated") | |
return mcqs | |
except Exception as e: | |
print(f"Error generating MCQs: , error: {e}") | |
return None | |
# New function to save quiz to database | |
def save_quiz(course_id, session_id, title, questions, user_id): | |
"""Save quiz to database""" | |
try: | |
quiz_data = { | |
"user_id": user_id, | |
"course_id": course_id, | |
"session_id": session_id, | |
"title": title, | |
"questions": questions, | |
"created_at": datetime.utcnow(), | |
"status": "active", | |
"submissions": [] | |
} | |
result = quizzes_collection.insert_one(quiz_data) | |
return result.inserted_id | |
except Exception as e: | |
print(f"Error saving quiz: {e}") | |
return None | |
def save_surprise_quiz(course_id, session_id, title, questions, user_id, no_minutes): | |
"""Save quiz to database""" | |
try: | |
quiz_data = { | |
"user_id": user_id, | |
"course_id": course_id, | |
"session_id": session_id, | |
"title": title, | |
"questions": questions, | |
"created_at": datetime.now(), | |
"status": "active", | |
"submissions": [], | |
"no_minutes": no_minutes | |
} | |
result = surprise_quizzes_collection.insert_one(quiz_data) | |
return result.inserted_id | |
except Exception as e: | |
print(f"Error saving quiz: {e}") | |
return None | |
def get_student_quiz_score(quiz_id, student_id): | |
"""Get student's score for a specific quiz""" | |
quiz = quizzes_collection.find_one( | |
{ | |
"_id": quiz_id, | |
"submissions.student_id": student_id | |
}, | |
{"submissions.$": 1} | |
) | |
if quiz and quiz.get('submissions'): | |
return quiz['submissions'][0].get('score') | |
return None | |
def get_student_surprise_quiz_score(quiz_id, student_id): | |
"""Get student's score for a specific quiz""" | |
quiz = surprise_quizzes_collection.find_one( | |
{ | |
"_id": quiz_id, | |
"submissions.student_id": student_id | |
}, | |
{"submissions.$": 1} | |
) | |
if quiz and quiz.get('submissions'): | |
return quiz['submissions'][0].get('score') | |
return None | |
# def submit_quiz_answers(quiz_id, student_id, student_answers): | |
# """Submit and score student's quiz answers""" | |
# quiz = quizzes_collection.find_one({"_id": quiz_id}) | |
# if not quiz: | |
# return None | |
# # Calculate score | |
# correct_answers = 0 | |
# total_questions = len(quiz['questions']) | |
# for q_idx, question in enumerate(quiz['questions']): | |
# if student_answers.get(str(q_idx)) == question['correct_option']: | |
# correct_answers += 1 | |
# score = (correct_answers / total_questions) * 100 | |
# # Store submission | |
# submission_data = { | |
# "student_id": student_id, | |
# "answers": student_answers, | |
# "score": score, | |
# "submitted_at": datetime.utcnow() | |
# } | |
# # Update quiz with submission | |
# quizzes_collection.update_one( | |
# {"_id": quiz_id}, | |
# { | |
# "$push": {"submissions": submission_data} | |
# } | |
# ) | |
# return score | |
def submit_quiz_answers(quiz_id, student_id, student_answers): | |
"""Submit and score student's quiz answers""" | |
try: | |
quiz = quizzes_collection.find_one({"_id": quiz_id}) | |
if not quiz: | |
return None | |
# Calculate score | |
correct_answers = 0 | |
total_questions = len(quiz['questions']) | |
for q_idx, question in enumerate(quiz['questions']): | |
student_answer = student_answers.get(str(q_idx)) | |
if student_answer: # Only check if answer was provided | |
# Extract the option letter (A, B, C, D) from the full answer string | |
answer_letter = student_answer.split(')')[0].strip() | |
if answer_letter == question['correct_option']: | |
correct_answers += 1 | |
score = (correct_answers / total_questions) * 100 | |
# Store submission | |
submission_data = { | |
"student_id": student_id, | |
"answers": student_answers, | |
"score": score, | |
"submitted_at": datetime.utcnow() | |
} | |
# Update quiz with submission | |
result = quizzes_collection.update_one( | |
{"_id": quiz_id}, | |
{"$push": {"submissions": submission_data}} | |
) | |
return score if result.modified_count > 0 else None | |
except Exception as e: | |
print(f"Error submitting quiz: {e}") | |
return None | |
def submit_surprise_quiz_answers(quiz_id, student_id, student_answers): | |
"""Submit and score student's quiz answers""" | |
try: | |
quiz = surprise_quizzes_collection.find_one({"_id": quiz_id}) | |
if not quiz: | |
return None | |
# Calculate score | |
correct_answers = 0 | |
total_questions = len(quiz['questions']) | |
for q_idx, question in enumerate(quiz['questions']): | |
student_answer = student_answers.get(str(q_idx)) | |
if student_answer: # Only check if answer was provided | |
# Extract the option letter (A, B, C, D) from the full answer string | |
answer_letter = student_answer.split(')')[0].strip() | |
if answer_letter == question['correct_option']: | |
correct_answers += 1 | |
score = (correct_answers / total_questions) * 100 | |
# Store submission | |
submission_data = { | |
"student_id": student_id, | |
"answers": student_answers, | |
"score": score, | |
"submitted_at": datetime.utcnow() | |
} | |
# Update quiz with submission | |
result = surprise_quizzes_collection.update_one( | |
{"_id": quiz_id}, | |
{"$push": {"submissions": submission_data}} | |
) | |
return score if result.modified_count > 0 else None | |
except Exception as e: | |
print(f"Error submitting quiz: {e}") | |
return None |