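# NOTE(review): the original first lines read "Spaces: Runtime error / Runtime error".
# They appear to be page-scrape residue from the hosting page, not part of the program.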
import gradio as gr | |
import pandas as pd | |
import json | |
import os | |
import re | |
from PyPDF2 import PdfReader | |
from collections import defaultdict | |
from typing import Dict, List, Optional, Tuple, Union | |
import html | |
from pathlib import Path | |
import fitz # PyMuPDF | |
import pytesseract | |
from PIL import Image | |
import io | |
import secrets | |
import string | |
from huggingface_hub import HfApi, HfFolder | |
import torch | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import time | |
import logging | |
import asyncio | |
from functools import lru_cache | |
import hashlib | |
from concurrent.futures import ThreadPoolExecutor | |
from pydantic import BaseModel | |
import plotly.express as px | |
import pdfplumber | |
from io import BytesIO | |
import base64 | |
import datetime | |
from cryptography.fernet import Fernet | |
import calendar | |
from dateutil.relativedelta import relativedelta | |
import numpy as np | |
import matplotlib.pyplot as plt | |
# Enhanced Configuration
PROFILES_DIR = "student_profiles"
ALLOWED_FILE_TYPES = [".pdf", ".png", ".jpg", ".jpeg"]
MAX_FILE_SIZE_MB = 10  # Increased from 5MB
MIN_AGE = 5
MAX_AGE = 120
SESSION_TOKEN_LENGTH = 32
HF_TOKEN = os.getenv("HF_TOKEN")
# Fernet key used by DataEncryptor. When the environment does not provide one
# (or provides an empty value) a throw-away key is generated, which means data
# encrypted in this process cannot be decrypted after a restart.
_ENCRYPTION_KEY_FROM_ENV = os.getenv("ENCRYPTION_KEY")
ENCRYPTION_KEY = _ENCRYPTION_KEY_FROM_ENV or Fernet.generate_key().decode()
SESSION_TIMEOUT = 3600 * 3  # 3 hour session timeout
MAX_CONTEXT_HISTORY = 10
MAX_PROFILE_LOAD_ATTEMPTS = 3

# Initialize logging with enhanced configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('transcript_parser.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

if not _ENCRYPTION_KEY_FROM_ENV:
    logger.warning(
        "ENCRYPTION_KEY is not set; using a temporary key. "
        "Encrypted data will not be readable after a restart."
    )

# Model configuration - Using more capable model
# NOTE(review): confirm this repository id exists on the Hugging Face Hub
# (the published checkpoints may carry a "-base" / "-chat" suffix).
MODEL_NAME = "deepseek-ai/deepseek-llm-7b"  # Upgraded from 1.3b to 7b

# Initialize Hugging Face API with retry logic.
# hf_api is always defined so later code can test `hf_api is None` instead of
# hitting a NameError when no token is configured.
hf_api = None
if HF_TOKEN:
    _HF_INIT_ATTEMPTS = 3
    for attempt in range(_HF_INIT_ATTEMPTS):
        try:
            hf_api = HfApi(token=HF_TOKEN)
            HfFolder.save_token(HF_TOKEN)
            logger.info("Hugging Face API initialized successfully")
            break
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed to initialize Hugging Face API: {str(e)}")
            if attempt < _HF_INIT_ATTEMPTS - 1:
                time.sleep(2 ** attempt)  # Exponential backoff, skipped after the last try
# ========== LEARNING STYLE QUIZ ========== | |
class LearningStyleQuiz:
    """Eight-question learning-style questionnaire.

    Every row of ``self.options`` lists its four choices in the same column
    order (see ``STYLE_ORDER``), so an answer's style is determined by the
    column it comes from rather than by guessing from its wording.
    """

    # Column order shared by every row of ``self.options``.
    STYLE_ORDER = ('visual', 'auditory', 'reading/writing', 'kinesthetic')

    # Study tips shown for the primary style.
    STYLE_TIPS = {
        'visual': [
            "- Use color coding in your notes",
            "- Create mind maps and diagrams",
            "- Watch educational videos to visualize concepts",
            "- Highlight or underline important information"
        ],
        'auditory': [
            "- Record lectures and listen to them",
            "- Explain concepts out loud to yourself",
            "- Participate in study groups",
            "- Use rhymes or songs to remember information"
        ],
        'reading/writing': [
            "- Write detailed summaries in your own words",
            "- Create question-answer sets for each topic",
            "- Rewrite your notes to reinforce learning",
            "- Read textbooks and articles on the subject"
        ],
        'kinesthetic': [
            "- Use hands-on activities when possible",
            "- Study while moving or pacing",
            "- Create physical models to represent concepts",
            "- Take frequent short breaks to move around"
        ],
    }

    def __init__(self):
        self.questions = [
            "When learning something new, I prefer to:",
            "I remember information best when I:",
            "When giving directions, I:",
            "When I'm bored, I tend to:",
            "When learning a new skill, I prefer to:",
            "When studying, I like to:",
            "I prefer teachers who:",
            "When solving problems, I:"
        ]
        self.options = [
            ["See diagrams and charts", "Listen to explanations", "Read about it", "Try it out hands-on"],
            ["See pictures or diagrams", "Hear someone explain it", "Read about it", "Do something physical with it"],
            ["Draw a map", "Give verbal instructions", "Write down directions", "Demonstrate or guide physically"],
            ["Doodle or look around", "Talk to myself or others", "Read or imagine things", "Fidget or move around"],
            ["Watch demonstrations", "Listen to instructions", "Read instructions", "Jump in and try it"],
            ["Use highlighters and diagrams", "Discuss with others", "Read and take notes", "Move around or use objects"],
            ["Use visual aids", "Give interesting lectures", "Provide reading materials", "Include hands-on activities"],
            ["Draw pictures or diagrams", "Talk through options", "Make lists", "Try different solutions physically"]
        ]
        self.learning_styles = {
            'visual': "**Visual** learners prefer seeing information in charts, diagrams, and pictures.",
            'auditory': "**Auditory** learners prefer hearing information spoken and learn best through lectures and discussions.",
            'reading/writing': "**Reading/Writing** learners prefer information displayed as words and learn best through reading and note-taking.",
            'kinesthetic': "**Kinesthetic** learners prefer physical experience and learn best through hands-on activities and movement."
        }

    def _style_for_answer(self, position, answer):
        """Return the style an answer belongs to, or None if it is not a known option.

        The row for question ``position`` is checked first; if the answer is
        not found there, every row is searched so answers passed out of order
        are still counted.
        """
        rows = []
        if 0 <= position < len(self.options):
            rows.append(self.options[position])
        rows.extend(self.options)
        for row in rows:
            if answer in row:
                return self.STYLE_ORDER[row.index(answer)]
        return None

    def evaluate_quiz(self, *answers):
        """Evaluate quiz answers and determine learning style.

        Args:
            *answers: the selected option text for each question, in question order.

        Returns:
            A Markdown string with the primary style, its tips and the two
            next-ranked styles.

        Raises:
            gr.Error: if no answers were given or any answer is None.
        """
        if not answers or any(a is None for a in answers):
            raise gr.Error("Please answer all questions before submitting")

        style_counts = dict.fromkeys(self.STYLE_ORDER, 0)
        for position, answer in enumerate(answers):
            style = self._style_for_answer(position, answer)
            if style is not None:
                style_counts[style] += 1

        # Stable sort: ties keep STYLE_ORDER, so ranked[0] is the first style
        # with the highest count.
        ranked = sorted(style_counts.items(), key=lambda item: item[1], reverse=True)
        primary_style = ranked[0][0]
        secondary_styles = ranked[1:3]

        result = [
            "## 🎯 Your Learning Style Results",
            f"Your primary learning style is **{primary_style.capitalize()}**",
            self.learning_styles[primary_style],
            "",
            "### Tips for Your Learning Style:"
        ]
        result.extend(self.STYLE_TIPS[primary_style])
        result.extend([
            "",
            "### Secondary Learning Styles:",
            f"1. {secondary_styles[0][0].capitalize()}",
            f"2. {secondary_styles[1][0].capitalize()}"
        ])
        return "\n".join(result)


# Initialize learning style quiz
learning_style_quiz = LearningStyleQuiz()
# ========== ENHANCED MODEL LOADER ========== | |
class ModelLoader:
    """Lazily loads the causal language model and tokenizer and caches them.

    Attributes written by this class:
        model / tokenizer: the loaded objects, or None until loading succeeds.
        loaded: True once a load has completed successfully.
        loading: True while a load is in progress.
        error: message of the most recent failure, or None.
        device: "cuda" when torch reports a GPU, otherwise "cpu"; switched to
            "cpu" if the load falls back after a CUDA out-of-memory error.
        load_attempts / max_retries: number of load attempts made so far and
            the limit after which load_model gives up.
    """

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.loaded = False
        self.loading = False
        self.error = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.load_attempts = 0
        self.max_retries = 3

    def load_model(
        self, progress: "Optional[gr.Progress]" = None
    ) -> "Tuple[Optional[AutoModelForCausalLM], Optional[AutoTokenizer]]":
        """Return ``(model, tokenizer)``, loading them on first use.

        Failed loads are retried (5 s apart) until ``load_attempts`` reaches
        ``max_retries``; after that ``(None, None)`` is returned and
        ``self.error`` holds the last failure message.

        Args:
            progress: optional progress callback, called as
                ``progress(fraction, desc=...)``.
        """
        if self.loaded:
            return self.model, self.tokenizer

        if self.loading:
            # Another caller is loading: wait for it to finish and hand back
            # whatever it produced.
            while self.loading:
                time.sleep(0.5)
            return self.model, self.tokenizer

        self.loading = True
        try:
            # Retries are a loop rather than a recursive call: a recursive call
            # made while ``self.loading`` is still True would fall into the
            # wait branch above and never return.
            while True:
                self.load_attempts += 1
                try:
                    model, tokenizer = self._load_once(progress)
                except Exception as e:
                    self.error = f"Model loading failed after {self.load_attempts} attempts: {str(e)}"
                    logger.error(self.error)
                    if self.load_attempts >= self.max_retries:
                        return None, None
                    logger.info(f"Retrying model loading ({self.load_attempts}/{self.max_retries})")
                    time.sleep(5)
                    continue

                self.model = model
                self.tokenizer = tokenizer
                self.loaded = True
                self.error = None
                logger.info("Model loaded successfully")
                return model, tokenizer
        finally:
            self.loading = False

    def _load_once(self, progress):
        """Run one full load: tokenizer, weights, then a one-token smoke test."""
        if progress:
            progress(0.1, desc="Initializing model environment...")
        if self.device == "cuda":
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()

        if progress:
            progress(0.2, desc="Loading tokenizer...")
        tokenizer = self._load_tokenizer()

        if progress:
            progress(0.5, desc="Loading model (this may take a few minutes)...")
        model = self._load_weights()

        if progress:
            progress(0.8, desc="Verifying model...")
        test_input = tokenizer("Test", return_tensors="pt").to(self.device)
        with torch.no_grad():
            _ = model.generate(**test_input, max_new_tokens=1)
        return model.eval(), tokenizer

    def _load_tokenizer(self):
        """Load the tokenizer, retrying up to three times with exponential backoff."""
        for attempt in range(3):
            try:
                return AutoTokenizer.from_pretrained(
                    MODEL_NAME,
                    trust_remote_code=True,
                    use_fast=True
                )
            except Exception as e:
                if attempt == 2:
                    raise
                logger.warning(f"Tokenizer loading attempt {attempt + 1} failed: {str(e)}")
                time.sleep(2 ** attempt)

    def _load_weights(self):
        """Load the model weights, falling back to CPU on CUDA out-of-memory."""
        model_kwargs = {
            "trust_remote_code": True,
            "torch_dtype": torch.float16 if self.device == "cuda" else torch.float32,
            "device_map": "auto" if self.device == "cuda" else None,
            "low_cpu_mem_usage": True,
            "offload_folder": "offload"
        }
        # Add max_memory configuration if multiple GPUs available
        if torch.cuda.device_count() > 1:
            model_kwargs["max_memory"] = {i: "20GiB" for i in range(torch.cuda.device_count())}

        for attempt in range(3):
            try:
                return AutoModelForCausalLM.from_pretrained(MODEL_NAME, **model_kwargs)
            except torch.cuda.OutOfMemoryError:
                logger.warning("CUDA OOM encountered, trying CPU offloading")
                break  # do the CPU reload outside the handler
            except Exception as e:
                if attempt == 2:
                    raise
                logger.warning(f"Model loading attempt {attempt + 1} failed: {str(e)}")
                time.sleep(2 ** attempt)

        # Only reached after a CUDA OOM. Reload on CPU in float32 and without
        # the GPU-only placement options.
        torch.cuda.empty_cache()
        model_kwargs["device_map"] = None
        model_kwargs["torch_dtype"] = torch.float32
        model_kwargs.pop("max_memory", None)
        model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **model_kwargs).to('cpu')
        self.device = 'cpu'
        return model
# Shared loader instance used by the helper below
model_loader = ModelLoader()


def get_model_and_tokenizer():
    """Return whatever ``model_loader.load_model()`` returns."""
    loaded_pair = model_loader.load_model()
    return loaded_pair
# ========== ENHANCED UTILITY FUNCTIONS ========== | |
class DataEncryptor:
    """String-in / string-out wrapper around a Fernet cipher."""

    def __init__(self, key: str):
        # Fernet is constructed from the encoded (bytes) form of the key.
        key_bytes = key.encode()
        self.cipher = Fernet(key_bytes)

    def encrypt(self, data: str) -> str:
        """Encrypt ``data`` and return the token decoded to ``str``."""
        token = self.cipher.encrypt(data.encode())
        return token.decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt a token produced by ``encrypt`` and return the text."""
        plaintext = self.cipher.decrypt(encrypted_data.encode())
        return plaintext.decode()


encryptor = DataEncryptor(ENCRYPTION_KEY)
def generate_session_token() -> str:
    """Return a random token of SESSION_TOKEN_LENGTH ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(SESSION_TOKEN_LENGTH)]
    return ''.join(picked)
def sanitize_input(text: str) -> str:
    """Strip markup and unexpected characters from user text, then HTML-escape it.

    Steps, in order:
      1. remove anything that looks like an HTML tag;
      2. drop every character outside the allowed set (word characters,
         whitespace and ``-.,!?@#$%^&*()+=``);
      3. HTML-escape what is left.

    Escaping happens last. Escaping first would turn ``<`` into ``&lt;`` so
    the tag pattern never matches, and the character filter would then remove
    the ``;`` from every entity.

    Args:
        text: raw user input; falsy values (``None``, ``""``) yield ``""``.

    Returns:
        The sanitized, HTML-escaped string.
    """
    if not text:
        return ""
    cleaned = re.sub(r'<[^>]*>', '', text.strip())
    cleaned = re.sub(r'[^\w\s\-.,!?@#\$%^&*()+=]', '', cleaned)
    return html.escape(cleaned.strip())
def validate_name(name: str) -> str:
    """Return the trimmed name, or raise ValueError if it is unacceptable.

    A name is rejected when it is empty after trimming, longer than 100
    characters, or contains a digit.
    """
    trimmed = name.strip()
    if trimmed == "":
        raise ValueError("Name cannot be empty.")
    if len(trimmed) > 100:
        raise ValueError("Name is too long (maximum 100 characters).")
    for ch in trimmed:
        if ch.isdigit():
            raise ValueError("Name cannot contain numbers.")
    return trimmed
def validate_age(age: Union[int, float, str]) -> int:
    """Convert ``age`` to an int and check it against MIN_AGE / MAX_AGE.

    The range check sits outside the ``try`` so its specific message reaches
    the caller instead of being replaced by the generic conversion error.

    Args:
        age: value to convert with ``int()``.

    Returns:
        The age as an int.

    Raises:
        ValueError: if ``age`` cannot be converted to an integer, or if it is
            outside ``MIN_AGE..MAX_AGE``.
    """
    try:
        age_int = int(age)
    except (ValueError, TypeError):
        raise ValueError("Please enter a valid age number.") from None
    if not MIN_AGE <= age_int <= MAX_AGE:
        raise ValueError(f"Age must be between {MIN_AGE} and {MAX_AGE}.")
    return age_int
def validate_file(file_obj) -> None:
    """Check that an uploaded file is present, of an allowed type, and small enough.

    Reads ``file_obj.name`` as a filesystem path.

    Raises:
        ValueError: if ``file_obj`` is falsy, its extension is not in
            ALLOWED_FILE_TYPES, or its size exceeds MAX_FILE_SIZE_MB.
    """
    if not file_obj:
        raise ValueError("Please upload a file first")

    path = file_obj.name
    _, extension = os.path.splitext(path)
    file_ext = extension.lower()
    if file_ext not in ALLOWED_FILE_TYPES:
        raise ValueError(f"Invalid file type. Allowed types: {', '.join(ALLOWED_FILE_TYPES)}")

    bytes_per_mb = 1024 * 1024
    file_size = os.path.getsize(path) / bytes_per_mb
    if file_size > MAX_FILE_SIZE_MB:
        raise ValueError(f"File too large. Maximum size is {MAX_FILE_SIZE_MB}MB.")
def remove_sensitive_info(text: str) -> str:
    """Replace common PII patterns in ``text`` with placeholder tags.

    Patterns are applied in order, most specific first, so a broad pattern
    cannot consume part of a more specific match. In particular the address
    pattern runs before the two-capitalized-words name pattern.

    Args:
        text: text to redact.

    Returns:
        The text with matches replaced by ``[REDACTED-SSN]``, ``[PHONE]``,
        ``[ID]``, ``[EMAIL]``, ``[IP]``, ``[ADDRESS]`` and ``[NAME]``.
    """
    patterns = [
        (r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED-SSN]'),
        # (305) 555-1234, 305-555-1234, 305.555.1234, 3055551234
        (r'(?<!\d)\(?\d{3}\)?[ .-]?\d{3}[ .-]?\d{4}\b', '[PHONE]'),
        (r'\b\d{6,9}\b', '[ID]'),
        (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
        (r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '[IP]'),
        (r'\b\d{1,5} [A-Z][a-z]+ [A-Z][a-z]+, [A-Z]{2} \d{5}\b', '[ADDRESS]'),
        # NOTE(review): this matches ANY two consecutive capitalized words, so
        # it also redacts things like course titles; confirm that is intended.
        (r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', '[NAME]'),  # Simple name pattern
    ]
    for pattern, replacement in patterns:
        text = re.sub(pattern, replacement, text)
    return text
# ========== ENHANCED PDF PARSING ========== | |
class EnhancedTranscriptParser:
    """Extracts text from transcript files and parses it into a dictionary.

    The parsed dictionary has the keys ``student_info`` (dict),
    ``requirements`` (dict keyed by requirement code), ``course_history``
    (list of course dicts) and ``assessments`` (dict).
    """

    # Letter grade -> grade points, used when a GPA has to be derived.
    _GRADE_POINTS = {
        'A': 4.0, 'A-': 3.7, 'B+': 3.3, 'B': 3.0, 'B-': 2.7,
        'C+': 2.3, 'C': 2.0, 'C-': 1.7, 'D+': 1.3, 'D': 1.0, 'F': 0.0
    }
    # Advanced-course markers, matched as whole words so that e.g. "DESIGN"
    # or "GRAPHIC" is not mistaken for DE / AP.
    _ADVANCED_COURSE_RE = re.compile(r'\b(?:AP|IB|DE|HONORS)\b')
    # A plain non-negative decimal number.
    _NUMBER = r'\d+(?:\.\d+)?'

    def __init__(self):
        self.common_school_patterns = {
            'miami_dade': r'(MIAMI-DADE|DADE COUNTY|MDCPS)',
            'broward': r'(BROWARD COUNTY|BCPS)',
            'florida': r'(FLORIDA|FDOE|FL DOE)'
        }
        self.transcript_templates = {
            'miami_dade': self._parse_miami_dade_transcript,
            'broward': self._parse_broward_transcript,
            'florida': self._parse_florida_standard_transcript,
            'default': self._parse_generic_transcript
        }

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _to_float(value, default=None):
        """Return ``value`` as a float if it is a plain decimal number, else ``default``."""
        text = str(value).strip()
        if re.fullmatch(r'\d+\.?\d*|\.\d+', text):
            return float(text)
        return default

    # ---------------------------------------------------------------- detection

    def detect_transcript_type(self, text: str) -> str:
        """Return the first template key whose pattern occurs in ``text``, else 'default'."""
        upper_text = text.upper()
        for template, pattern in self.common_school_patterns.items():
            if re.search(pattern, upper_text):
                return template
        return 'default'

    def parse_transcript(self, file_path: str, file_ext: str) -> Dict:
        """Extract text from a file and parse it with the matching template.

        If the template parser returns nothing, the generic parser is tried.

        Raises:
            ValueError: if extraction, parsing or validation fails (the
                original exception is chained).
        """
        try:
            text = self.extract_text_from_file(file_path, file_ext)
            if not text.strip():
                raise ValueError("No text could be extracted from file")

            transcript_type = self.detect_transcript_type(text)
            logger.info(f"Detected transcript type: {transcript_type}")

            parser_func = self.transcript_templates.get(transcript_type, self._parse_generic_transcript)
            parsed_data = parser_func(text)
            if not parsed_data:
                logger.warning(f"Specialized parser failed, trying generic parser")
                parsed_data = self._parse_generic_transcript(text)
            if not parsed_data:
                raise ValueError("No data could be parsed from transcript")

            self.validate_parsed_data(parsed_data)
            self.enhance_parsed_data(parsed_data)
            return parsed_data
        except Exception as e:
            logger.error(f"Error parsing transcript: {str(e)}")
            raise ValueError(f"Couldn't parse transcript content. Error: {str(e)}") from e

    # --------------------------------------------------------------- extraction

    def extract_text_from_file(self, file_path: str, file_ext: str) -> str:
        """Return cleaned text from a PDF or image file.

        PDFs are read with pdfplumber; if that fails or yields no text,
        PyMuPDF is used instead. Images go through OCR.

        Raises:
            ValueError: if extraction fails or produces no text.
        """
        text = ""
        try:
            file_ext = file_ext.lower()
            if file_ext == '.pdf':
                try:
                    text = self._extract_pdf_with_pdfplumber(file_path)
                    if not text.strip():
                        raise ValueError("PDFPlumber returned empty text")
                except Exception as e:
                    logger.warning(f"PDFPlumber failed: {str(e)}. Trying PyMuPDF...")
                    # Start over so partial pdfplumber output is not duplicated.
                    text = self._extract_pdf_with_pymupdf(file_path)
            elif file_ext in ['.png', '.jpg', '.jpeg']:
                text = self.extract_text_with_enhanced_ocr(file_path)

            text = self.clean_extracted_text(text)
            if not text.strip():
                raise ValueError("The file appears to be empty or contains no readable text.")
            return text
        except Exception as e:
            logger.error(f"Text extraction error: {str(e)}")
            raise ValueError(f"Failed to extract text: {str(e)}") from e

    def _extract_pdf_with_pdfplumber(self, file_path: str) -> str:
        """Return table rows (cells joined by ' | ') followed by page text, per page."""
        chunks = []
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                tables = page.extract_tables({
                    "vertical_strategy": "text",
                    "horizontal_strategy": "text",
                    "intersection_y_tolerance": 10,
                    "join_tolerance": 20
                })
                for table in tables or []:
                    for row in table:
                        chunks.append(" | ".join(str(cell).strip() for cell in row if cell))
                # NOTE(review): page text is appended in addition to the table
                # rows, so table content can appear twice; confirm intended.
                page_text = page.extract_text()
                if page_text:
                    chunks.append(page_text)
        return "".join(chunk + "\n" for chunk in chunks)

    def _extract_pdf_with_pymupdf(self, file_path: str) -> str:
        """Return the text of every page using PyMuPDF, closing the document afterwards."""
        doc = fitz.open(file_path)
        try:
            return "".join(
                page.get_text("text", flags=fitz.TEXT_PRESERVE_IMAGES) + '\n'
                for page in doc
            )
        finally:
            doc.close()

    def extract_text_with_enhanced_ocr(self, file_path: str) -> str:
        """OCR an image after grayscale conversion and thresholding.

        Several page-segmentation modes are tried; the first result with more
        than 50 non-blank characters is returned, otherwise the longest one.

        Raises:
            ValueError: if the image cannot be opened or OCR fails.
        """
        try:
            with Image.open(file_path) as source:
                image = source.convert('L')  # Grayscale
            image = image.point(lambda x: 0 if x < 140 else 255, '1')  # Thresholding

            # '/' and '+' are whitelisted so dates (MM/DD/YYYY) and plus grades
            # (B+) survive OCR. The page-segmentation mode is added per attempt.
            base_config = (
                r'--oem 3 -c tessedit_char_whitelist='
                r'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.,:()%$@/+'
            )
            best = ""
            for psm in [6, 11, 4]:
                candidate = pytesseract.image_to_string(image, config=f"{base_config} --psm {psm}")
                if len(candidate.strip()) > len(best.strip()):
                    best = candidate
                if len(best.strip()) > 50:  # If we got reasonable text
                    break
            return best
        except Exception as e:
            raise ValueError(f"OCR processing failed: {str(e)}") from e

    def clean_extracted_text(self, text: str) -> str:
        """Normalize whitespace and repair common OCR mistakes.

        Line breaks are preserved (runs of blank lines collapse to one blank
        line) because the parsers split on them; only spaces and tabs are
        collapsed.
        """
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = re.sub(r'[^\S\n]+', ' ', text)
        text = re.sub(r' ?\n ?', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text).strip()

        # Header and abbreviation fixes: case-insensitive, whole tokens only.
        for pattern, replacement in (
            (r'GradeLv1', 'GradeLvl'),
            (r'CrsNu m', 'CrsNum'),
            (r'\bYOG\b', 'Year of Graduation'),
            (r'\bComm Serv\b', 'Community Service'),
            (r'lnProgress', 'inProgress'),
            (r'\bhonors\b', 'Honors'),
        ):
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

        # Lower-case "l" misread for capital "I": case-sensitive and whole-word,
        # otherwise ordinary words such as "help" or "Albert" get rewritten.
        text = re.sub(r'\blP\b', 'IP', text)
        text = re.sub(r'\blB\b', 'IB', text)

        # Requirement codes such as "A - " become "A-".
        text = re.sub(r'\b([A-GZ])[^\S\n]*-[^\S\n]*', r'\1-', text)
        # Course codes split by one space, e.g. "ENG 101" becomes "ENG101".
        text = re.sub(r'(\b[A-Z]{2,4})[^\S\n](\d{3}[A-Z]?\b)', r'\1\2', text)
        return text

    # --------------------------------------------------- validation / enrichment

    def validate_parsed_data(self, parsed_data: Dict) -> bool:
        """Check that the required keys are present.

        Raises:
            ValueError: naming the first missing key path.
        """
        required_fields = [
            ('student_info', 'name'),
            ('student_info', 'id'),
            ('requirements',),
            ('course_history',)
        ]
        for path in required_fields:
            current = parsed_data
            for key in path:
                if not isinstance(current, dict) or key not in current:
                    raise ValueError(f"Missing critical field: {'.'.join(path)}")
                current = current[key]
        return True

    def enhance_parsed_data(self, parsed_data: Dict) -> Dict:
        """Add derived totals to ``student_info`` in place and return ``parsed_data``.

        * ``total_credits`` is summed from the courses when absent.
        * When ``weighted_gpa`` is absent, both GPAs are derived from the
          course grades; an ``unweighted_gpa`` that is already present is kept.

        The grade is read from ``grade_earned``, ``grade`` or ``fg`` and the
        title from ``course_name`` or ``description``, covering the keys the
        parsers in this class produce.
        """
        student_info = parsed_data.setdefault('student_info', {})
        courses = [c for c in parsed_data.get('course_history', []) if isinstance(c, dict)]

        if 'total_credits' not in student_info:
            credit_values = (self._to_float(course.get('credits', 0)) for course in courses)
            student_info['total_credits'] = round(
                sum(value for value in credit_values if value is not None), 2
            )

        if 'weighted_gpa' not in student_info:
            plain_points = []
            weighted_points = []
            for course in courses:
                grade = str(
                    course.get('grade_earned') or course.get('grade') or course.get('fg') or ''
                ).strip().upper()
                if grade not in self._GRADE_POINTS:
                    continue
                points = self._GRADE_POINTS[grade]
                title = str(course.get('course_name') or course.get('description') or '').upper()
                # Simple weighting: AP/IB/DE/Honors courses get +1.
                bonus = 1.0 if self._ADVANCED_COURSE_RE.search(title) else 0.0
                plain_points.append(points)
                weighted_points.append(points + bonus)
            if plain_points:
                student_info.setdefault(
                    'unweighted_gpa', round(sum(plain_points) / len(plain_points), 2)
                )
                student_info['weighted_gpa'] = round(
                    sum(weighted_points) / len(weighted_points), 2
                )
        return parsed_data

    # ------------------------------------------------------------- Miami-Dade

    def _parse_miami_dade_transcript(self, text: str) -> Optional[Dict]:
        """Parse a Miami-Dade transcript; return None if parsing raises.

        The patterns accept both the raw abbreviations (``YOG``, ``Comm Serv``,
        ``GradeLv1``) and the forms ``clean_extracted_text`` rewrites them to.
        """
        try:
            parsed_data = {
                'student_info': {},
                'requirements': {},
                'course_history': [],
                'assessments': {}
            }
            info = parsed_data['student_info']
            flags = re.IGNORECASE
            number = self._NUMBER

            student_info_match = re.search(
                r"(\d{7})\s*-\s*(.*?)\s*\n.*?Current Grade:\s*(\d+).*?"
                r"(?:YOG|Year\s*of\s*Graduation)\s*(\d{4})",
                text,
                re.DOTALL | flags
            )
            if student_info_match:
                info.update({
                    'id': student_info_match.group(1),
                    'name': student_info_match.group(2).strip(),
                    'grade': student_info_match.group(3),
                    'year_of_graduation': student_info_match.group(4),
                    'district': 'Miami-Dade'
                })

            # Labelled GPAs are assigned by their label, not by position.
            for label, value in re.findall(
                rf"\b(Un.?weighted|Weighted)\s*GPA\s*:?\s*({number})", text, flags
            ):
                key = 'unweighted_gpa' if label.lower().startswith('un') else 'weighted_gpa'
                info.setdefault(key, float(value))
            # Unlabelled values fill whatever is still missing, in text order.
            unlabelled = []
            for pattern in (
                rf"GPA\s*\(.*?\)\s*:\s*({number})",
                rf"Grade\s*Point\s*Average\s*:\s*({number})",
            ):
                unlabelled.extend(re.findall(pattern, text, flags))
                if len(unlabelled) >= 2:
                    break
            for key in ('unweighted_gpa', 'weighted_gpa'):
                if key not in info and unlabelled:
                    info[key] = float(unlabelled.pop(0))

            service_hours_match = re.search(
                r"Comm(?:unity)?\s*Serv(?:ice)?\s*Hours\s*:?\s*(\d+)", text, flags
            )
            if service_hours_match:
                info['community_service_hours'] = int(service_hours_match.group(1))
            service_date_match = re.search(
                r"Comm(?:unity)?\s*Serv(?:ice)?\s*Date\s*:?\s*(\d{2}/\d{2}/\d{4})", text, flags
            )
            if service_date_match:
                info['community_service_date'] = service_date_match.group(1)

            credits_match = re.search(
                rf"Total\s*Credits\s*Earned\s*:?\s*({number})", text, flags
            )
            if credits_match:
                info['total_credits'] = float(credits_match.group(1))

            virtual_grade_match = re.search(r"Virtual\s*Grade\s*([A-Z])", text, flags)
            if virtual_grade_match:
                info['virtual_grade'] = virtual_grade_match.group(1)

            req_section = re.search(
                r"(?:Graduation\s*Requirements|Requirements\s*Summary).*?"
                r"(Code[\s|]*Description.*?)(?:\n\s*\n|$)",
                text,
                re.DOTALL | flags
            )
            if req_section:
                parsed_data['requirements'] = self._parse_requirement_rows(req_section.group(1))

            course_section = re.search(
                r"(?:Course\s*History|Academic\s*Record).*?"
                r"(Requirement.*?School Year.*?GradeLv[l1].*?CrsNum.*?Description.*?Term.*?"
                r"DstNumber.*?FG.*?Incl.*?Credits.*?)(?:\n\s*\n|$)",
                text,
                re.DOTALL | flags
            )
            if course_section:
                parsed_data['course_history'] = self._parse_course_rows(course_section.group(1))

            return parsed_data
        except Exception as e:
            logger.warning(f"Miami-Dade transcript parsing failed: {str(e)}")
            return None

    def _parse_requirement_rows(self, section: str) -> Dict:
        """Turn '|'-separated requirement rows into a dict keyed by requirement code."""
        requirements = {}
        for raw_line in section.split('\n'):
            line = raw_line.strip()
            if '|' not in line:
                continue
            parts = [part.strip() for part in line.split('|') if part.strip()]
            # Need at least code, description, required, waived, completed.
            if len(parts) < 5 or parts[0].lower() == 'code':
                continue
            status = parts[5] if len(parts) > 5 else ""
            percent_match = re.search(r"(\d+)%", status)
            requirements[parts[0]] = {
                "description": parts[1],
                "required": self._to_float(parts[2], 0.0),
                "waived": self._to_float(parts[3], 0.0),
                "completed": self._to_float(parts[4], 0.0),
                "percent_complete": float(percent_match.group(1)) if percent_match else 0.0,
                "status": status
            }
        return requirements

    def _parse_course_rows(self, section: str) -> List[Dict]:
        """Turn '|'-separated course rows into course dicts.

        Short rows are padded with empty cells instead of being dropped, and
        the header row is skipped.
        """
        courses = []
        for raw_line in section.split('\n'):
            line = raw_line.strip()
            if not line or '|' not in line:
                continue
            parts = [part.strip() for part in line.split('|') if part.strip()]
            if not parts or parts[0].lower() == 'requirement':
                continue
            cells = parts + [""] * (10 - len(parts))
            raw_credits = cells[9]
            in_progress = not raw_credits or "inprogress" in raw_credits.lower()
            numeric_credits = self._to_float(raw_credits) is not None
            courses.append({
                'requirement': cells[0],
                'school_year': cells[1],
                'grade_level': cells[2],
                'course_code': cells[3],
                'description': cells[4],
                'term': cells[5],
                'district_number': cells[6],
                'fg': cells[7],
                'included': cells[8],
                'credits': raw_credits if (numeric_credits and not in_progress) else "0",
                'status': 'In Progress' if in_progress else 'Completed'
            })
        return courses

    # --------------------------------------------------------- other templates

    def _parse_broward_transcript(self, text: str) -> Optional[Dict]:
        """Parse the student header of a Broward transcript (courses are not parsed yet)."""
        try:
            parsed_data = {
                'student_info': {},
                'requirements': {},
                'course_history': [],
                'assessments': {}
            }
            student_info_match = re.search(
                r"Student:\s*(\d+)\s*-\s*(.*?)\s*Grade:\s*(\d+)",
                text,
                re.IGNORECASE
            )
            if student_info_match:
                parsed_data['student_info'] = {
                    'id': student_info_match.group(1),
                    'name': student_info_match.group(2).strip(),
                    'grade': student_info_match.group(3),
                    'district': 'Broward'
                }
            # Add Broward-specific parsing logic here...
            return parsed_data
        except Exception as e:
            logger.warning(f"Broward transcript parsing failed: {str(e)}")
            return None

    def _parse_florida_standard_transcript(self, text: str) -> Optional[Dict]:
        """Parse the student header of a Florida standard transcript (courses are not parsed yet)."""
        try:
            parsed_data = {
                'student_info': {},
                'requirements': {},
                'course_history': [],
                'assessments': {}
            }
            student_info_match = re.search(
                r"Florida\s*Student\s*Transcript.*?Name:\s*(.*?)\s*ID:\s*(\d+)",
                text,
                re.IGNORECASE | re.DOTALL
            )
            if student_info_match:
                parsed_data['student_info'] = {
                    'name': student_info_match.group(1).strip(),
                    'id': student_info_match.group(2),
                    'district': 'Florida'
                }
            # Add Florida standard parsing logic here...
            return parsed_data
        except Exception as e:
            logger.warning(f"Florida standard transcript parsing failed: {str(e)}")
            return None

    def _parse_generic_transcript(self, text: str) -> Optional[Dict]:
        """Fallback parser; returns None when no course could be found.

        Courses are matched as ``[YEAR] CODE DESC GRADE CREDITS`` first (the
        school year is optional), then as ``DESC GRADE CREDITS``.
        """
        try:
            parsed_data = {
                'student_info': {},
                'requirements': {},
                'course_history': [],
                'assessments': {}
            }
            # \bID\b keeps the name from being cut inside words such as "David".
            name_match = re.search(
                r"(?:Student|Name):\s*(.*?)\s*(?:\n|\bID\b|$)", text, re.IGNORECASE
            )
            if name_match:
                parsed_data['student_info']['name'] = name_match.group(1).strip()
            id_match = re.search(r"(?:ID|Student\s*Number):\s*(\d+)", text, re.IGNORECASE)
            if id_match:
                parsed_data['student_info']['id'] = id_match.group(1)

            coded = re.findall(
                r"(?:(\d{4}-\d{4})\s+)?([A-Z]{2,4}\d{3}[A-Z]?)\s+(.*?)\s+([A-F][+-]?)\s+(\d+\.?\d*)",
                text
            )
            if coded:
                for year, code, description, grade, credits in coded:
                    course = {}
                    if year:
                        course['school_year'] = year
                    course.update({
                        'course_code': code,
                        'description': description,
                        'grade': grade,
                        'credits': credits
                    })
                    parsed_data['course_history'].append(course)
            else:
                for description, grade, credits in re.findall(
                    r"(.*?)\s+([A-F][+-]?)\s+(\d+\.?\d*)", text
                ):
                    parsed_data['course_history'].append({
                        'description': description,
                        'grade': grade,
                        'credits': credits
                    })
            return parsed_data if parsed_data['course_history'] else None
        except Exception as e:
            logger.warning(f"Generic transcript parsing failed: {str(e)}")
            return None


# Initialize enhanced parser
transcript_parser = EnhancedTranscriptParser()
# ========== ENHANCED ANALYSIS FUNCTIONS ========== | |
class AcademicAnalyzer: | |
def __init__(self): | |
self.gpa_scale = { | |
'A': 4.0, 'A-': 3.7, 'B+': 3.3, 'B': 3.0, 'B-': 2.7, | |
'C+': 2.3, 'C': 2.0, 'C-': 1.7, 'D+': 1.3, 'D': 1.0, 'F': 0.0 | |
} | |
self.college_tiers = { | |
'ivy_league': {'gpa': 4.3, 'rigor': 8, 'service': 100}, | |
'top_tier': {'gpa': 4.0, 'rigor': 6, 'service': 80}, | |
'competitive': {'gpa': 3.7, 'rigor': 4, 'service': 60}, | |
'good': {'gpa': 3.3, 'rigor': 2, 'service': 40}, | |
'average': {'gpa': 2.7, 'rigor': 1, 'service': 20} | |
} | |
def analyze_gpa(self, parsed_data: Dict) -> Dict:
    """Rate the student's GPA and suggest improvements.

    The rating uses ``student_info['weighted_gpa']``; when that is missing or
    zero, ``unweighted_gpa`` is used instead. When neither is available, or
    the values are not numeric, the result is rated ``'Unknown'`` rather than
    treating missing data as a GPA of 0.

    Args:
        parsed_data: parsed transcript dictionary.

    Returns:
        Dict with the keys ``rating``, ``description``, ``comparison`` and
        ``improvement_tips``.
    """
    unknown = {
        'rating': 'Unknown',
        'description': 'Could not analyze GPA',
        'comparison': '',
        'improvement_tips': []
    }
    try:
        info = parsed_data.get('student_info', {}) or {}
        weighted_gpa = float(info.get('weighted_gpa') or 0)
        unweighted_gpa = float(info.get('unweighted_gpa') or 0)
    except (AttributeError, TypeError, ValueError):
        return unknown

    reference_gpa = weighted_gpa if weighted_gpa > 0 else unweighted_gpa
    if reference_gpa <= 0:
        return unknown

    # NOTE(review): the leading symbols in the descriptions below look like
    # mis-encoded emoji; they are left as found because the originals cannot
    # all be recovered with certainty.
    # (minimum GPA, rating, description, comparison, tips), highest tier first.
    tiers = [
        (4.5, 'Excellent',
         "π You're in the top tier of students with a highly competitive GPA.",
         "This puts you in the top 5% of students nationally.",
         [
             "Consider taking advanced courses to challenge yourself",
             "Look into college-level courses or research opportunities"
         ]),
        (4.0, 'Strong',
         "π Your GPA is strong and competitive for most colleges.",
         "This is above the national average and competitive for many universities.",
         [
             "Maintain your current study habits",
             "Consider adding 1-2 more challenging courses"
         ]),
        (3.5, 'Good',
         "βΉοΈ Your GPA is good but could be improved for more competitive schools.",
         "This is slightly above the national average.",
         [
             "Focus on improving in your weaker subjects",
             "Consider getting tutoring for challenging courses",
             "Develop better study habits and time management"
         ]),
        (3.0, 'Average',
         "β οΈ Your GPA is average. Focus on improvement for better college options.",
         "This is around the national average.",
         [
             "Identify your weakest subjects and focus on them",
             "Develop a consistent study schedule",
             "Seek help from teachers or tutors",
             "Consider retaking courses with low grades if possible"
         ]),
        (float('-inf'), 'Below Average',
         "β Your GPA is below average. Please consult with your academic advisor.",
         "This is below the national average and may limit college options.",
         [
             "Meet with your school counselor immediately",
             "Develop a structured improvement plan",
             "Consider summer school or credit recovery options",
             "Focus on fundamental study skills"
         ]),
    ]
    for minimum, rating, description, comparison, tips in tiers:
        if reference_gpa >= minimum:
            analysis = {
                'rating': rating,
                'description': description,
                'comparison': comparison,
                'improvement_tips': list(tips)
            }
            break

    # Compare weighted and unweighted GPA when both are known.
    if weighted_gpa > 0 and unweighted_gpa > 0:
        diff = weighted_gpa - unweighted_gpa
        if diff > 0.5:
            analysis['comparison'] += "\n\nThe significant difference between your weighted and unweighted GPA suggests you're taking many advanced courses."
        elif diff > 0.2:
            analysis['comparison'] += "\n\nThe moderate difference between your weighted and unweighted GPA suggests a good balance of standard and advanced courses."
        else:
            analysis['comparison'] += "\n\nThe small difference between your weighted and unweighted GPA suggests you might benefit from more challenging courses."
    return analysis
def analyze_graduation_status(self, parsed_data: Dict) -> Dict:
    """Summarize progress toward graduation from parsed transcript data.

    Args:
        parsed_data: Parsed transcript. Reads ``parsed_data['requirements']``
            (mapping of requirement code to a dict with ``required``,
            ``completed``, ``description`` and ``status``) and
            ``parsed_data['student_info']`` (``grade`` and
            ``year_of_graduation``).

    Returns:
        A dict with ``status`` (message), ``completion_percentage`` (float),
        ``missing_requirements`` (list of dicts with ``code``,
        ``description``, ``remaining`` and ``status``), ``on_track`` (bool)
        and ``timeline`` (message, empty when no projection is possible).
        If the analysis fails unexpectedly, a fallback dict with a
        "Could not analyze" status is returned instead of raising.
    """
    def _credits(value) -> float:
        """Parse a credit amount; blank, malformed or negative values count as 0."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        # NaN and infinity fail this range check and are treated as 0 as well.
        return number if 0 < number < float('inf') else 0.0

    analysis = {
        'status': '',
        'completion_percentage': 0,
        'missing_requirements': [],
        'on_track': False,
        'timeline': ''
    }
    try:
        parsed_data = parsed_data or {}
        requirements = parsed_data.get('requirements') or {}
        # Parse every requirement once so the totals and the missing list
        # always agree on how a value was interpreted.
        rows = [
            (code, req, _credits(req.get('required')), _credits(req.get('completed')))
            for code, req in requirements.items()
            if isinstance(req, dict)
        ]
        total_required = sum(required for _, _, required, _ in rows)
        total_completed = sum(completed for _, _, _, completed in rows)
        percentage = (total_completed / total_required) * 100 if total_required > 0 else 0
        analysis['completion_percentage'] = percentage

        # Identify missing requirements
        analysis['missing_requirements'] = [
            {
                'code': code,
                'description': req.get('description', ''),
                'remaining': required - completed,
                'status': req.get('status', '')
            }
            for code, req, required, completed in rows
            if completed < required
        ]

        # Determine status message
        if percentage >= 100:
            analysis['status'] = "π Congratulations! You've met all graduation requirements."
            analysis['on_track'] = True
        elif percentage >= 90:
            analysis['status'] = f"β You've completed {percentage:.1f}% of requirements. Almost there!"
            analysis['on_track'] = True
        elif percentage >= 75:
            analysis['status'] = f"π You've completed {percentage:.1f}% of requirements. Keep working!"
            analysis['on_track'] = True
        elif percentage >= 50:
            analysis['status'] = f"β οΈ You've completed {percentage:.1f}% of requirements. Please meet with your counselor."
            analysis['on_track'] = False
        else:
            analysis['status'] = f"β You've only completed {percentage:.1f}% of requirements. Immediate action needed."
            analysis['on_track'] = False

        # Add timeline projection if possible
        student_info = parsed_data.get('student_info') or {}
        current_grade = student_info.get('grade', '')
        grad_year = student_info.get('year_of_graduation', '')
        if current_grade and grad_year:
            remaining_credits = total_required - total_completed
            try:
                # Years left is a calendar-year difference. The grade level
                # must not be subtracted from it: doing so made the result
                # negative for every realistic input.
                years_remaining = int(float(str(grad_year).strip())) - datetime.datetime.now().year
            except (TypeError, ValueError, OverflowError):
                years_remaining = 0  # unparseable year: skip the projection
            if years_remaining > 0 and remaining_credits > 0:
                credits_per_year = remaining_credits / years_remaining
                analysis['timeline'] = (
                    f"To graduate on time in {grad_year}, you need to complete approximately "
                    f"{credits_per_year:.1f} credits per year."
                )
        return analysis
    except Exception:
        logger.exception("Could not analyze graduation status")
        return {
            'status': 'Could not analyze graduation status',
            'completion_percentage': 0,
            'missing_requirements': [],
            'on_track': False,
            'timeline': ''
        }
def analyze_course_rigor(self, parsed_data: Dict) -> Dict:
    """Count advanced courses in the transcript and rate overall rigor.

    Each entry of ``parsed_data['course_history']`` is classified by its
    ``description`` as AP, IB, dual enrollment (DE) or Honors. Markers are
    matched as whole words, so names such as "Applied Math", "Graphic
    Design" or "Library Science" are not mistaken for AP/DE/IB courses.
    A course is counted once, under the first category that matches.

    Args:
        parsed_data: Parsed transcript containing ``course_history``.

    Returns:
        A dict with the counters ``advanced_courses``, ``honors_courses``,
        ``ap_courses``, ``ib_courses`` and ``de_courses``, a ``rating``
        string and a list of ``recommendations``. ``rating`` stays empty
        when there are no courses and is ``'Unknown'`` if the analysis fails.
    """
    analysis = {
        'advanced_courses': 0,
        'honors_courses': 0,
        'ap_courses': 0,
        'ib_courses': 0,
        'de_courses': 0,
        'rating': '',
        'recommendations': []
    }
    # Checked in priority order; descriptions are upper-cased before matching.
    categories = (
        ('ap_courses', re.compile(r'\bAP\b')),
        ('ib_courses', re.compile(r'\bIB\b')),
        ('de_courses', re.compile(r'\bDE\b|\bDUAL\s+ENROLLMENT\b')),
        ('honors_courses', re.compile(r'\bHONORS\b')),
    )
    try:
        courses = (parsed_data or {}).get('course_history') or []
        for course in courses:
            # A missing or None description simply matches no category.
            description = course.get('description') if isinstance(course, dict) else None
            course_name = str(description or '').upper()
            for counter, pattern in categories:
                if pattern.search(course_name):
                    analysis[counter] += 1
                    analysis['advanced_courses'] += 1
                    break

        total_courses = len(courses)
        if total_courses == 0:
            return analysis
        advanced_percentage = (analysis['advanced_courses'] / total_courses) * 100

        if advanced_percentage >= 50:
            analysis['rating'] = 'Very High Rigor'
            analysis['recommendations'] = [
                "Your course rigor is excellent for college admissions",
                "Consider adding 1-2 more advanced courses if manageable"
            ]
        elif advanced_percentage >= 30:
            analysis['rating'] = 'High Rigor'
            analysis['recommendations'] = [
                "Your course rigor is strong",
                "Consider adding 1-2 more advanced courses next year"
            ]
        elif advanced_percentage >= 15:
            analysis['rating'] = 'Moderate Rigor'
            analysis['recommendations'] = [
                "Your course rigor is average",
                "Consider adding more advanced courses to strengthen your profile"
            ]
        else:
            analysis['rating'] = 'Low Rigor'
            analysis['recommendations'] = [
                "Your course rigor is below average for college-bound students",
                "Strongly consider adding advanced courses next semester",
                "Meet with your counselor to discuss options"
            ]
        return analysis
    except Exception:
        logger.exception("Could not analyze course rigor")
        return {
            'advanced_courses': 0,
            'honors_courses': 0,
            'ap_courses': 0,
            'ib_courses': 0,
            'de_courses': 0,
            'rating': 'Unknown',
            'recommendations': []
        }
def generate_college_recommendations(self, parsed_data: Dict) -> Dict:
    """Suggest college tiers, scholarships and improvement areas.

    The tier is chosen from three metrics: weighted GPA and community
    service hours (both read from ``parsed_data['student_info']``) and the
    number of advanced courses reported by ``self.analyze_course_rigor``.
    Missing or malformed metric values are treated as 0 so that a single bad
    field (for example service hours stored as "62.5" or ``None``) does not
    discard every recommendation.

    Args:
        parsed_data: Parsed transcript data.

    Returns:
        A dict with list values under ``reach``, ``target``, ``safety``,
        ``scholarships`` and ``improvement_areas``. If generation fails
        unexpectedly, ``reach`` contains a single "Could not generate
        recommendations" entry and the other lists are empty.
    """
    def _number(value) -> float:
        """Parse a metric as float; None, blanks and junk become 0."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        return number if number == number else 0.0  # NaN -> 0

    recommendations = {
        'reach': [],
        'target': [],
        'safety': [],
        'scholarships': [],
        'improvement_areas': []
    }
    # (min GPA, min advanced courses, min service hours, lists to add),
    # ordered from most to least selective; the first matching row wins.
    tiers = (
        (4.3, 8, 100, {
            'reach': [
                "Ivy League: Harvard, Yale, Princeton, Columbia, etc.",
                "Stanford, MIT, CalTech, University of Chicago"
            ],
            'target': [
                "Top Public Universities: UCLA, UC Berkeley, UMich, UVA",
                "Elite Liberal Arts: Williams, Amherst, Swarthmore"
            ],
        }),
        (4.0, 6, 80, {
            'reach': [
                "Top 20 National Universities",
                "Highly Selective Liberal Arts Colleges"
            ],
            'target': [
                "Top 50 National Universities",
                "Selective Public Flagships",
                "Top Liberal Arts Colleges"
            ],
        }),
        (3.7, 4, 60, {
            'reach': [
                "Top 50 National Universities",
                "Selective Liberal Arts Colleges"
            ],
            'target': [
                "State Flagship Universities",
                "Good Regional Universities"
            ],
        }),
        (3.3, 2, 40, {
            'target': [
                "State Universities",
                "Many Private Colleges"
            ],
            'safety': [
                "Less Selective Private Colleges",
                "Community Colleges with Transfer Programs"
            ],
        }),
    )
    default_tier = {
        'target': [
            "Open Admission Colleges",
            "Some State Universities"
        ],
        'safety': [
            "Community Colleges",
            "Technical Schools"
        ],
    }
    try:
        # Get key metrics
        student_info = (parsed_data or {}).get('student_info') or {}
        weighted_gpa = _number(student_info.get('weighted_gpa'))
        service_hours = _number(student_info.get('community_service_hours'))
        rigor_analysis = self.analyze_course_rigor(parsed_data) or {}
        advanced_courses = _number(rigor_analysis.get('advanced_courses'))

        # Determine college tiers
        selected = next(
            (
                lists for min_gpa, min_advanced, min_hours, lists in tiers
                if weighted_gpa >= min_gpa
                and advanced_courses >= min_advanced
                and service_hours >= min_hours
            ),
            default_tier,
        )
        for tier_name, schools in selected.items():
            recommendations[tier_name].extend(schools)

        # Scholarship recommendations
        if weighted_gpa >= 4.0:
            recommendations['scholarships'].extend([
                "National Merit Scholarship",
                "Presidential Scholarships",
                "College-Specific Full-Ride Scholarships"
            ])
        elif weighted_gpa >= 3.7:
            recommendations['scholarships'].extend([
                "Bright Futures (Florida)",
                "State-Specific Merit Scholarships",
                "Honors College Scholarships"
            ])
        elif weighted_gpa >= 3.3:
            recommendations['scholarships'].extend([
                "Local Community Scholarships",
                "Special Interest Scholarships",
                "First-Generation Student Programs"
            ])

        # Improvement areas
        if weighted_gpa < 3.5:
            recommendations['improvement_areas'].append("Improve GPA through focused study and tutoring")
        if advanced_courses < 4:
            recommendations['improvement_areas'].append("Take more advanced courses (AP/IB/DE/Honors)")
        if service_hours < 50:
            recommendations['improvement_areas'].append("Increase community service involvement")
        return recommendations
    except Exception:
        logger.exception("Could not generate college recommendations")
        return {
            'reach': ["Could not generate recommendations"],
            'target': [],
            'safety': [],
            'scholarships': [],
            'improvement_areas': []
        }
def generate_study_plan(self, parsed_data: Dict, learning_style: str) -> Dict:
    """Build a study plan from in-progress courses and a learning style.

    Args:
        parsed_data: Parsed transcript; courses in ``course_history`` whose
            ``status`` is "in progress" (case-insensitive) are scheduled.
        learning_style: One of "visual", "auditory", "reading/writing" or
            "kinesthetic" (case-insensitive). Any other value, including
            ``None``, yields a plan without style-specific strategies.

    Returns:
        A dict with ``weekly_schedule`` (day name -> list of study blocks,
        courses are spread round-robin over Monday-Friday),
        ``study_strategies``, ``time_management_tips`` and
        ``resource_recommendations``. If generation fails unexpectedly,
        ``weekly_schedule`` is ``{'Error': [...]}`` and the lists are empty.
    """
    strategies_by_style = {
        'visual': [
            "Create colorful mind maps for each subject",
            "Use flashcards with images and diagrams",
            "Watch educational videos on topics"
        ],
        'auditory': [
            "Record yourself explaining concepts and listen back",
            "Participate in study groups",
            "Listen to educational podcasts"
        ],
        'reading/writing': [
            "Write detailed summaries in your own words",
            "Create question-answer sets for each topic",
            "Rewrite your notes to reinforce learning"
        ],
        'kinesthetic': [
            "Create physical models or demonstrations",
            "Study while walking or moving",
            "Use hands-on activities when possible"
        ],
    }
    days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    plan = {
        'weekly_schedule': {},
        'study_strategies': [],
        'time_management_tips': [],
        'resource_recommendations': []
    }
    try:
        # Get current courses; a missing or None status means "not current".
        current_courses = [
            course for course in ((parsed_data or {}).get('course_history') or [])
            if isinstance(course, dict)
            and str(course.get('status') or '').strip().lower() == 'in progress'
        ]

        # Generate weekly schedule template
        for day in days:
            plan['weekly_schedule'][day] = []

        # Add strategies for the learning style, if it is a known one
        style_key = str(learning_style or '').strip().lower()
        plan['study_strategies'].extend(strategies_by_style.get(style_key, []))

        # Distribute study blocks over the school week (Monday-Friday)
        for index, course in enumerate(current_courses):
            plan['weekly_schedule'][days[index % 5]].append({
                'course': course.get('description', 'Course'),
                'duration': '45-60 minutes',
                'activities': [
                    "Review notes",
                    "Complete practice problems",
                    "Prepare questions for teacher"
                ]
            })

        # Add time management tips
        plan['time_management_tips'].extend([
            "Use the Pomodoro technique (25 min study, 5 min break)",
            "Prioritize assignments by due date and importance",
            "Schedule regular review sessions"
        ])
        # Add resource recommendations
        plan['resource_recommendations'].extend([
            "Khan Academy for math and science",
            "Quizlet for flashcards",
            "Wolfram Alpha for math help"
        ])
        return plan
    except Exception:
        logger.exception("Could not generate study plan")
        return {
            'weekly_schedule': {'Error': ["Could not generate schedule"]},
            'study_strategies': [],
            'time_management_tips': [],
            'resource_recommendations': []
        }
# Module-level AcademicAnalyzer instance shared by the rest of this file
# (DataVisualizer, EnhancedProfileManager and the teaching assistant call its methods).
academic_analyzer = AcademicAnalyzer() | |
# ========== ENHANCED VISUALIZATION FUNCTIONS ========== | |
class DataVisualizer:
    """Builds Plotly figures from parsed transcript data.

    Every ``create_*`` method returns a Plotly figure, or ``None`` when there
    is nothing to plot or the figure could not be built (the error is logged).
    """

    # Requirement-code prefixes (the text before the first '-') per category.
    _CREDIT_CATEGORIES = {
        'Core Subjects': ('A', 'B', 'C', 'D'),  # English, Math, Science, Social Studies
        'Electives': ('G', 'H'),
        'Arts/PE': ('E', 'F'),
    }

    def __init__(self):
        self.color_palette = {
            'complete': '#4CAF50',
            'incomplete': '#F44336',
            'in_progress': '#FFC107',
            'gpa_weighted': '#3F51B5',
            'gpa_unweighted': '#9C27B0',
            'core': '#3498DB',
            'electives': '#2ECC71',
            'arts_pe': '#9B59B6'
        }

    @staticmethod
    def _to_float(value, default=0.0):
        """Parse ``value`` as a float, tolerating None, blanks and a trailing '%'."""
        if isinstance(value, str):
            value = value.strip().rstrip('%')
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        return default if number != number else number  # NaN -> default

    def _credit_totals(self, parsed_data: Dict) -> Dict[str, float]:
        """Sum completed credits per category of ``_CREDIT_CATEGORIES``.

        The requirement code is read from the entry's ``code`` field when
        present and otherwise from its key in the ``requirements`` mapping.
        Entries whose code prefix belongs to no category are ignored.
        """
        totals = {label: 0.0 for label in self._CREDIT_CATEGORIES}
        requirements = (parsed_data or {}).get('requirements') or {}
        for key, req in requirements.items():
            if not isinstance(req, dict):
                continue
            prefix = str(req.get('code') or key).split('-')[0].strip()
            for label, prefixes in self._CREDIT_CATEGORIES.items():
                if prefix in prefixes:
                    totals[label] += self._to_float(req.get('completed'))
                    break
        return totals

    def create_gpa_visualization(self, parsed_data: Dict):
        """Bar chart comparing weighted and unweighted GPA with reference lines."""
        try:
            student_info = (parsed_data or {}).get('student_info') or {}
            df = pd.DataFrame({
                "Type": ["Weighted GPA", "Unweighted GPA"],
                "Value": [
                    self._to_float(student_info.get('weighted_gpa')),
                    self._to_float(student_info.get('unweighted_gpa'))
                ],
            })
            fig = px.bar(
                df,
                x="Type",
                y="Value",
                title="GPA Comparison",
                color="Type",
                color_discrete_map={
                    "Weighted GPA": self.color_palette['gpa_weighted'],
                    "Unweighted GPA": self.color_palette['gpa_unweighted']
                },
                text="Value",
                hover_data={"Type": True, "Value": ":.2f"}
            )
            # Add reference lines and annotations
            fig.add_hline(y=4.0, line_dash="dot", line_color="green", annotation_text="Excellent", annotation_position="top left")
            fig.add_hline(y=3.0, line_dash="dot", line_color="orange", annotation_text="Good", annotation_position="top left")
            fig.add_hline(y=2.0, line_dash="dot", line_color="red", annotation_text="Minimum", annotation_position="top left")
            fig.update_traces(
                texttemplate='%{text:.2f}',
                textposition='outside',
                marker_line_color='rgb(8,48,107)',
                marker_line_width=1.5
            )
            fig.update_layout(
                yaxis_range=[0, 5],
                uniformtext_minsize=8,
                uniformtext_mode='hide',
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
                font=dict(size=12)
            )
            return fig
        except Exception as e:
            logger.error("Error creating GPA visualization: %s", e)
            return None

    def create_requirements_visualization(self, parsed_data: Dict):
        """Bar chart of completion percentage per graduation requirement.

        Requirements without a parseable ``percent_complete`` are left out;
        a value of 0 is kept and shown as "Not Started".
        """
        try:
            req_data = []
            requirements = (parsed_data or {}).get('requirements') or {}
            for code, req in requirements.items():
                if not isinstance(req, dict):
                    continue
                completion = self._to_float(req.get('percent_complete'), default=None)
                if completion is None:
                    continue
                description = str(req.get('description') or '')
                # Only mark the label as shortened when it actually was.
                short_description = description[:30] + ("..." if len(description) > 30 else "")
                required = self._to_float(req.get('required'))
                completed = self._to_float(req.get('completed'))
                if completion >= 100:
                    status = "Complete"
                elif completion > 0:
                    status = "In Progress"
                else:
                    status = "Not Started"
                req_data.append({
                    "Requirement": f"{code}: {short_description}",
                    "Completion (%)": completion,
                    "Status": status,
                    "Required": required,
                    "Completed": completed,
                    "Remaining": max(0, required - completed)
                })
            if not req_data:
                return None
            df = pd.DataFrame(req_data)
            fig = px.bar(
                df,
                x="Requirement",
                y="Completion (%)",
                title="Graduation Requirements Completion",
                color="Status",
                color_discrete_map={
                    "Complete": self.color_palette['complete'],
                    "In Progress": self.color_palette['in_progress'],
                    "Not Started": self.color_palette['incomplete']
                },
                hover_data=["Required", "Completed", "Remaining"],
                text="Completion (%)"
            )
            fig.update_traces(
                texttemplate='%{text:.1f}%',
                textposition='outside',
                marker_line_color='rgb(8,48,107)',
                marker_line_width=1.5
            )
            fig.update_layout(
                xaxis={'categoryorder': 'total descending'},
                yaxis_range=[0, 100],
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
                font=dict(size=12),
                hovermode="x unified"
            )
            fig.add_hline(y=100, line_dash="dot", line_color="green")
            return fig
        except Exception as e:
            logger.error("Error creating requirements visualization: %s", e)
            return None

    def create_credits_distribution_visualization(self, parsed_data: Dict):
        """Donut chart of completed credits split into core, elective and arts/PE."""
        try:
            totals = self._credit_totals(parsed_data)
            if sum(totals.values()) == 0:
                return None
            df = pd.DataFrame({
                "Category": list(totals.keys()),
                "Credits": list(totals.values()),
            })
            fig = px.pie(
                df,
                values="Credits",
                names="Category",
                title="Credit Distribution",
                color="Category",
                color_discrete_map={
                    "Core Subjects": self.color_palette['core'],
                    "Electives": self.color_palette['electives'],
                    "Arts/PE": self.color_palette['arts_pe']
                },
                hole=0.3
            )
            fig.update_traces(
                textposition='inside',
                textinfo='percent+label',
                marker=dict(line=dict(color='#FFFFFF', width=2))
            )
            fig.update_layout(
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
                font=dict(size=12),
                showlegend=False
            )
            return fig
        except Exception as e:
            logger.error("Error creating credits visualization: %s", e)
            return None

    def create_course_rigor_visualization(self, parsed_data: Dict):
        """Bar chart of AP / IB / DE / Honors course counts.

        The counts come from the module-level ``academic_analyzer``.
        """
        try:
            rigor = academic_analyzer.analyze_course_rigor(parsed_data)
            df = pd.DataFrame({
                "Type": ["AP", "IB", "DE", "Honors"],
                "Count": [
                    rigor['ap_courses'],
                    rigor['ib_courses'],
                    rigor['de_courses'],
                    rigor['honors_courses']
                ],
            })
            fig = px.bar(
                df,
                x="Type",
                y="Count",
                title="Advanced Course Breakdown",
                color="Type",
                color_discrete_map={
                    "AP": "#E91E63",
                    "IB": "#673AB7",
                    "DE": "#009688",
                    "Honors": "#FF9800"
                },
                text="Count"
            )
            fig.update_traces(
                textposition='outside',
                marker_line_color='rgb(8,48,107)',
                marker_line_width=1.5
            )
            fig.update_layout(
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
                font=dict(size=12),
                xaxis_title="Course Type",
                yaxis_title="Number of Courses"
            )
            return fig
        except Exception as e:
            logger.error("Error creating course rigor visualization: %s", e)
            return None
# Module-level DataVisualizer instance; its create_* methods build the Plotly charts.
data_visualizer = DataVisualizer() | |
# ========== ENHANCED PROFILE MANAGEMENT ========== | |
class EnhancedProfileManager:
    """Stores student profiles as JSON files with encrypted personal fields.

    Profiles live in ``PROFILES_DIR``. The file name is derived from a
    SHA-256 hash of the student's name plus the active session token, and the
    ``name``, ``interests`` and ``blog`` fields are encrypted with
    ``DataEncryptor``. When ``HF_TOKEN`` and ``hf_api`` are available,
    profiles are also mirrored to a Hugging Face dataset repo on a
    best-effort basis (Hub failures are logged, never raised).
    """

    # Dataset repo used for the optional Hugging Face Hub backup.
    HUB_REPO_ID = "your-username/student-learning-assistant"

    def __init__(self):
        self.profiles_dir = Path(PROFILES_DIR)
        self.profiles_dir.mkdir(exist_ok=True, parents=True)
        self.current_session = None
        self.encryptor = DataEncryptor(ENCRYPTION_KEY)

    def set_session(self, session_token: str) -> None:
        """Remember the session token used to build profile file names."""
        self.current_session = session_token

    def get_profile_path(self, name: str) -> Path:
        """Return the profile file path for ``name`` in the current session."""
        name_hash = hashlib.sha256(name.encode()).hexdigest()[:16]
        if self.current_session:
            return self.profiles_dir / f"{name_hash}_{self.current_session}_profile.json"
        return self.profiles_dir / f"{name_hash}_profile.json"

    def save_profile(self, name: str, age: Union[int, str], interests: str,
                     transcript: Dict, learning_style: str,
                     movie: str, movie_reason: str, show: str, show_reason: str,
                     book: str, book_reason: str, character: str, character_reason: str,
                     blog: str, study_plan: Dict = None) -> str:
        """Validate, encrypt and persist a student profile.

        A study plan is generated from the learning style when none is given.
        The file is written to a temporary path and then moved into place so
        readers never see a half-written profile.

        Returns:
            A confirmation message.

        Raises:
            gr.Error: If validation or saving fails.
        """
        try:
            name = validate_name(name)
            age = validate_age(age)
            if not interests.strip():
                raise ValueError("Please describe at least one interest or hobby.")
            if not transcript:
                raise ValueError("Please complete the transcript analysis first.")
            if not learning_style or "Your primary learning style is" not in learning_style:
                raise ValueError("Please complete the learning style quiz first.")

            # Prepare favorites with sanitization
            favorites = {
                "movie": sanitize_input(movie),
                "movie_reason": sanitize_input(movie_reason),
                "show": sanitize_input(show),
                "show_reason": sanitize_input(show_reason),
                "book": sanitize_input(book),
                "book_reason": sanitize_input(book_reason),
                "character": sanitize_input(character),
                "character_reason": sanitize_input(character_reason)
            }

            # Generate study plan if not provided
            if not study_plan:
                learning_style_match = re.search(r"Your primary learning style is\s*\*\*(.*?)\*\*", learning_style)
                if learning_style_match:
                    study_plan = academic_analyzer.generate_study_plan(
                        transcript,
                        learning_style_match.group(1))

            # Prepare data with encryption for sensitive fields
            data = {
                "name": self.encryptor.encrypt(name),
                "age": age,
                "interests": self.encryptor.encrypt(sanitize_input(interests)),
                "transcript": transcript,  # Already sanitized during parsing
                "learning_style": learning_style,
                "favorites": favorites,
                "blog": self.encryptor.encrypt(sanitize_input(blog)) if blog else "",
                "study_plan": study_plan if study_plan else {},
                "session_token": self.current_session,
                "last_updated": time.time(),
                "version": "2.0"  # Profile version for compatibility
            }
            filepath = self.get_profile_path(name)

            # Save with atomic write
            temp_path = filepath.with_suffix('.tmp')
            with open(temp_path, "w", encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            temp_path.replace(filepath)  # Atomic replace

            # Optional cloud backup
            if HF_TOKEN and hf_api:
                try:
                    hf_api.upload_file(
                        path_or_fileobj=str(filepath),
                        path_in_repo=f"profiles/{filepath.name}",
                        repo_id=self.HUB_REPO_ID,
                        repo_type="dataset",
                        # The name is stored encrypted in the file, so it must
                        # not appear in plain text in the commit history.
                        commit_message=f"Profile update for {filepath.name}"
                    )
                except Exception as e:
                    logger.error(f"Failed to upload to HF Hub: {str(e)}")
            return f"Profile saved successfully for {name}."
        except Exception as e:
            logger.error(f"Profile save error: {str(e)}")
            raise gr.Error(f"Couldn't save profile: {str(e)}") from e

    def _download_from_hub(self, profile_file: Path) -> bool:
        """Try to restore ``profile_file`` from the Hub backup; True on success."""
        if not (HF_TOKEN and hf_api):
            return False
        try:
            cached_path = hf_api.hf_hub_download(
                repo_id=self.HUB_REPO_ID,
                filename=f"profiles/{profile_file.name}",
                repo_type="dataset"
            )
            # The download lands in the Hub cache; copy it to where
            # get_profile_path() expects the profile to be.
            profile_file.write_bytes(Path(cached_path).read_bytes())
            return True
        except Exception as e:
            logger.warning(f"Failed to download profile: {str(e)}")
            return False

    def _load_profile_once(self, name: str = None, session_token: str = None) -> Dict:
        """Single load attempt: locate the file, parse it, check age, decrypt."""
        if session_token:
            profile_pattern = f"*{session_token}_profile.json"
        else:
            profile_pattern = "*.json"
        profiles = list(self.profiles_dir.glob(profile_pattern))

        if name:
            profile_file = self.get_profile_path(name)
            if not profile_file.exists() and not self._download_from_hub(profile_file):
                if not profiles:
                    return {}
                raise gr.Error(f"No profile found for {name}")
        else:
            if not profiles:
                return {}
            # Load most recently modified profile
            profile_file = max(profiles, key=lambda p: p.stat().st_mtime)

        with open(profile_file, "r", encoding='utf-8') as f:
            profile_data = json.load(f)

        # Check session timeout
        if time.time() - profile_data.get('last_updated', 0) > SESSION_TIMEOUT:
            raise gr.Error("Session expired. Please start a new session.")

        # Decrypt encrypted fields
        if profile_data.get('version', '1.0') == '2.0':
            try:
                profile_data['name'] = self.encryptor.decrypt(profile_data['name'])
                profile_data['interests'] = self.encryptor.decrypt(profile_data.get('interests', ''))
                if profile_data.get('blog'):
                    profile_data['blog'] = self.encryptor.decrypt(profile_data['blog'])
            except Exception as e:
                logger.error(f"Decryption error: {str(e)}")
                raise gr.Error("Failed to decrypt profile data") from e
        return profile_data

    def load_profile(self, name: str = None, session_token: str = None) -> Dict:
        """Load a profile, decrypting its protected fields.

        With ``name`` the profile of that student (in the current session) is
        loaded, restoring it from the Hub backup if the local file is
        missing. Without ``name`` the most recently modified profile matching
        ``session_token`` (or any profile if no token is given) is loaded.

        Transient failures (unreadable or half-written files) are retried up
        to ``MAX_PROFILE_LOAD_ATTEMPTS`` times with a growing delay. A
        ``gr.Error`` is deterministic and therefore raised immediately.

        Returns:
            The profile dict, or ``{}`` when no profile exists.

        Raises:
            gr.Error: Profile not found, session expired, decryption failed
                or the stored JSON is corrupted.
        """
        for attempt in range(MAX_PROFILE_LOAD_ATTEMPTS):
            is_last_attempt = attempt == MAX_PROFILE_LOAD_ATTEMPTS - 1
            try:
                return self._load_profile_once(name, session_token)
            except gr.Error:
                raise
            except json.JSONDecodeError as e:
                if is_last_attempt:
                    logger.error(f"Failed to load profile after {MAX_PROFILE_LOAD_ATTEMPTS} attempts")
                    raise gr.Error("Corrupted profile data") from e
            except Exception:
                if is_last_attempt:
                    raise
            time.sleep(0.5 * (attempt + 1))
        return {}

    def list_profiles(self, session_token: str = None) -> List[str]:
        """List available profiles with decrypted names.

        Unreadable files are skipped. If a name cannot be decrypted, the
        file stem is listed instead.
        """
        if session_token:
            profiles = list(self.profiles_dir.glob(f"*{session_token}_profile.json"))
        else:
            profiles = list(self.profiles_dir.glob("*.json"))

        profile_names = []
        for p in profiles:
            try:
                with open(p, "r", encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError):
                continue  # unreadable or not valid JSON
            if not isinstance(data, dict):
                continue
            if data.get('version', '1.0') == '2.0':
                try:
                    profile_names.append(self.encryptor.decrypt(data['name']))
                except Exception:
                    profile_names.append(p.stem)
            else:
                profile_names.append(data.get('name', p.stem))
        return profile_names

    def delete_profile(self, name: str, session_token: str = None) -> bool:
        """Delete a profile locally and, best-effort, from the Hub backup.

        Returns:
            True if the local file was deleted. False if it does not exist,
            belongs to a different session than ``session_token``, or
            deletion failed.
        """
        try:
            profile_file = self.get_profile_path(name)
            if not profile_file.exists():
                return False

            # Verify the profile belongs to the current session
            with open(profile_file, "r", encoding='utf-8') as f:
                data = json.load(f)
            if session_token and data.get('session_token') != session_token:
                return False

            # Delete local file
            profile_file.unlink()

            # Try to delete from Hugging Face Hub
            if HF_TOKEN and hf_api:
                try:
                    hf_api.delete_file(
                        path_in_repo=f"profiles/{profile_file.name}",
                        repo_id=self.HUB_REPO_ID,
                        repo_type="dataset"
                    )
                except Exception as e:
                    logger.error(f"Failed to delete from HF Hub: {str(e)}")
            return True
        except Exception as e:
            logger.error(f"Error deleting profile: {str(e)}")
            return False
# Module-level EnhancedProfileManager instance; EnhancedTeachingAssistant
# loads the active student's profile through it.
profile_manager = EnhancedProfileManager() | |
# ========== ENHANCED AI TEACHING ASSISTANT ========== | |
class EnhancedTeachingAssistant: | |
def __init__(self):
    """Start with an empty conversation log and no model loaded."""
    # Conversation log that generate_response() folds into the prompt context.
    self.context_history = []
    self.max_context_length = MAX_CONTEXT_HISTORY
    # Model and tokenizer are filled in lazily by initialize_model().
    self.tokenizer = None
    self.model = None
    # 0 lets the first initialize_model() call pass its retry-interval check.
    self.last_model_load_attempt = 0
async def initialize_model(self):
    """Lazily load the model and tokenizer, at most once per hour on failure.

    Does nothing when both are already loaded or when the previous attempt
    was less than an hour ago. The load runs in the default executor so the
    (potentially very slow) model load does not block the event loop.

    A failed load is logged and leaves ``self.model`` / ``self.tokenizer``
    as ``None``; it does not raise, so responses that do not need the model
    keep working.
    NOTE(review): assumes downstream code copes with a ``None`` model, as
    it already must while loading is throttled - confirm.
    """
    if self.model and self.tokenizer:
        return
    now = time.time()
    if now - self.last_model_load_attempt <= 3600:  # Retry every hour if failed
        return
    # Stamp the attempt before loading so that a loader that raises is
    # throttled too, instead of being retried on every request.
    self.last_model_load_attempt = now
    try:
        loop = asyncio.get_running_loop()
        self.model, self.tokenizer = await loop.run_in_executor(None, get_model_and_tokenizer)
    except Exception as e:
        logger.error(f"Model initialization failed: {str(e)}")
        self.model, self.tokenizer = None, None
async def generate_response(self, message: str, history: List[List[Union[str, None]]], session_token: str) -> str:
    """Answer a student message using their saved profile as context.

    Loads the profile for ``session_token``, records the message in the
    conversation context, assembles a context string (student name, GPA,
    learning style, recent conversation) and delegates to the handler for
    the classified query type. Any exception is logged and turned into a
    generic error message.
    """
    style_pattern = r"Your primary learning style is\s*\*\*(.*?)\*\*"
    try:
        await self.initialize_model()

        profile = profile_manager.load_profile(session_token=session_token)
        if not profile:
            return "Please complete and save your profile first to get personalized advice."

        self._update_context(message, history)

        # Pull the profile facts that personalize the prompt.
        student_name = profile.get('name', 'Student')
        student_info = profile.get('transcript', {}).get('student_info', {})
        gpa = student_info.get('weighted_gpa', None)
        style_match = re.search(style_pattern, profile.get('learning_style', ''))
        learning_style = style_match.group(1) if style_match else None

        # Assemble the context piece by piece, then join once.
        pieces = [f"You are an AI teaching assistant helping {student_name}. "]
        if gpa:
            pieces.append(f"{student_name}'s current weighted GPA is {gpa}. ")
        if learning_style:
            pieces.append(f"They are a {learning_style.lower()} learner. ")
        if self.context_history:
            pieces.append("Recent conversation:\n")
            for entry in self.context_history[-self.max_context_length:]:
                speaker = "Student" if entry['role'] == 'user' else "Assistant"
                pieces.append(f"{speaker}: {entry['content']}\n")
        context = "".join(pieces)

        # Route to the handler for this kind of question.
        query_type = self._classify_query(message)
        return await self._generate_typed_response(query_type, message, context, profile)
    except Exception as e:
        logger.error(f"Error generating response: {str(e)}")
        return "I encountered an error processing your request. Please try again."
def _classify_query(self, message: str) -> str: | |
"""Classify the type of user query""" | |
message_lower = message.lower() | |
if any(word in message_lower for word in ['gpa', 'grade', 'average']): | |
return 'gpa' | |
elif any(word in message_lower for word in ['study', 'learn', 'exam', 'test']): | |
return 'study' | |
elif any(word in message_lower for word in ['course', 'class', 'subject']): | |
return 'courses' | |
elif any(word in message_lower for word in ['college', 'university', 'apply']): | |
return 'college' | |
elif any(word in message_lower for word in ['plan', 'schedule', 'calendar']): | |
return 'planning' | |
elif any(word in message_lower for word in ['resource', 'book', 'website']): | |
return 'resources' | |
else: | |
return 'general' | |
async def _generate_typed_response(self, query_type: str, message: str, context: str, profile: Dict) -> str: | |
"""Generate response based on query type""" | |
if query_type == 'gpa': | |
return self._generate_gpa_response(profile) | |
elif query_type == 'study': | |
return self._generate_study_response(profile) | |
elif query_type == 'courses': | |
return self._generate_courses_response(profile) | |
elif query_type == 'college': | |
return self._generate_college_response(profile) | |
elif query_type == 'planning': | |
return self._generate_planning_response(profile) | |
elif query_type == 'resources': | |
return self._generate_resources_response(profile) | |
else: | |
return await self._generate_general_response(message, context) | |
def _generate_gpa_response(self, profile: Dict) -> str: | |
"""Generate response about GPA""" | |
gpa = profile.get('transcript', {}).get('student_info', {}).get('weighted_gpa', None) | |
if not gpa: | |
return "I couldn't find your GPA information. Please upload your transcript first." | |
analysis = academic_analyzer.analyze_gpa(profile['transcript']) | |
response = [ | |
f"## π GPA Analysis", | |
f"**Rating:** {analysis['rating']}", | |
f"{analysis['description']}", | |
f"{analysis['comparison']}", | |
"", | |
f"## π Graduation Status", | |
analysis['status'], | |
f"**Completion:** {analysis['completion_percentage']:.1f}%", | |
"", | |
f"## π« College Recommendations" | |
] | |
if analysis.get('improvement_tips'): | |
response.append("\n**Improvement Tips:**") | |
response.extend([f"- {tip}" for tip in analysis['improvement_tips']]) | |
return "\n\n".join(response) | |
def _generate_study_response(self, profile: Dict) -> str: | |
"""Generate study advice based on learning style""" | |
learning_style_match = re.search(r"Your primary learning style is\s*\*\*(.*?)\*\*", | |
profile.get('learning_style', '')) | |
if not learning_style_match: | |
return "Please complete the learning style quiz first to get personalized study advice." | |
learning_style = learning_style_match.group(1) | |
study_plan = profile.get('study_plan', {}) | |
response = [ | |
f"As a **{learning_style}** learner, here are some study strategies for you:" | |
] | |
if study_plan.get('study_strategies'): | |
response.extend([f"- {strategy}" for strategy in study_plan['study_strategies']]) | |
else: | |
# Fallback if no study plan | |
if learning_style.lower() == 'visual': | |
response.extend([ | |
"- Use color coding in your notes", | |
"- Create mind maps and diagrams", | |
"- Watch educational videos to visualize concepts" | |
]) | |
elif learning_style.lower() == 'auditory': | |
response.extend([ | |
"- Record lectures and listen to them", | |
"- Explain concepts out loud to yourself", | |
"- Participate in study groups" | |
]) | |
elif learning_style.lower() == 'reading/writing': | |
response.extend([ | |
"- Write detailed summaries in your own words", | |
"- Create question-answer sets for each topic", | |
"- Rewrite your notes to reinforce learning" | |
]) | |
elif learning_style.lower() == 'kinesthetic': | |
response.extend([ | |
"- Use hands-on activities when possible", | |
"- Study while moving or pacing", | |
"- Create physical models to represent concepts" | |
]) | |
if study_plan.get('time_management_tips'): | |
response.append("\n**Time Management Tips:**") | |
response.extend([f"- {tip}" for tip in study_plan['time_management_tips']]) | |
return "\n\n".join(response) | |
def _generate_courses_response(self, profile: Dict) -> str: | |
"""Generate response about current/past courses""" | |
transcript = profile.get('transcript', {}) | |
if not transcript.get('course_history'): | |
return "I couldn't find your course information. Please upload your transcript first." | |
# Get current courses (in progress) | |
current_courses = [ | |
course for course in transcript['course_history'] | |
if course.get('status', '').lower() == 'in progress' | |
] | |
# Get past completed courses | |
completed_courses = [ | |
course for course in transcript['course_history'] | |
if course.get('status', '').lower() == 'completed' | |
] | |
response = [] | |
if current_courses: | |
response.append("**Your Current Courses:**") | |
for course in current_courses[:5]: # Limit to 5 courses | |
response.append( | |
f"- {course.get('description', 'Unknown')} " | |
f"({course.get('course_code', '')})" | |
) | |
else: | |
response.append("I couldn't find any current courses in your transcript.") | |
if completed_courses: | |
response.append("\n**Recently Completed Courses:**") | |
for course in completed_courses[:5]: # Limit to 5 courses | |
grade = course.get('grade_earned', '') | |
if grade: | |
response.append( | |
f"- {course.get('description', 'Unknown')} " | |
f"(Grade: {grade})" | |
) | |
else: | |
response.append(f"- {course.get('description', 'Unknown')}") | |
# Add rigor analysis | |
rigor = academic_analyzer.analyze_course_rigor(transcript) | |
if rigor['rating']: | |
response.append(f"\n**Course Rigor Analysis:** {rigor['rating']}") | |
if rigor['recommendations']: | |
response.append("\n**Recommendations:**") | |
response.extend([f"- {rec}" for rec in rigor['recommendations']]) | |
return "\n".join(response) | |
def _generate_college_response(self, profile: Dict) -> str:
    """Format college recommendations for the student's transcript.

    The recommendations come from
    ``academic_analyzer.generate_college_recommendations`` and must provide
    the keys ``reach``, ``target``, ``safety``, ``scholarships`` and
    ``improvement_areas``. Empty sections are omitted.

    Returns:
        Markdown text with one heading and bullet list per non-empty section.
    """
    recs = academic_analyzer.generate_college_recommendations(profile.get('transcript', {}))

    # (result key, section heading, max entries shown; None means show all)
    sections = (
        ('reach', "\n**Reach Schools (Competitive):**", 3),
        ('target', "\n**Target Schools (Good Match):**", 3),
        ('safety', "\n**Safety Schools (Likely Admission):**", 3),
        ('scholarships', "\n**Scholarship Opportunities:**", 3),
        ('improvement_areas', "\n**Areas to Improve for College Admissions:**", None),
    )

    lines = ["**College Recommendations Based on Your Profile:**"]
    for key, heading, limit in sections:
        entries = recs[key]
        if not entries:
            continue
        lines.append(heading)
        shown = entries if limit is None else entries[:limit]
        lines.extend(f"- {entry}" for entry in shown)
    return "\n".join(lines)
def _generate_planning_response(self, profile: Dict) -> str: | |
"""Generate study/schedule planning advice""" | |
study_plan = profile.get('study_plan', {}) | |
response = ["**Study Planning Advice:**"] | |
if study_plan.get('weekly_schedule'): | |
response.append("\nHere's a suggested weekly study schedule:") | |
for day, activities in study_plan['weekly_schedule'].items(): | |
if activities: | |
response.append(f"\n**{day}:**") | |
for activity in activities[:2]: # Show 2 activities per day max | |
response.append( | |
f"- {activity.get('course', 'Course')}: " | |
f"{activity.get('duration', '45-60 minutes')}" | |
) | |
else: | |
response.append("\nA good study schedule should include:") | |
response.append("- 45-60 minute study blocks with short breaks") | |
response.append("- Focus on 1-2 subjects per day") | |
response.append("- Regular review sessions") | |
if study_plan.get('time_management_tips'): | |
response.append("\n**Time Management Tips:**") | |
response.extend([f"- {tip}" for tip in study_plan['time_management_tips'][:3]]) | |
return "\n".join(response) | |
def _generate_resources_response(self, profile: Dict) -> str: | |
"""Generate resource recommendations""" | |
study_plan = profile.get('study_plan', {}) | |
transcript = profile.get('transcript', {}) | |
response = ["**Recommended Learning Resources:**"] | |
# General resources | |
if study_plan.get('resource_recommendations'): | |
response.extend([f"- {resource}" for resource in study_plan['resource_recommendations'][:3]]) | |
else: | |
response.extend([ | |
"- Khan Academy (free lessons on many subjects)", | |
"- Quizlet (flashcards and study tools)", | |
"- Wolfram Alpha for math help" | |
]) | |
# Subject-specific resources | |
current_courses = [ | |
course for course in transcript.get('course_history', []) | |
if course.get('status', '').lower() == 'in progress' | |
] | |
if current_courses: | |
response.append("\n**Course-Specific Resources:**") | |
for course in current_courses[:2]: # Limit to 2 courses | |
course_name = course.get('description', 'your course') | |
if 'MATH' in course_name.upper(): | |
response.append(f"- For {course_name}: Desmos Graphing Calculator, Art of Problem Solving") | |
elif 'SCIENCE' in course_name.upper(): | |
response.append(f"- For {course_name}: PhET Simulations, Crash Course Science videos") | |
elif 'HISTORY' in course_name.upper(): | |
response.append(f"- For {course_name}: Crash Course History videos, Library of Congress resources") | |
return "\n".join(response) | |
async def _generate_general_response(self, message: str, context: str) -> str: | |
"""Generate response using the language model""" | |
if not self.model or not self.tokenizer: | |
return "I'm still loading my knowledge base. Please try again in a moment." | |
try: | |
prompt = f"{context}\nStudent: {message}\nAssistant:" | |
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device) | |
# Generate response with more controlled parameters | |
outputs = self.model.generate( | |
**inputs, | |
max_new_tokens=200, | |
temperature=0.7, | |
top_p=0.9, | |
repetition_penalty=1.1, | |
do_sample=True | |
) | |
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
# Extract just the assistant's response | |
response = response[len(prompt):].strip() | |
# Clean up any incomplete sentences | |
if response and response[-1] not in {'.', '!', '?'}: | |
last_period = response.rfind('.') | |
if last_period > 0: | |
response = response[:last_period + 1] | |
return response if response else "I'm not sure how to respond to that. Could you rephrase your question?" | |
except Exception as e: | |
logger.error(f"Model generation error: {str(e)}") | |
return "I encountered an error generating a response. Please try again." | |
def _update_context(self, message: str, history: List[List[Union[str, None]]]) -> None: | |
"""Update conversation context""" | |
self.context_history.append({"role": "user", "content": message}) | |
if history: | |
for h in history[-self.max_context_length:]: | |
if h[0]: | |
self.context_history.append({"role": "user", "content": h[0]}) | |
if h[1]: | |
self.context_history.append({"role": "assistant", "content": h[1]}) | |
# Trim to max context length | |
self.context_history = self.context_history[-(self.max_context_length * 2):] | |
# Module-level EnhancedTeachingAssistant instance created at import time;
# the chat handler in the interface code calls teaching_assistant.generate_response.
teaching_assistant = EnhancedTeachingAssistant() | |
# ========== STUDY CALENDAR INTEGRATION ========== | |
class StudyCalendar:
    """Builds a study calendar from a student profile and renders it as a chart."""

    # English weekday names indexed by ``date.weekday()`` (Monday == 0).
    # A fixed tuple is used instead of ``calendar.day_name`` so the lookup
    # does not depend on the process locale.
    _DAY_NAMES = (
        "Monday", "Tuesday", "Wednesday", "Thursday",
        "Friday", "Saturday", "Sunday",
    )

    def __init__(self):
        self.calendar_events = {}

    def generate_study_calendar(self, profile: Dict, start_date: Optional[str] = None,
                                weeks: int = 4) -> Dict:
        """Generate a study calendar for the given profile.

        Study sessions come from ``profile['study_plan']['weekly_schedule']``
        (a mapping of English weekday name to a list of session dicts) and are
        repeated for every matching day in the period. Each in-progress course
        in ``profile['transcript']['course_history']`` gets a simulated
        midterm (two weeks after the start) and final (``weeks - 1`` weeks
        after the start).

        Args:
            profile: Student profile dict.
            start_date: ISO ``YYYY-MM-DD`` string, or a ``datetime.date`` /
                ``datetime.datetime``; defaults to today.
            weeks: Length of the calendar in weeks.

        Returns:
            Dict with ``start_date``, ``end_date`` (ISO strings), ``events``,
            ``exams`` and ``assignments`` lists. On any error an empty
            four-week calendar starting today is returned.
        """
        try:
            if not start_date:
                first_day = datetime.date.today()
            elif isinstance(start_date, datetime.datetime):
                # Checked before ``date`` because datetime subclasses date.
                first_day = start_date.date()
            elif isinstance(start_date, datetime.date):
                first_day = start_date
            else:
                first_day = datetime.date.fromisoformat(start_date)

            study_plan = profile.get('study_plan') or {}
            # NOTE: this dict must not be named ``calendar``. Doing so shadowed
            # the imported ``calendar`` module, made the weekday lookup raise,
            # and silently produced an empty calendar for every profile that
            # had a weekly schedule.
            schedule = {
                'start_date': first_day.isoformat(),
                'end_date': (first_day + datetime.timedelta(weeks=weeks)).isoformat(),
                'events': [],
                'exams': [],
                'assignments': []
            }

            # Add study sessions from the study plan
            weekly_schedule = study_plan.get('weekly_schedule') or {}
            if weekly_schedule:
                for day_offset in range(weeks * 7):
                    current_date = first_day + datetime.timedelta(days=day_offset)
                    day_name = self._DAY_NAMES[current_date.weekday()]
                    for session in weekly_schedule.get(day_name) or []:
                        schedule['events'].append({
                            'date': current_date.isoformat(),
                            'title': f"Study {session.get('course', '')}",
                            'description': "\n".join(session.get('activities', [])),
                            'duration': session.get('duration', '45-60 minutes'),
                            'type': 'study'
                        })

            # Add exam dates from transcript (if available).
            # Simulated dates: in a real app these would come from the school calendar.
            transcript = profile.get('transcript') or {}
            midterm_date = (first_day + datetime.timedelta(weeks=2)).isoformat()
            final_date = (first_day + datetime.timedelta(weeks=weeks - 1)).isoformat()
            for course in transcript.get('course_history') or []:
                if course.get('status', '').lower() != 'in progress':
                    continue
                for exam_date, label in ((midterm_date, 'Midterm'), (final_date, 'Final')):
                    schedule['exams'].append({
                        'date': exam_date,
                        'title': f"{course.get('description', 'Course')} {label}",
                        'course': course.get('description', ''),
                        'type': 'exam'
                    })
            return schedule
        except Exception as e:
            # Deliberate best-effort: the UI always receives a well-formed calendar.
            logger.error(f"Error generating calendar: {str(e)}")
            return {
                'start_date': datetime.date.today().isoformat(),
                'end_date': (datetime.date.today() + datetime.timedelta(weeks=4)).isoformat(),
                'events': [],
                'exams': [],
                'assignments': []
            }

    def create_calendar_visualization(self, calendar_data: Dict) -> "Optional[plt.Figure]":
        """Create a visualization of the study calendar.

        Draws one horizontal strip whose x axis is the day offset from the
        calendar start: grey week blocks, green markers for study sessions
        and red markers for exams.

        Args:
            calendar_data: Calendar dict as returned by
                ``generate_study_calendar``.

        Returns:
            The matplotlib figure, or ``None`` if rendering failed.
        """
        fig = None
        try:
            import matplotlib.pyplot as plt
            from matplotlib.patches import Rectangle

            # Prepare data
            start_date = datetime.date.fromisoformat(calendar_data['start_date'])
            end_date = datetime.date.fromisoformat(calendar_data['end_date'])
            days = (end_date - start_date).days + 1

            # Create figure
            fig, ax = plt.subplots(figsize=(12, 6))

            # Draw week grid
            for week_start in range(0, days, 7):
                ax.add_patch(Rectangle((week_start, 0), 7, 1, color='#f5f5f5'))

            # Study sessions occupy the upper band, exams the lower band.
            for entries, band_bottom, colour in (
                (calendar_data['events'], 0.7, '#4CAF50'),
                (calendar_data['exams'], 0.3, '#F44336'),
            ):
                for entry in entries:
                    entry_date = datetime.date.fromisoformat(entry['date'])
                    day_offset = (entry_date - start_date).days
                    ax.add_patch(Rectangle((day_offset, band_bottom), 1, 0.3, color=colour))

            # Configure axes
            week_ticks = range(0, days, 7)
            ax.set_xlim(0, days)
            ax.set_ylim(0, 1)
            ax.set_xticks(week_ticks)
            ax.set_xticklabels([
                (start_date + datetime.timedelta(days=offset)).strftime('%b %d')
                for offset in week_ticks
            ])
            ax.set_yticks([0.5])
            ax.set_yticklabels(['Study Calendar'])

            # Add legend (drawn as patches near the right edge of the strip)
            ax.add_patch(Rectangle((days-5, 0.7), 1, 0.3, color='#4CAF50'))
            ax.text(days-3.5, 0.85, 'Study Sessions', va='center')
            ax.add_patch(Rectangle((days-5, 0.3), 1, 0.3, color='#F44336'))
            ax.text(days-3.5, 0.45, 'Exams', va='center')

            # Use the axes/figure objects rather than pyplot's global
            # "current figure" so concurrent requests cannot cross-talk.
            ax.set_title(f"Study Calendar: {start_date.strftime('%b %d')} to {end_date.strftime('%b %d')}")
            fig.tight_layout()
            return fig
        except Exception as e:
            logger.error(f"Error creating calendar visualization: {str(e)}")
            if fig is not None:
                # pyplot keeps every figure alive until closed; release the
                # half-built one so failures do not leak memory.
                plt.close(fig)
            return None
# Module-level StudyCalendar instance created at import time; the Goals &
# Planning tab calls study_calendar.generate_study_calendar on it.
study_calendar = StudyCalendar() | |
# ========== GOAL TRACKING SYSTEM ========== | |
class GoalTracker:
    """In-memory store of student goals and their progress history.

    Goals live in ``self.goals`` keyed by a generated id; nothing in this
    class persists them.
    """

    def __init__(self):
        self.goals = {}

    def add_goal(self, profile_name: str, goal_type: str, description: str,
                 target_date: str, target_value: Optional[float] = None) -> bool:
        """Add a new goal for the student.

        Args:
            profile_name: Owner of the goal; used by ``get_goals`` to filter.
            goal_type: Free-form category label.
            description: Human-readable goal text.
            target_date: Deadline, stored as given.
            target_value: Optional numeric target; when set, reaching it via
                ``update_goal_progress`` marks the goal completed.

        Returns:
            True when the goal was stored, False if an error occurred.
        """
        try:
            # The timestamp keeps ids distinct for otherwise identical goals.
            goal_id = hashlib.sha256(
                f"{profile_name}{goal_type}{description}{time.time()}".encode()
            ).hexdigest()[:16]
            self.goals[goal_id] = {
                'profile_name': profile_name,
                'type': goal_type,
                'description': description,
                'target_date': target_date,
                'target_value': target_value,
                'created': time.time(),
                'progress': [],
                'completed': False
            }
            return True
        except Exception as e:
            logger.error(f"Error adding goal: {str(e)}")
            return False

    def update_goal_progress(self, goal_id: str, progress_value: float, notes: str = "") -> bool:
        """Record a progress value for a goal.

        Args:
            goal_id: Id of the goal, as found in the ``id`` field of the
                dicts returned by ``get_goals``.
            progress_value: Latest measured value.
            notes: Optional free-text note stored with the entry.

        Returns:
            True when the entry was recorded; False for an unknown goal or
            when the value cannot be compared with the goal's target. In the
            False case the goal is left untouched.
        """
        try:
            goal = self.goals.get(goal_id)
            if goal is None:
                return False
            # Evaluate completion BEFORE mutating: if the comparison raises
            # (e.g. a None or string value against a numeric target) the
            # goal must not be left with a bogus progress entry.
            target = goal.get('target_value')
            target_reached = target is not None and progress_value >= target
            goal['progress'].append({
                'date': time.time(),
                'value': progress_value,
                'notes': notes
            })
            if target_reached:
                goal['completed'] = True
            return True
        except Exception as e:
            logger.error(f"Error updating goal: {str(e)}")
            return False

    def get_goals(self, profile_name: str) -> List[Dict]:
        """Get all goals for a student.

        Returns:
            A list of shallow copies of the stored goal dicts, each extended
            with its ``id``.
        """
        return [
            {**goal, 'id': goal_id}
            for goal_id, goal in self.goals.items()
            if goal['profile_name'] == profile_name
        ]

    def create_goal_visualization(self, goals: List[Dict]) -> "Optional[plt.Figure]":
        """Create a grouped bar chart of current progress versus target.

        Args:
            goals: Goal dicts as returned by ``get_goals``.

        Returns:
            The matplotlib figure, or ``None`` when there are no goals or
            rendering failed.
        """
        fig = None
        try:
            import matplotlib.pyplot as plt

            if not goals:
                return None

            # Prepare data
            goal_names = [
                goal['description'][:20] + ('...' if len(goal['description']) > 20 else '')
                for goal in goals
            ]
            progress_values = [
                goal['progress'][-1]['value'] if goal['progress'] else 0
                for goal in goals
            ]
            # Goals without a target show their current progress as the target bar.
            target_values = [
                goal['target_value'] if goal['target_value'] is not None else current
                for goal, current in zip(goals, progress_values)
            ]

            # Create figure
            fig, ax = plt.subplots(figsize=(10, 6))

            # Plot bars side by side around each integer position
            x = range(len(goals))
            bar_width = 0.35
            progress_bars = ax.bar(
                [i - bar_width/2 for i in x],
                progress_values,
                bar_width,
                label='Current Progress',
                color='#4CAF50'
            )
            target_bars = ax.bar(
                [i + bar_width/2 for i in x],
                target_values,
                bar_width,
                label='Target',
                color='#2196F3'
            )

            # Add labels and title
            ax.set_xlabel('Goals')
            ax.set_ylabel('Progress')
            ax.set_title('Goal Progress Tracking')
            ax.set_xticks(x)
            ax.set_xticklabels(goal_names, rotation=45, ha='right')
            ax.legend()

            # Add value labels above every bar of both series
            for bar in (*progress_bars, *target_bars):
                height = bar.get_height()
                ax.annotate(f'{height:.1f}',
                            xy=(bar.get_x() + bar.get_width() / 2, height),
                            xytext=(0, 3),
                            textcoords="offset points",
                            ha='center', va='bottom')

            fig.tight_layout()
            return fig
        except Exception as e:
            logger.error(f"Error creating goal visualization: {str(e)}")
            if fig is not None:
                # pyplot keeps figures alive until closed; release the
                # half-built one so failures do not leak memory.
                plt.close(fig)
            return None
# Module-level GoalTracker instance created at import time; the Goals &
# Planning tab reads goals from it via goal_tracker.get_goals.
goal_tracker = GoalTracker() | |
# ========== ENHANCED GRADIO INTERFACE ========== | |
def create_enhanced_interface(): | |
with gr.Blocks(theme=gr.themes.Soft(), title="Student Learning Assistant") as app: | |
session_token = gr.State(value=generate_session_token()) | |
profile_manager.set_session(session_token.value) | |
tab_completed = gr.State({ | |
0: False, # Transcript Upload | |
1: False, # Learning Style Quiz | |
2: False, # Personal Questions | |
3: False, # Save & Review | |
4: False, # AI Assistant | |
5: False # Goals & Planning | |
}) | |
# Custom CSS with enhanced styling | |
app.css = """ | |
.gradio-container { | |
max-width: 1200px !important; | |
margin: 0 auto !important; | |
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
} | |
.tab-content { | |
padding: 20px !important; | |
border: 1px solid #e0e0e0 !important; | |
border-radius: 8px !important; | |
margin-top: 10px !important; | |
background-color: white; | |
box-shadow: 0 2px 4px rgba(0,0,0,0.05); | |
} | |
.completed-tab { | |
background: #4CAF50 !important; | |
color: white !important; | |
font-weight: bold; | |
} | |
.incomplete-tab { | |
background: #E0E0E0 !important; | |
color: #616161; | |
} | |
.nav-message { | |
padding: 12px; | |
margin: 10px 0; | |
border-radius: 6px; | |
background-color: #ffebee; | |
color: #c62828; | |
border-left: 4px solid #c62828; | |
} | |
.file-upload { | |
border: 2px dashed #4CAF50 !important; | |
padding: 25px !important; | |
border-radius: 8px !important; | |
text-align: center; | |
background-color: #f8f8f8; | |
} | |
.file-upload:hover { | |
background: #f1f8e9; | |
} | |
.progress-bar { | |
height: 6px; | |
background: linear-gradient(to right, #4CAF50, #8BC34A); | |
margin-bottom: 15px; | |
border-radius: 3px; | |
box-shadow: inset 0 1px 2px rgba(0,0,0,0.1); | |
} | |
.quiz-question { | |
margin-bottom: 15px; | |
padding: 15px; | |
background: #f5f5f5; | |
border-radius: 5px; | |
border-left: 4px solid #2196F3; | |
} | |
.quiz-results { | |
margin-top: 20px; | |
padding: 20px; | |
background: #e8f5e9; | |
border-radius: 8px; | |
border-left: 4px solid #4CAF50; | |
} | |
.error-message { | |
color: #d32f2f; | |
background-color: #ffebee; | |
padding: 12px; | |
border-radius: 6px; | |
margin: 10px 0; | |
border-left: 4px solid #d32f2f; | |
} | |
.transcript-results { | |
border-left: 4px solid #4CAF50 !important; | |
padding: 15px !important; | |
background: #f8f8f8 !important; | |
border-radius: 4px; | |
} | |
.error-box { | |
border: 1px solid #ff4444 !important; | |
background: #fff8f8 !important; | |
border-radius: 4px; | |
} | |
.metric-box { | |
background-color: white; | |
border-radius: 10px; | |
padding: 15px; | |
margin: 10px 0; | |
box-shadow: 0 2px 5px rgba(0,0,0,0.1); | |
border-left: 4px solid #2196F3; | |
} | |
.recommendation { | |
background-color: #fff8e1; | |
padding: 10px; | |
border-left: 4px solid #ffc107; | |
margin: 5px 0; | |
border-radius: 4px; | |
} | |
.goal-card { | |
background-color: white; | |
border-radius: 8px; | |
padding: 15px; | |
margin: 10px 0; | |
box-shadow: 0 1px 3px rgba(0,0,0,0.1); | |
border-left: 4px solid #4CAF50; | |
} | |
.calendar-event { | |
background-color: #e3f2fd; | |
border-radius: 6px; | |
padding: 10px; | |
margin: 5px 0; | |
border-left: 4px solid #2196F3; | |
} | |
/* Dark mode styles */ | |
.dark .tab-content { | |
background-color: #2d2d2d !important; | |
border-color: #444 !important; | |
color: #eee !important; | |
} | |
.dark .quiz-question { | |
background-color: #3d3d3d !important; | |
color: #eee !important; | |
} | |
.dark .quiz-results { | |
background-color: #2e3d2e !important; | |
color: #eee !important; | |
} | |
.dark textarea, .dark input { | |
background-color: #333 !important; | |
color: #eee !important; | |
border-color: #555 !important; | |
} | |
.dark .output-markdown { | |
color: #eee !important; | |
} | |
.dark .chatbot { | |
background-color: #333 !important; | |
} | |
.dark .chatbot .user, .dark .chatbot .assistant { | |
color: #eee !important; | |
} | |
.dark .metric-box { | |
background-color: #333 !important; | |
color: #eee !important; | |
} | |
.dark .goal-card { | |
background-color: #333; | |
color: #eee; | |
} | |
.dark .calendar-event { | |
background-color: #1a3d5c; | |
color: #eee; | |
} | |
""" | |
# Header with improved layout | |
with gr.Row(): | |
with gr.Column(scale=4): | |
gr.Markdown(""" | |
# π Student Learning Assistant | |
**Your personalized education companion** | |
Complete each step to get customized learning recommendations and academic planning. | |
""") | |
with gr.Column(scale=1): | |
dark_mode = gr.Checkbox(label="Dark Mode", value=False) | |
# Navigation buttons with icons | |
with gr.Row(): | |
with gr.Column(scale=1, min_width=100): | |
step1 = gr.Button("π 1. Transcript", elem_classes="incomplete-tab") | |
with gr.Column(scale=1, min_width=100): | |
step2 = gr.Button("π 2. Quiz", elem_classes="incomplete-tab", interactive=False) | |
with gr.Column(scale=1, min_width=100): | |
step3 = gr.Button("π€ 3. Profile", elem_classes="incomplete-tab", interactive=False) | |
with gr.Column(scale=1, min_width=100): | |
step4 = gr.Button("π 4. Review", elem_classes="incomplete-tab", interactive=False) | |
with gr.Column(scale=1, min_width=100): | |
step5 = gr.Button("π¬ 5. Assistant", elem_classes="incomplete-tab", interactive=False) | |
with gr.Column(scale=1, min_width=100): | |
step6 = gr.Button("π― 6. Goals", elem_classes="incomplete-tab", interactive=False) | |
nav_message = gr.HTML(visible=False) | |
# Main tabs | |
with gr.Tabs(visible=True) as tabs: | |
# ===== TAB 1: TRANSCRIPT UPLOAD ===== | |
with gr.Tab("Transcript", id=0): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π Step 1: Upload Your Transcript") | |
with gr.Group(elem_classes="file-upload"): | |
file_input = gr.File( | |
label="Drag and drop your transcript here (PDF or Image)", | |
file_types=ALLOWED_FILE_TYPES, | |
type="filepath" | |
) | |
upload_btn = gr.Button("Analyze Transcript", variant="primary") | |
file_error = gr.HTML(visible=False) | |
with gr.Column(scale=2): | |
transcript_output = gr.Textbox( | |
label="Analysis Results", | |
lines=10, | |
interactive=False, | |
elem_classes="transcript-results" | |
) | |
with gr.Row(): | |
gpa_viz = gr.Plot(label="GPA Visualization", visible=False) | |
req_viz = gr.Plot(label="Requirements Visualization", visible=False) | |
with gr.Row(): | |
credits_viz = gr.Plot(label="Credits Distribution", visible=False) | |
rigor_viz = gr.Plot(label="Course Rigor", visible=False) | |
transcript_data = gr.State() | |
file_input.change( | |
fn=lambda f: ( | |
gr.update(visible=False), | |
gr.update(value="File ready for analysis!", visible=True) if f | |
else gr.update(value="Please upload a file", visible=False) | |
), | |
inputs=file_input, | |
outputs=[file_error, transcript_output] | |
) | |
def process_and_visualize(file_obj, tab_status): | |
try: | |
# Parse transcript with enhanced parser | |
parsed_data = transcript_parser.parse_transcript(file_obj.name, os.path.splitext(file_obj.name)[1].lower()) | |
# Generate analyses | |
gpa_analysis = academic_analyzer.analyze_gpa(parsed_data) | |
grad_status = academic_analyzer.analyze_graduation_status(parsed_data) | |
college_recs = academic_analyzer.generate_college_recommendations(parsed_data) | |
# Format results | |
results = [ | |
f"## π GPA Analysis", | |
f"**Rating:** {gpa_analysis['rating']}", | |
f"{gpa_analysis['description']}", | |
f"{gpa_analysis['comparison']}", | |
"", | |
f"## π Graduation Status", | |
grad_status['status'], | |
f"**Completion:** {grad_status['completion_percentage']:.1f}%", | |
"", | |
f"## π« College Recommendations" | |
] | |
if college_recs['reach']: | |
results.append("\n**Reach Schools:**") | |
results.extend([f"- {school}" for school in college_recs['reach'][:3]]) | |
if college_recs['target']: | |
results.append("\n**Target Schools:**") | |
results.extend([f"- {school}" for school in college_recs['target'][:3]]) | |
if college_recs['safety']: | |
results.append("\n**Safety Schools:**") | |
results.extend([f"- {school}" for school in college_recs['safety'][:3]]) | |
if gpa_analysis.get('improvement_tips'): | |
results.append("\n**Improvement Tips:**") | |
results.extend([f"- {tip}" for tip in gpa_analysis['improvement_tips']]) | |
# Update visualizations | |
viz_updates = [ | |
gr.update(visible=data_visualizer.create_gpa_visualization(parsed_data) is not None), | |
gr.update(visible=data_visualizer.create_requirements_visualization(parsed_data) is not None), | |
gr.update(visible=data_visualizer.create_credits_distribution_visualization(parsed_data) is not None), | |
gr.update(visible=data_visualizer.create_course_rigor_visualization(parsed_data) is not None) | |
] | |
# Update tab completion status | |
tab_status[0] = True | |
return "\n".join(results), parsed_data, *viz_updates, tab_status | |
except Exception as e: | |
error_msg = f"Error processing transcript: {str(e)}" | |
logger.error(error_msg) | |
raise gr.Error(f"{error_msg}\n\nPossible solutions:\n1. Try a different file format\n2. Ensure text is clear and not handwritten\n3. Check file size (<{MAX_FILE_SIZE_MB}MB)") | |
upload_btn.click( | |
fn=process_and_visualize, | |
inputs=[file_input, tab_completed], | |
outputs=[transcript_output, transcript_data, gpa_viz, req_viz, credits_viz, rigor_viz, tab_completed] | |
).then( | |
fn=lambda: gr.update(elem_classes="completed-tab"), | |
outputs=step1 | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
outputs=step2 | |
) | |
# ===== TAB 2: LEARNING STYLE QUIZ ===== | |
with gr.Tab("Learning Style Quiz", id=1): | |
with gr.Column(): | |
gr.Markdown("### π Step 2: Discover Your Learning Style") | |
progress = gr.HTML("<div class='progress-bar' style='width: 0%'></div>") | |
quiz_components = [] | |
with gr.Accordion("Quiz Questions", open=True): | |
for i, (question, options) in enumerate(zip(learning_style_quiz.questions, learning_style_quiz.options)): | |
with gr.Group(elem_classes="quiz-question"): | |
q = gr.Radio( | |
options, | |
label=f"{i+1}. {question}", | |
show_label=True | |
) | |
quiz_components.append(q) | |
with gr.Row(): | |
quiz_submit = gr.Button("Submit Quiz", variant="primary") | |
quiz_clear = gr.Button("Clear Answers") | |
quiz_alert = gr.HTML(visible=False) | |
learning_output = gr.Markdown( | |
label="Your Learning Style Results", | |
visible=False, | |
elem_classes="quiz-results" | |
) | |
for component in quiz_components: | |
component.change( | |
fn=lambda *answers: { | |
progress: gr.HTML( | |
f"<div class='progress-bar' style='width: {sum(1 for a in answers if a)/len(answers)*100}%'></div>" | |
) | |
}, | |
inputs=quiz_components, | |
outputs=progress | |
) | |
quiz_submit.click( | |
fn=lambda *answers: learning_style_quiz.evaluate_quiz(*answers), | |
inputs=quiz_components, | |
outputs=learning_output | |
).then( | |
fn=lambda: gr.update(visible=True), | |
outputs=learning_output | |
).then( | |
fn=lambda: {1: True}, | |
inputs=None, | |
outputs=tab_completed | |
).then( | |
fn=lambda: gr.update(elem_classes="completed-tab"), | |
outputs=step2 | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
outputs=step3 | |
) | |
quiz_clear.click( | |
fn=lambda: [None] * len(quiz_components), | |
outputs=quiz_components | |
).then( | |
fn=lambda: gr.HTML("<div class='progress-bar' style='width: 0%'></div>"), | |
outputs=progress | |
) | |
# ===== TAB 3: PERSONAL QUESTIONS ===== | |
with gr.Tab("Personal Profile", id=2): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π€ Step 3: Tell Us About Yourself") | |
with gr.Group(): | |
name = gr.Textbox(label="Full Name", placeholder="Your name") | |
age = gr.Number(label="Age", minimum=MIN_AGE, maximum=MAX_AGE, precision=0) | |
interests = gr.Textbox( | |
label="Your Interests/Hobbies", | |
placeholder="e.g., Science, Music, Sports, Art..." | |
) | |
save_personal_btn = gr.Button("Save Information", variant="primary") | |
save_confirmation = gr.HTML(visible=False) | |
with gr.Column(scale=1): | |
gr.Markdown("### β€οΈ Favorites") | |
with gr.Group(): | |
movie = gr.Textbox(label="Favorite Movie") | |
movie_reason = gr.Textbox(label="Why do you like it?", lines=2) | |
show = gr.Textbox(label="Favorite TV Show") | |
show_reason = gr.Textbox(label="Why do you like it?", lines=2) | |
book = gr.Textbox(label="Favorite Book") | |
book_reason = gr.Textbox(label="Why do you like it?", lines=2) | |
character = gr.Textbox(label="Favorite Character (from any story)") | |
character_reason = gr.Textbox(label="Why do you like them?", lines=2) | |
with gr.Accordion("Personal Blog (Optional)", open=False): | |
blog = gr.Textbox( | |
label="Share your thoughts", | |
placeholder="Write something about yourself...", | |
lines=5 | |
) | |
save_personal_btn.click( | |
fn=lambda n, a, i, ts: ( | |
{2: True}, | |
gr.update(elem_classes="completed-tab"), | |
gr.update(interactive=True), | |
gr.update(value="<div class='alert-box'>Information saved!</div>", visible=True) | |
), | |
inputs=[name, age, interests, tab_completed], | |
outputs=[tab_completed, step3, step4, save_confirmation] | |
) | |
# ===== TAB 4: SAVE & REVIEW ===== | |
with gr.Tab("Save Profile", id=3): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π Step 4: Review & Save Your Profile") | |
with gr.Group(): | |
load_profile_dropdown = gr.Dropdown( | |
label="Load Existing Profile", | |
choices=profile_manager.list_profiles(session_token.value), | |
visible=False | |
) | |
with gr.Row(): | |
load_btn = gr.Button("Load", visible=False) | |
delete_btn = gr.Button("Delete", variant="stop", visible=False) | |
save_btn = gr.Button("Save Profile", variant="primary") | |
clear_btn = gr.Button("Clear Form") | |
with gr.Column(scale=2): | |
output_summary = gr.Markdown( | |
"Your profile summary will appear here after saving.", | |
label="Profile Summary" | |
) | |
with gr.Row(): | |
req_viz_matplotlib = gr.Plot(label="Requirements Progress", visible=False) | |
credits_viz_matplotlib = gr.Plot(label="Credits Distribution", visible=False) | |
save_btn.click( | |
fn=profile_manager.save_profile, | |
inputs=[ | |
name, age, interests, transcript_data, learning_output, | |
movie, movie_reason, show, show_reason, | |
book, book_reason, character, character_reason, blog | |
], | |
outputs=output_summary | |
).then( | |
fn=lambda td: ( | |
gr.update(visible=data_visualizer.create_requirements_visualization(td) is not None), | |
gr.update(visible=data_visualizer.create_credits_distribution_visualization(td) is not None) | |
), | |
inputs=transcript_data, | |
outputs=[req_viz_matplotlib, credits_viz_matplotlib] | |
).then( | |
fn=lambda: {3: True}, | |
inputs=None, | |
outputs=tab_completed | |
).then( | |
fn=lambda: gr.update(elem_classes="completed-tab"), | |
outputs=step4 | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
outputs=step5 | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
outputs=step6 | |
).then( | |
fn=lambda: profile_manager.list_profiles(session_token.value), | |
outputs=load_profile_dropdown | |
).then( | |
fn=lambda: gr.update(visible=bool(profile_manager.list_profiles(session_token.value))), | |
outputs=load_btn | |
).then( | |
fn=lambda: gr.update(visible=bool(profile_manager.list_profiles(session_token.value))), | |
outputs=delete_btn | |
) | |
load_btn.click( | |
fn=lambda: profile_manager.load_profile(load_profile_dropdown.value, session_token.value), | |
inputs=None, | |
outputs=None | |
).then( | |
fn=lambda profile: ( | |
profile.get('name', ''), | |
profile.get('age', ''), | |
profile.get('interests', ''), | |
profile.get('learning_style', ''), | |
profile.get('favorites', {}).get('movie', ''), | |
profile.get('favorites', {}).get('movie_reason', ''), | |
profile.get('favorites', {}).get('show', ''), | |
profile.get('favorites', {}).get('show_reason', ''), | |
profile.get('favorites', {}).get('book', ''), | |
profile.get('favorites', {}).get('book_reason', ''), | |
profile.get('favorites', {}).get('character', ''), | |
profile.get('favorites', {}).get('character_reason', ''), | |
profile.get('blog', ''), | |
profile.get('transcript', {}), | |
gr.update(value="Profile loaded successfully!"), | |
data_visualizer.create_requirements_visualization(profile.get('transcript', {})), | |
data_visualizer.create_credits_distribution_visualization(profile.get('transcript', {})) | |
), | |
inputs=None, | |
outputs=[ | |
name, age, interests, learning_output, | |
movie, movie_reason, show, show_reason, | |
book, book_reason, character, character_reason, | |
blog, transcript_data, output_summary, | |
req_viz_matplotlib, credits_viz_matplotlib | |
] | |
) | |
# ===== TAB 5: AI ASSISTANT ===== | |
with gr.Tab("AI Assistant", id=4): | |
gr.Markdown("## π¬ Your Personalized Learning Assistant") | |
gr.Markdown("Ask me anything about studying, your courses, grades, or learning strategies.") | |
# Create chatbot interface without undo_btn | |
chatbot = gr.ChatInterface( | |
fn=lambda msg, hist: teaching_assistant.generate_response(msg, hist, session_token.value), | |
examples=[ | |
"What's my GPA?", | |
"How should I study for math?", | |
"What courses am I taking?", | |
"Study tips for my learning style", | |
"What colleges should I consider?" | |
], | |
title="" | |
) | |
# ===== TAB 6: GOALS & PLANNING ===== | |
with gr.Tab("Goals & Planning", id=5): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### π― Step 5: Set Academic Goals") | |
with gr.Group(): | |
goal_type = gr.Dropdown( | |
label="Goal Type", | |
choices=["GPA Improvement", "Course Completion", "Test Score", "Other"], | |
value="GPA Improvement" | |
) | |
goal_description = gr.Textbox(label="Goal Description") | |
goal_target_date = gr.DatePicker(label="Target Date") | |
goal_target_value = gr.Number(label="Target Value (if applicable)", visible=False) | |
add_goal_btn = gr.Button("Add Goal", variant="primary") | |
gr.Markdown("### π Study Calendar") | |
calendar_start_date = gr.DatePicker(label="Calendar Start Date", value=datetime.date.today()) | |
generate_calendar_btn = gr.Button("Generate Study Calendar") | |
with gr.Column(scale=2): | |
gr.Markdown("### Your Goals") | |
goals_output = gr.HTML() | |
goal_viz = gr.Plot(label="Goal Progress", visible=False) | |
gr.Markdown("### Your Study Calendar") | |
calendar_output = gr.HTML() | |
calendar_viz = gr.Plot(label="Calendar Visualization", visible=False) | |
# Show/hide target value based on goal type | |
goal_type.change( | |
fn=lambda gt: gr.update(visible=gt in ["GPA Improvement", "Test Score"]), | |
inputs=goal_type, | |
outputs=goal_target_value | |
) | |
def update_goals_display(profile_name):
    """Build the goals panel for a profile.

    Args:
        profile_name: Profile whose goals are fetched from ``goal_tracker``.

    Returns:
        A ``(html, plot_update)`` pair: the goal cards (or an alert when the
        profile has no goals) and an update for the goal-progress plot.
    """
    # Local import so a same-named local variable elsewhere in the enclosing
    # function cannot shadow the stdlib module.
    from html import escape

    def _text(value):
        # Goal fields originate from free-text user input, so they must be
        # escaped before being placed into raw HTML.
        return escape(str(value))

    goals = goal_tracker.get_goals(profile_name)
    if not goals:
        return (
            "<div class='alert-box'>No goals set yet. Add your first goal above!</div>",
            gr.update(visible=False)
        )

    cards = []
    for goal in goals:
        history = goal['progress']
        latest = history[-1] if history else None
        progress = latest['value'] if latest else 0
        target = goal['target_value'] if goal['target_value'] is not None else "N/A"
        # The note line is shown only once at least one progress entry exists.
        note_html = (
            f"<p><strong>Last Note:</strong> {_text(latest['notes'])}</p>"
            if latest else ""
        )
        cards.append(f"""
        <div class='goal-card'>
        <h4>{_text(goal['description'])}</h4>
        <p><strong>Type:</strong> {_text(goal['type'])}</p>
        <p><strong>Target Date:</strong> {_text(goal['target_date'])}</p>
        <p><strong>Progress:</strong> {_text(progress)} / {_text(target)}</p>
        {note_html}
        </div>
        """)

    # Hand the figure to the plot as its value; toggling visibility alone
    # would reveal an empty plot. Assumes create_goal_visualization returns a
    # plottable figure or None -- the original code only tested it for None.
    figure = goal_tracker.create_goal_visualization(goals)
    return (
        "\n".join(cards),
        gr.update(value=figure, visible=figure is not None)
    )
def update_calendar_display(profile_name, start_date):
    """Render the study calendar for a saved profile.

    Args:
        profile_name: Name of the profile to load.
        start_date: First day of the calendar. May be a ``datetime.date``, a
            ``datetime.datetime``, a POSIX timestamp, or a string starting
            with ``YYYY-MM-DD``; ``None`` or an empty string means today.

    Returns:
        A ``(html, plot_update)`` pair: the day-by-day listing (or an alert
        box) and an update for the calendar plot.
    """
    # NOTE(review): `session_token.value` is read off the component object
    # instead of arriving through event inputs -- confirm it reflects the
    # current user's session.
    profile = profile_manager.load_profile(profile_name, session_token.value)
    if not profile:
        return (
            "<div class='alert-box'>Please complete and save your profile first</div>",
            gr.update(visible=False)
        )

    # Normalise whatever the date widget delivered to a plain date so an
    # empty field or a datetime/timestamp value does not crash the handler.
    if start_date is None or start_date == "":
        first_day = datetime.date.today()
    elif isinstance(start_date, datetime.datetime):
        first_day = start_date.date()
    elif isinstance(start_date, datetime.date):
        first_day = start_date
    elif isinstance(start_date, (int, float)):
        first_day = datetime.date.fromtimestamp(start_date)
    else:
        first_day = datetime.date.fromisoformat(str(start_date)[:10])

    # Named `schedule` (not `calendar`) so the stdlib `calendar` module the
    # file imports is not shadowed.
    schedule = study_calendar.generate_study_calendar(profile, first_day.isoformat())

    # Bucket sessions and exams by day once, instead of re-parsing every
    # entry's date for every day in the range.
    sessions_by_day = {}
    for session in schedule['events']:
        day = datetime.date.fromisoformat(session['date'])
        sessions_by_day.setdefault(day, []).append(session)
    exams_by_day = {}
    for exam in schedule['exams']:
        day = datetime.date.fromisoformat(exam['date'])
        exams_by_day.setdefault(day, []).append(exam)

    # Create HTML display
    parts = []
    current_date = datetime.date.fromisoformat(schedule['start_date'])
    end_date = datetime.date.fromisoformat(schedule['end_date'])
    one_day = datetime.timedelta(days=1)
    while current_date <= end_date:
        day_events = sessions_by_day.get(current_date, [])
        day_exams = exams_by_day.get(current_date, [])
        # Days without anything scheduled get no heading at all.
        if day_events or day_exams:
            parts.append(f"<h4>{current_date.strftime('%A, %B %d')}</h4>")
            for event in day_events:
                parts.append(f"""
                <div class='calendar-event'>
                <p><strong>π {event['title']}</strong></p>
                <p>β±οΈ {event['duration']}</p>
                <p>{event['description']}</p>
                </div>
                """)
            for exam in day_exams:
                parts.append(f"""
                <div class='calendar-event' style='border-left-color: #f44336;'>
                <p><strong>π {exam['title']}</strong></p>
                <p>β° All day</p>
                <p>Prepare by reviewing materials and practicing problems</p>
                </div>
                """)
        current_date += one_day

    # Hand the figure to the plot as its value; toggling visibility alone
    # would reveal an empty plot. Assumes create_calendar_visualization
    # returns a plottable figure or None -- the original only tested for None.
    figure = study_calendar.create_calendar_visualization(schedule)
    return (
        "\n".join(parts) if parts else "<div class='alert-box'>No study sessions scheduled yet</div>",
        gr.update(value=figure, visible=figure is not None)
    )
# Add goal functionality
def _add_goal_and_refresh(profile_name, gt, desc, date, val):
    """Store a new goal for the profile and return the refreshed goals panel.

    Args:
        profile_name: Current content of the name field (passed as an event
            input so the live value is used, not the build-time default).
        gt: Selected goal type.
        desc: Free-text goal description.
        date: Target date from the date widget; a date/datetime object or a
            string. ``None`` or empty means no date was chosen.
        val: Optional numeric target.

    Returns:
        A ``(html, plot_update)`` pair matching ``[goals_output, goal_viz]``.
    """
    if date is None or date == "":
        # Nothing is stored without a target date; the plot is left untouched.
        return (
            "<div class='alert-box'>Please choose a target date for your goal</div>",
            gr.update()
        )
    if isinstance(date, datetime.datetime):
        target_date = date.date().isoformat()
    elif hasattr(date, "isoformat"):
        target_date = date.isoformat()
    else:
        target_date = str(date)
    goal_tracker.add_goal(profile_name, gt, desc, target_date, val)
    # Return exactly the two values the outputs expect. The previous lambda
    # returned (add_goal result, (html, update)), which did not line up with
    # the two output components.
    return update_goals_display(profile_name)

add_goal_btn.click(
    fn=_add_goal_and_refresh,
    inputs=[name, goal_type, goal_description, goal_target_date, goal_target_value],
    outputs=[goals_output, goal_viz]
)

# Generate calendar functionality
# `name` is passed as an input so the handler sees what the user typed.
generate_calendar_btn.click(
    fn=update_calendar_display,
    inputs=[name, calendar_start_date],
    outputs=[calendar_output, calendar_viz]
)
# Navigation logic
def navigate_to_tab(tab_index: int, tab_completed_status: dict):
    """Select the requested tab, or the first unfinished tab before it.

    Args:
        tab_index: Zero-based index of the tab the user asked for.
        tab_completed_status: Mapping of tab index to a completion flag;
            missing indices count as not completed.

    Returns:
        A ``(tabs_update, message_update)`` pair. When every earlier tab is
        completed the requested tab is selected and the message is hidden;
        otherwise the first incomplete tab is selected and an error message
        is shown.
    """
    # Lowest-numbered earlier tab that has not been completed, if any.
    first_incomplete = next(
        (step for step in range(tab_index) if not tab_completed_status.get(step, False)),
        None
    )
    if first_incomplete is None:
        return gr.Tabs(selected=tab_index), gr.update(visible=False)

    # One reminder per prerequisite tab, indexed by that tab's position.
    messages = [
        "Please complete the transcript analysis first",
        "Please complete the learning style quiz first",
        "Please fill out your personal information first",
        "Please save your profile first",
        "Please complete the previous steps first"
    ]
    return (
        gr.Tabs(selected=first_incomplete),  # Go to first incomplete tab
        gr.update(
            value=f"<div class='error-message'>β {messages[first_incomplete]}</div>",
            visible=True
        )
    )
# Wire every step button to the tab at the same position: step1 -> tab 0,
# ..., step6 -> tab 5. The tab index is supplied through a constant State.
for step_index, step_button in enumerate((step1, step2, step3, step4, step5, step6)):
    step_button.click(
        lambda idx, status: navigate_to_tab(idx, status),
        inputs=[gr.State(step_index), tab_completed],
        outputs=[tabs, nav_message]
    )
# Dark mode toggle
def toggle_dark_mode(dark):
    """Return the Soft theme variant for the requested mode.

    Args:
        dark: Truthy for the dark variant (adds a slate neutral hue).

    Returns:
        A ``gr.themes.Soft`` instance.

    Note:
        Not wired to the toggle below: a theme object returned from an event
        with no outputs is discarded, so it cannot restyle a running app.
    """
    palette = {"primary_hue": "blue", "secondary_hue": "gray"}
    if dark:
        palette["neutral_hue"] = "slate"
    return gr.themes.Soft(**palette)

# Switch modes in the browser by toggling the `dark` class on the page body;
# no Python callback is needed for this.
dark_mode.change(
    fn=None,
    inputs=dark_mode,
    outputs=None,
    js="(dark) => { document.body.classList.toggle('dark', Boolean(dark)); }"
)
# Ask the model loader to load the model from the app's load event.
# The callback has no output components.
app.load(
    fn=lambda: model_loader.load_model(),
    outputs=[]
)
return app | |
# Build the interface when the module is imported.
app = create_enhanced_interface()

if __name__ == "__main__":
    # Run as a script: listen on all interfaces, port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    app.launch(**launch_options)