import time import gradio as gr import pandas as pd import openvino_genai from huggingface_hub import snapshot_download from threading import Lock import os import numpy as np import requests from PIL import Image from io import BytesIO import cpuinfo import openvino as ov import librosa from googleapiclient.discovery import build import gc import tempfile from PyPDF2 import PdfReader from docx import Document import textwrap # Google API configuration GOOGLE_API_KEY = "AIzaSyAo-1iW5MEZbc53DlEldtnUnDaYuTHUDH4" GOOGLE_CSE_ID = "3027bedf3c88a4efb" DEFAULT_MAX_TOKENS = 100 DEFAULT_NUM_IMAGES = 1 MAX_HISTORY_TURNS = 3 MAX_TOKENS_LIMIT = 1000 class UnifiedAISystem: def __init__(self): self.pipe_lock = Lock() self.current_df = None self.mistral_pipe = None self.internvl_pipe = None self.whisper_pipe = None self.current_document_text = None # Store document content self.initialize_models() def initialize_models(self): """Initialize all required models""" # Download models if not exists if not os.path.exists("mistral-ov"): snapshot_download(repo_id="OpenVINO/mistral-7b-instruct-v0.1-int8-ov", local_dir="mistral-ov") if not os.path.exists("internvl-ov"): snapshot_download(repo_id="OpenVINO/InternVL2-1B-int8-ov", local_dir="internvl-ov") if not os.path.exists("whisper-ov-model"): snapshot_download(repo_id="OpenVINO/whisper-tiny-fp16-ov", local_dir="whisper-ov-model") # CPU-specific configuration cpu_features = cpuinfo.get_cpu_info()['flags'] config_options = {} if 'avx512' in cpu_features: config_options["ENFORCE_BF16"] = "YES" elif 'avx2' in cpu_features: config_options["INFERENCE_PRECISION_HINT"] = "f32" # Initialize Mistral model self.mistral_pipe = openvino_genai.LLMPipeline( "mistral-ov", device="CPU", config={"PERFORMANCE_HINT": "THROUGHPUT", **config_options} ) # Initialize Whisper for audio processing self.whisper_pipe = openvino_genai.WhisperPipeline("whisper-ov-model", device="CPU") def load_data(self, file_path): """Load student data from file""" try: file_ext = os.path.splitext(file_path)[1].lower() if file_ext == '.csv': self.current_df = pd.read_csv(file_path) elif file_ext in ['.xlsx', '.xls']: self.current_df = pd.read_excel(file_path) else: return False, "❌ Unsupported file format. Please upload a .csv or .xlsx file." return True, f"✅ Loaded {len(self.current_df)} records from {os.path.basename(file_path)}" except Exception as e: return False, f"❌ Error loading file: {str(e)}" def extract_text_from_document(self, file_path): """Extract text from PDF or DOCX documents""" text = "" try: file_ext = os.path.splitext(file_path)[1].lower() if file_ext == '.pdf': with open(file_path, 'rb') as file: pdf_reader = PdfReader(file) for page in pdf_reader.pages: text += page.extract_text() + "\n" elif file_ext == '.docx': doc = Document(file_path) for para in doc.paragraphs: text += para.text + "\n" else: return False, "❌ Unsupported document format. Please upload PDF or DOCX." # Clean and format text text = text.replace('\x0c', '') # Remove form feed characters text = textwrap.dedent(text) # Remove common leading whitespace self.current_document_text = text return True, f"✅ Extracted text from {os.path.basename(file_path)}" except Exception as e: return False, f"❌ Error processing document: {str(e)}" def analyze_student_data(self, query): """Analyze student data using AI with streaming""" if not query or not query.strip(): yield "⚠️ Please enter a valid question" return if self.current_df is None: yield "⚠️ Please upload and load a student data file first" return data_summary = self._prepare_data_summary(self.current_df) prompt = f"""You are an expert education analyst. Analyze the following student performance data: {data_summary} Question: {query} Please include: 1. Direct answer to the question 2. Relevant statistics 3. Key insights 4. Actionable recommendations Format the output with clear headings""" optimized_config = openvino_genai.GenerationConfig( max_new_tokens=500, temperature=0.3, top_p=0.9, streaming=True ) full_response = "" try: with self.pipe_lock: token_iterator = self.mistral_pipe.generate(prompt, optimized_config, streaming=True) for token in token_iterator: full_response += token yield full_response except Exception as e: yield f"❌ Error during analysis: {str(e)}" def _prepare_data_summary(self, df): """Summarize the uploaded data""" summary = f"Student performance data with {len(df)} rows and {len(df.columns)} columns.\n" summary += "Columns: " + ", ".join(df.columns) + "\n" summary += "First 3 rows:\n" + df.head(3).to_string(index=False) return summary def analyze_image(self, image, url, prompt): """Analyze image with InternVL model""" try: if image is not None: image_source = image elif url and url.startswith(("http://", "https://")): response = requests.get(url) image_source = Image.open(BytesIO(response.content)).convert("RGB") else: return "⚠️ Please upload an image or enter a valid URL" # Convert to OpenVINO tensor image_data = np.array(image_source.getdata()).reshape( 1, image_source.size[1], image_source.size[0], 3 ).astype(np.byte) image_tensor = ov.Tensor(image_data) # Lazy initialize InternVL if self.internvl_pipe is None: self.internvl_pipe = openvino_genai.VLMPipeline("internvl-ov", device="CPU") with self.pipe_lock: self.internvl_pipe.start_chat() output = self.internvl_pipe.generate(prompt, image=image_tensor, max_new_tokens=100) self.internvl_pipe.finish_chat() return output except Exception as e: return f"❌ Error: {str(e)}" def process_audio(self, data, sr): """Process audio data for speech recognition""" try: # Convert to mono if data.ndim > 1: data = np.mean(data, axis=1) # Simple mono conversion else: data = data # Convert to float32 and normalize data = data.astype(np.float32) max_val = np.max(np.abs(data)) + 1e-7 data /= max_val # Simple noise reduction data = np.clip(data, -0.5, 0.5) # Trim silence energy = np.abs(data) threshold = np.percentile(energy, 25) # Simple threshold mask = energy > threshold indices = np.where(mask)[0] if len(indices) > 0: start = max(0, indices[0] - 1000) end = min(len(data), indices[-1] + 1000) data = data[start:end] # Resample if needed using simpler method if sr != 16000: # Calculate new length new_length = int(len(data) * 16000 / sr) # Linear interpolation for resampling data = np.interp( np.linspace(0, len(data)-1, new_length), np.arange(len(data)), data ) sr = 16000 return data except Exception as e: print(f"Audio processing error: {e}") return np.array([], dtype=np.float32) def transcribe(self, audio): """Transcribe audio using Whisper model with improved error handling""" if audio is None: return "" sr, data = audio # Skip if audio is too short (less than 0.5 seconds) if len(data)/sr < 0.5: return "" try: processed = self.process_audio(data, sr) # Skip if audio is still too short after processing if len(processed) < 8000: # 0.5 seconds at 16kHz return "" # Use OpenVINO Whisper pipeline result = self.whisper_pipe.generate(processed) return result except Exception as e: print(f"Transcription error: {e}") return "❌ Transcription failed - please try again" def generate_lesson_plan(self, topic, duration, additional_instructions=""): """Generate a lesson plan based on document content""" if not self.current_document_text: return "⚠️ Please upload and process a document first" prompt = f"""As an expert educator, create a focused lesson plan using the provided content. **Core Requirements:** 1. TOPIC: {topic} 2. TOTAL DURATION: {duration} periods 3. ADDITIONAL INSTRUCTIONS: {additional_instructions or 'None'} **Content Summary:** {self.current_document_text[:2500]}... [truncated] **Output Structure:** 1. PERIOD ALLOCATION (Break topic into {duration} logical segments): - Period 1: [Subtopic 1] - Period 2: [Subtopic 2] ... 2. LEARNING OBJECTIVES (Max 3 bullet points) 3. TEACHING ACTIVITIES (One engaging method per period) 4. RESOURCES (Key materials from document) 5. ASSESSMENT (Simple checks for understanding) 6. PAGE REFERENCES (Specific source pages) **Key Rules:** - Strictly divide content into exactly {duration} periods - Prioritize document content over creativity - Keep objectives measurable - Use only document resources - Make page references specific""" optimized_config = openvino_genai.GenerationConfig( max_new_tokens=1200, temperature=0.4, top_p=0.85 ) try: with self.pipe_lock: return self.mistral_pipe.generate(prompt, optimized_config) except Exception as e: return f"❌ Error generating lesson plan: {str(e)}" def fetch_images(self, query: str, num: int = DEFAULT_NUM_IMAGES) -> list: """Fetch unique images by requesting different result pages""" if num <= 0: return [] try: service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY) image_links = [] seen_urls = set() # To track unique URLs # Start from different positions to get unique images for start_index in range(1, num * 2, 2): if len(image_links) >= num: break res = service.cse().list( q=query, cx=GOOGLE_CSE_ID, searchType="image", num=1, start=start_index ).execute() if "items" in res and res["items"]: item = res["items"][0] # Skip duplicates if item["link"] not in seen_urls: image_links.append(item["link"]) seen_urls.add(item["link"]) return image_links[:num] except Exception as e: print(f"Error in image fetching: {e}") return [] def stream_answer(self, message: str, max_tokens: int) -> str: """Stream tokens with typing indicator""" optimized_config = openvino_genai.GenerationConfig( max_new_tokens=max_tokens, temperature=0.7, top_p=0.9, streaming=True ) full_response = "" try: with self.pipe_lock: token_iterator = self.mistral_pipe.generate(message, optimized_config, streaming=True) for token in token_iterator: full_response += token yield full_response # Periodic garbage collection if len(full_response) % 20 == 0: gc.collect() except Exception as e: yield f"❌ Error: {str(e)}" # Initialize global object ai_system = UnifiedAISystem() # CSS styles with improved output box css = """ .gradio-container { background-color: #121212; color: #fff; } .user-msg, .bot-msg { padding: 12px 16px; border-radius: 18px; margin: 8px 0; line-height: 1.5; border: none; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .user-msg { background: linear-gradient(135deg, #4a5568, #2d3748); color: white; margin-left: 20%; border-bottom-right-radius: 5px; border: none; } .bot-msg { background: linear-gradient(135deg, #2d3748, #1a202c); color: white; margin-right: 20%; border-bottom-left-radius: 5px; border: none; } /* Remove top border from chat messages */ .user-msg, .bot-msg { border-top: none !important; } /* Remove borders from chat container */ .chatbot > div { border: none !important; } .chatbot .message { border: none !important; } /* Improve scrollbar */ .chatbot::-webkit-scrollbar { width: 8px; } .chatbot::-webkit-scrollbar-track { background: #2a2a2a; border-radius: 4px; } .chatbot::-webkit-scrollbar-thumb { background: #4a5568; border-radius: 4px; } .chatbot::-webkit-scrollbar-thumb:hover { background: #5a6578; } /* Rest of the CSS remains the same */ .gradio-container { background-color: #121212; color: #fff; } .upload-box { background-color: #333; border-radius: 8px; padding: 16px; margin-bottom: 16px; } #question-input { background-color: #333; color: #fff; border-radius: 8px; padding: 12px; border: 1px solid #555; } .mode-checkbox { background-color: #333; color: #fff; border: 1px solid #555; border-radius: 8px; padding: 10px; margin: 5px; } .slider-container { margin-top: 20px; padding: 15px; border-radius: 10px; background-color: #2a2a2a; } .system-info { background-color: #7B9BDB; padding: 15px; border-radius: 8px; margin: 15px 0; border-left: 4px solid #1890ff; } .chat-image { cursor: pointer; transition: transform 0.2s; max-height: 100px; margin: 4px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .chat-image:hover { transform: scale(1.05); box-shadow: 0 4px 8px rgba(0,0,0,0.2); } .modal { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0,0,0,0.8); display: none; z-index: 1000; cursor: zoom-out; } .modal-content { position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); max-width: 90%; max-height: 90%; background: white; padding: 10px; border-radius: 12px; } .modal-img { width: auto; height: auto; max-width: 100%; max-height: 100%; border-radius: 8px; } .typing-indicator { display: inline-block; position: relative; width: 40px; height: 20px; } .typing-dot { display: inline-block; width: 6px; height: 6px; border-radius: 50%; background-color: #fff; position: absolute; animation: typing 1.4s infinite ease-in-out; } .typing-dot:nth-child(1) { left: 0; animation-delay: 0s; } .typing-dot:nth-child(2) { left: 12px; animation-delay: 0.2s; } .typing-dot:nth-child(3) { left: 24px; animation-delay: 0.4s; } @keyframes typing { 0%, 60%, 100% { transform: translateY(0); } 30% { transform: translateY(-5px); } } .lesson-plan { background: linear-gradient(135deg, #1a202c, #2d3748); padding: 15px; border-radius: 12px; margin: 10px 0; border-left: 4px solid #4a9df0; } .lesson-section { margin-bottom: 15px; padding-bottom: 10px; border-bottom: 1px solid #4a5568; } .lesson-title { font-size: 1.2em; font-weight: bold; color: #4a9df0; margin-bottom: 8px; } .page-ref { background-color: #4a5568; padding: 3px 8px; border-radius: 4px; font-size: 0.9em; display: inline-block; margin: 3px; } """ # Create Gradio interface with gr.Blocks(css=css, title="Unified EDU Assistant") as demo: gr.Markdown("# 🤖 Unified EDU Assistant by Phanindra Reddy K") # System info banner gr.HTML("""