import gradio as gr from groq import Groq import os from PIL import Image, ImageDraw, ImageFont from datetime import datetime import json import tempfile from typing import List, Dict, Tuple, Optional, Any from dataclasses import dataclass import subprocess from pathlib import Path import logging from functools import lru_cache # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass(frozen=True) class Question: """Immutable question data structure with validation""" question: str options: Tuple[str, ...] # Using tuple for immutability correct_answer: int def __post_init__(self): if not isinstance(self.options, tuple): object.__setattr__(self, 'options', tuple(self.options)) if not (0 <= self.correct_answer < len(self.options)): raise ValueError(f"Correct answer index {self.correct_answer} out of range") if len(self.options) != 4: raise ValueError(f"Must have exactly 4 options, got {len(self.options)}") @dataclass(frozen=True) class QuizFeedback: """Immutable feedback data structure""" is_correct: bool selected: Optional[str] correct_answer: str class CacheableQuizGenerator: """Quiz generator with caching capabilities""" def __init__(self, api_key: str): self.client = Groq(api_key=api_key) self._cache = {} @lru_cache(maxsize=100) def _generate_cached_questions(self, text_hash: str, num_questions: int) -> List[Question]: """Cache questions based on content hash""" text = self._cache.get(text_hash) if not text: raise KeyError("Text not found in cache") return self._generate_questions_internal(text, num_questions) def generate_questions(self, text: str, num_questions: int) -> List[Question]: """Generate questions with caching""" text_hash = hash(text) self._cache[text_hash] = text try: return self._generate_cached_questions(text_hash, num_questions) except Exception as e: logger.error(f"Error generating questions: {e}") raise QuizGenerationError(f"Failed to generate questions: {e}") def _generate_questions_internal(self, text: str, num_questions: int) -> List[Question]: """Internal question generation logic""" try: response = self._get_llm_response(text, num_questions) questions = self._parse_response(response) return self._validate_questions(questions, num_questions) except Exception as e: logger.error(f"Error in question generation: {e}") raise def _get_llm_response(self, text: str, num_questions: int) -> str: """Get response from LLM with retry logic""" max_retries = 3 for attempt in range(max_retries): try: response = self.client.chat.completions.create( messages=self._create_messages(text, num_questions), model="llama-3.2-3b-preview", temperature=0, max_tokens=2048 ) return response.choices[0].message.content except Exception as e: if attempt == max_retries - 1: raise logger.warning(f"Retry {attempt + 1}/{max_retries} after error: {e}") continue @staticmethod def _create_messages(text: str, num_questions: int) -> List[Dict[str, str]]: """Create messages for LLM prompt""" return [ { "role": "system", "content": "You are a quiz generator. Create clear questions with concise answer options." }, { "role": "user", "content": f"""Generate exactly {num_questions} multiple choice questions based on this text: {text} Return only a JSON array with this format: [ {{ "question": "Question text?", "options": ["Option 1", "Option 2", "Option 3", "Option 4"], "correct_answer": 0 }} ]""" } ] def _parse_response(self, response_text: str) -> List[Dict]: """Parse response with improved error handling""" try: # Clean up the response text cleaned_text = response_text.strip() cleaned_text = cleaned_text.replace('```json', '').replace('```', '').strip() # Find the JSON array start_idx = cleaned_text.find('[') end_idx = cleaned_text.rfind(']') if start_idx == -1 or end_idx == -1: raise ValueError("No valid JSON array found in response") json_text = cleaned_text[start_idx:end_idx + 1] # Attempt to parse the JSON try: return json.loads(json_text) except json.JSONDecodeError as e: print(f"JSON Parse Error: {str(e)}") print(f"Attempted to parse: {json_text}") raise except Exception as e: print(f"Error parsing response: {str(e)}") print(f"Original response: {response_text}") raise ValueError(f"Failed to parse response: {str(e)}") def _validate_questions(self, questions: List[Dict], num_questions: int) -> List[Question]: """Validate questions with improved error checking""" validated = [] for q in questions: try: if not self._is_valid_question(q): print(f"Invalid question format: {q}") continue validated.append(Question( question=q["question"].strip(), options=[str(opt).strip()[:100] for opt in q["options"]], correct_answer=int(q["correct_answer"]) % 4 )) except Exception as e: print(f"Error validating question: {str(e)}") continue if not validated: raise ValueError("No valid questions after validation") return validated[:num_questions] def _is_valid_question(self, question: Dict) -> bool: """Check if question format is valid""" try: return ( isinstance(question, dict) and all(key in question for key in ["question", "options", "correct_answer"]) and isinstance(question["question"], str) and isinstance(question["options"], list) and len(question["options"]) == 4 and all(isinstance(opt, str) for opt in question["options"]) and isinstance(question["correct_answer"], (int, str)) and int(question["correct_answer"]) in range(4) ) except Exception as e: print(f"Question validation error: {str(e)}") return False class FontManager: """Manages font installation and loading for the certificate generator""" @staticmethod def install_fonts(): """Install required fonts if they're not already present""" try: # Install fonts package subprocess.run([ "apt-get", "update", "-y" ], check=True) subprocess.run([ "apt-get", "install", "-y", "fonts-liberation", # Liberation Sans fonts "fontconfig", # Font configuration "fonts-dejavu-core" # DejaVu fonts as fallback ], check=True) # Clear font cache subprocess.run(["fc-cache", "-f"], check=True) print("Fonts installed successfully") except subprocess.CalledProcessError as e: print(f"Warning: Could not install fonts: {e}") except Exception as e: print(f"Warning: Unexpected error installing fonts: {e}") @staticmethod def get_font_paths() -> Dict[str, str]: """Get the paths to the required fonts with multiple fallbacks""" standard_paths = [ "/usr/share/fonts", "/usr/local/share/fonts", "/usr/share/fonts/truetype", "~/.fonts" ] font_paths = { 'regular': None, 'bold': None } # Common font filenames to try fonts_to_try = { 'regular': [ 'LiberationSans-Regular.ttf', 'DejaVuSans.ttf', 'FreeSans.ttf' ], 'bold': [ 'LiberationSans-Bold.ttf', 'DejaVuSans-Bold.ttf', 'FreeSans-Bold.ttf' ] } def find_font(font_name: str) -> Optional[str]: """Search for a font file in standard locations""" for base_path in standard_paths: for root, _, files in os.walk(os.path.expanduser(base_path)): if font_name in files: return os.path.join(root, font_name) return None # Try to find each font for style in ['regular', 'bold']: for font_name in fonts_to_try[style]: font_path = find_font(font_name) if font_path: font_paths[style] = font_path break # If no fonts found, try using fc-match as fallback if not all(font_paths.values()): try: for style in ['regular', 'bold']: if not font_paths[style]: result = subprocess.run( ['fc-match', '-f', '%{file}', 'sans-serif:style=' + style], capture_output=True, text=True ) if result.returncode == 0 and result.stdout.strip(): font_paths[style] = result.stdout.strip() except Exception as e: print(f"Warning: Could not use fc-match to find fonts: {e}") return font_paths class QuizGenerationError(Exception): """Exception raised for errors in quiz generation""" pass class CertificateGenerator: def __init__(self): self.certificate_size = (1200, 800) self.background_color = '#FFFFFF' self.border_color = '#1C1D1F' # Install fonts if needed FontManager.install_fonts() self.font_paths = FontManager.get_font_paths() def _load_fonts(self) -> Dict[str, ImageFont.FreeTypeFont]: """Load fonts with fallbacks""" fonts = {} try: if self.font_paths['regular'] and self.font_paths['bold']: fonts['title'] = ImageFont.truetype(self.font_paths['bold'], 36) fonts['subtitle'] = ImageFont.truetype(self.font_paths['regular'], 14) fonts['text'] = ImageFont.truetype(self.font_paths['regular'], 20) fonts['name'] = ImageFont.truetype(self.font_paths['bold'], 32) else: raise ValueError("No suitable fonts found") except Exception as e: print(f"Font loading error: {e}. Using default font.") default = ImageFont.load_default() fonts = { 'title': default, 'subtitle': default, 'text': default, 'name': default } return fonts def _add_professional_border(self, draw: ImageDraw.Draw): # Single elegant inner border with padding padding = 40 draw.rectangle( [(padding, padding), (self.certificate_size[0] - padding, self.certificate_size[1] - padding)], outline='#1C1D1F', width=2 ) def _add_content( self, draw: ImageDraw.Draw, fonts: Dict[str, ImageFont.FreeTypeFont], name: str, course_name: str, score: float ): # Add "CERTIFICATE OF COMPLETION" text draw.text((60, 140), "CERTIFICATE OF COMPLETION", font=fonts['subtitle'], fill='#666666') # Add course name (large and bold) course_name = course_name.strip() or "Assessment" text_width = draw.textlength(course_name, fonts['title']) max_width = self.certificate_size[0] - 120 # Leave margins if text_width > max_width: words = course_name.split() lines = [] current_line = [] current_width = 0 for word in words: word_width = draw.textlength(word + " ", fonts['title']) if current_width + word_width <= max_width: current_line.append(word) current_width += word_width else: lines.append(" ".join(current_line)) current_line = [word] current_width = word_width if current_line: lines.append(" ".join(current_line)) course_name = "\n".join(lines) draw.multiline_text((60, 200), course_name, font=fonts['title'], fill='#1C1D1F', spacing=10) # Add instructor info draw.text((60, 300), "Instructor", font=fonts['subtitle'], fill='#666666') draw.text((60, 330), "CertifyMe AI", font=fonts['text'], fill='#1C1D1F') # Add participant name (large) name = name.strip() or "Participant" draw.text((60, 420), name, font=fonts['name'], fill='#1C1D1F') # Add date and score info with spacing date_str = datetime.now().strftime("%b. %d, %Y") # Date section draw.text((60, 500), "Date", font=fonts['subtitle'], fill='#666666') draw.text((60, 530), date_str, font=fonts['text'], fill='#1C1D1F') # Score section draw.text((300, 500), "Score", font=fonts['subtitle'], fill='#666666') draw.text((300, 530), f"{float(score):.1f}%", font=fonts['text'], fill='#1C1D1F') # Footer section with certificate number and reference certificate_id = f"Certificate no: {datetime.now().strftime('%Y%m%d')}-{abs(hash(name)) % 10000:04d}" ref_number = f"Reference Number: {abs(hash(name + date_str)) % 10000:04d}" # Draw footer text aligned to left and right draw.text((60, 720), certificate_id, font=fonts['subtitle'], fill='#666666') draw.text((1140, 720), ref_number, font=fonts['subtitle'], fill='#666666', anchor="ra") def _add_logo(self, certificate: Image.Image, logo_path: str): try: logo = Image.open(logo_path) # Resize logo to appropriate size logo.thumbnail((150, 80)) # Position in top-left corner with padding certificate.paste(logo, (60, 50), mask=logo if 'A' in logo.getbands() else None) except Exception as e: print(f"Error adding logo: {e}") def _add_photo(self, certificate: Image.Image, photo_path: str): try: photo = Image.open(photo_path) # Create circular mask size = (100, 100) mask = Image.new('L', size, 0) draw = ImageDraw.Draw(mask) draw.ellipse((0, 0, size[0], size[1]), fill=255) # Resize photo maintaining aspect ratio photo.thumbnail(size) # Create a circular photo output = Image.new('RGBA', size, (0, 0, 0, 0)) output.paste(photo, (0, 0)) output.putalpha(mask) # Position in top-right corner with padding certificate.paste(output, (1000, 50), mask=output) except Exception as e: print(f"Error adding photo: {e}") def generate( self, score: float, name: str, course_name: str, company_logo: Optional[str] = None, participant_photo: Optional[str] = None ) -> str: try: certificate = self._create_base_certificate() draw = ImageDraw.Draw(certificate) # Add professional border self._add_professional_border(draw) fonts = self._load_fonts() self._add_content(draw, fonts, str(name), str(course_name), float(score)) if company_logo: self._add_logo(certificate, company_logo) if participant_photo: self._add_photo(certificate, participant_photo) return self._save_certificate(certificate) except Exception as e: print(f"Error generating certificate: {e}") return None def _create_base_certificate(self) -> Image.Image: return Image.new('RGB', self.certificate_size, self.background_color) def _save_certificate(self, certificate: Image.Image) -> str: temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png') certificate.save(temp_file.name, 'PNG', quality=95) return temp_file.name class QuizApp: """Enhanced quiz application with proper state management""" def __init__(self, api_key: str): self.quiz_generator = CacheableQuizGenerator(api_key) self.certificate_generator = CertificateGenerator() self._current_questions: List[Question] = [] self._user_answers: List[Optional[str]] = [] @property def current_questions(self) -> List[Question]: """Get current questions with defensive copy""" return self._current_questions.copy() def set_questions(self, questions: List[Question]) -> None: """Set current questions with validation""" if not all(isinstance(q, Question) for q in questions): raise ValueError("All elements must be Question instances") self._current_questions = questions.copy() self._user_answers = [None] * len(questions) def submit_answer(self, question_idx: int, answer: Optional[str]) -> None: """Submit an answer with validation""" if not 0 <= question_idx < len(self._current_questions): raise ValueError(f"Invalid question index: {question_idx}") self._user_answers[question_idx] = answer def calculate_score(self) -> Tuple[float, bool, List[QuizFeedback]]: """Calculate quiz score with proper validation""" if not self._current_questions or not self._user_answers: return 0.0, False, [] if len(self._current_questions) != len(self._user_answers): raise ValueError("Questions and answers length mismatch") feedback = [] correct_count = 0 for question, answer in zip(self._current_questions, self._user_answers): is_correct = False if answer is not None: try: selected_index = question.options.index(answer) is_correct = selected_index == question.correct_answer if is_correct: correct_count += 1 except ValueError: pass feedback.append(QuizFeedback( is_correct=is_correct, selected=answer, correct_answer=question.options[question.correct_answer] )) score = (correct_count / len(self._current_questions)) * 100 return score, score >= 80, feedback def update_questions(self, text: str, num_questions: int) -> Tuple[gr.update, gr.update, List[gr.update], List[Question], gr.update]: """ Event handler for generating new questions """ if not text.strip(): return ( gr.update(value=""), gr.update(value="⚠️ Please enter some text content to generate questions."), *[gr.update(visible=False, choices=[]) for _ in range(5)], [], gr.update(selected=1) ) success, questions = self.generate_questions(text, num_questions) if not success or not questions: return ( gr.update(value=""), gr.update(value="❌ Failed to generate questions. Please try again."), *[gr.update(visible=False, choices=[]) for _ in range(5)], [], gr.update(selected=1) ) # Create question display questions_html = "# 📝 Assessment Questions\n\n" questions_html += "> Please select one answer for each question.\n\n" # Update radio buttons updates = [] for i, q in enumerate(questions): questions_html += f"### Question {i+1}\n{q.question}\n\n" updates.append(gr.update( visible=True, choices=q.options, value=None, label=f"Select your answer:" )) # Hide unused radio buttons for i in range(len(questions), 5): updates.append(gr.update(visible=False, choices=[])) return ( gr.update(value=questions_html), gr.update(value=""), *updates, questions, gr.update(selected=1) ) def submit_quiz(self, q1: Optional[str], q2: Optional[str], q3: Optional[str], q4: Optional[str], q5: Optional[str], questions: List[Question] ) -> Tuple[gr.update, List[gr.update], float, str, gr.update]: """ Event handler for quiz submission """ answers = [q1, q2, q3, q4, q5][:len(questions)] if not all(a is not None for a in answers): return ( gr.update(value="⚠️ Please answer all questions before submitting."), *[gr.update() for _ in range(5)], 0, "", gr.update(selected=1) ) score, passed, feedback = self.calculate_score(answers) # Create feedback HTML feedback_html = "# Assessment Results\n\n" for i, (q, f) in enumerate(zip(self.current_questions, feedback)): color = "green" if f.is_correct else "red" symbol = "✅" if f.is_correct else "❌" feedback_html += f""" ### Question {i+1} {q.question}
You passed the assessment with a score of {score:.1f}%
Your certificate has been generated.
Your score: {score:.1f}%
You need 80% or higher to pass and receive a certificate.