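"""Unified EDU Assistant: a Gradio app that runs several OpenVINO GenAI
pipelines on CPU. Mistral-7B handles chat, student-data analysis, and lesson
planning; InternVL2 handles image understanding; Whisper handles voice
transcription. Google Custom Search supplies related images."""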
import os
import gc
import gradio as gr
import pandas as pd
import numpy as np
import requests
import textwrap
import cpuinfo
import openvino as ov
import openvino_genai
from io import BytesIO
from threading import Lock
from PIL import Image
from huggingface_hub import snapshot_download
from googleapiclient.discovery import build
from PyPDF2 import PdfReader
from docx import Document
# Google Custom Search configuration.
# Read the credentials from the environment; never hardcode API keys in source.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GOOGLE_CSE_ID = os.environ.get("GOOGLE_CSE_ID", "")

DEFAULT_MAX_TOKENS = 100
DEFAULT_NUM_IMAGES = 1
MAX_HISTORY_TURNS = 3
MAX_TOKENS_LIMIT = 1000
class UnifiedAISystem:
    def __init__(self):
        self.pipe_lock = Lock()
        self.current_df = None
        self.mistral_pipe = None
        self.internvl_pipe = None
        self.whisper_pipe = None
        self.current_document_text = None  # Stores extracted document content
        self.initialize_models()
    def initialize_models(self):
        """Initialize all required models"""
        # Download models if they are not already present locally
        if not os.path.exists("mistral-ov"):
            snapshot_download(repo_id="OpenVINO/mistral-7b-instruct-v0.1-int8-ov", local_dir="mistral-ov")
        if not os.path.exists("internvl-ov"):
            snapshot_download(repo_id="OpenVINO/InternVL2-1B-int8-ov", local_dir="internvl-ov")
        if not os.path.exists("whisper-ov-model"):
            snapshot_download(repo_id="OpenVINO/whisper-tiny-fp16-ov", local_dir="whisper-ov-model")

        # CPU-specific configuration. py-cpuinfo reports granular flags such as
        # "avx512f", so match on the prefix rather than the literal "avx512".
        # NOTE: ENFORCE_BF16 comes from the older CPU-plugin API; recent
        # OpenVINO releases express this through inference-precision hints.
        cpu_features = cpuinfo.get_cpu_info()['flags']
        config_options = {}
        if any(flag.startswith('avx512') for flag in cpu_features):
            config_options["ENFORCE_BF16"] = "YES"
        elif 'avx2' in cpu_features:
            config_options["INFERENCE_PRECISION_HINT"] = "f32"

        # Initialize the Mistral text pipeline
        self.mistral_pipe = openvino_genai.LLMPipeline(
            "mistral-ov",
            device="CPU",
            config={"PERFORMANCE_HINT": "THROUGHPUT", **config_options}
        )
        # Initialize Whisper for audio transcription
        self.whisper_pipe = openvino_genai.WhisperPipeline("whisper-ov-model", device="CPU")
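    # InternVL is intentionally not created here: it is initialized lazily in
    # analyze_image() the first time an image request arrives, which keeps
    # startup time and memory lower. A minimal smoke test of the text pipeline
    # (a sketch, assuming the downloads above succeeded) would be:
    #   pipe = openvino_genai.LLMPipeline("mistral-ov", device="CPU")
    #   print(pipe.generate("Hello", max_new_tokens=20))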
    def load_data(self, file_path):
        """Load student data from file"""
        try:
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext == '.csv':
                self.current_df = pd.read_csv(file_path)
            elif file_ext in ['.xlsx', '.xls']:
                self.current_df = pd.read_excel(file_path)
            else:
                return False, "❌ Unsupported file format. Please upload a .csv or .xlsx file."
            return True, f"✅ Loaded {len(self.current_df)} records from {os.path.basename(file_path)}"
        except Exception as e:
            return False, f"❌ Error loading file: {str(e)}"
    def extract_text_from_document(self, file_path):
        """Extract text from PDF or DOCX documents"""
        text = ""
        try:
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext == '.pdf':
                with open(file_path, 'rb') as file:
                    pdf_reader = PdfReader(file)
                    for page in pdf_reader.pages:
                        # extract_text() can return None for image-only pages
                        text += (page.extract_text() or "") + "\n"
            elif file_ext == '.docx':
                doc = Document(file_path)
                for para in doc.paragraphs:
                    text += para.text + "\n"
            else:
                return False, "❌ Unsupported document format. Please upload PDF or DOCX."
            # Clean and format the text
            text = text.replace('\x0c', '')  # Remove form-feed characters
            text = textwrap.dedent(text)     # Remove common leading whitespace
            self.current_document_text = text
            return True, f"✅ Extracted text from {os.path.basename(file_path)}"
        except Exception as e:
            return False, f"❌ Error processing document: {str(e)}"
    def analyze_student_data(self, query):
        """Analyze student data with the LLM, streaming partial results"""
        if not query or not query.strip():
            yield "⚠️ Please enter a valid question"
            return
        if self.current_df is None:
            yield "⚠️ Please upload and load a student data file first"
            return
        data_summary = self._prepare_data_summary(self.current_df)
        prompt = f"""You are an expert education analyst. Analyze the following student performance data:
{data_summary}
Question: {query}
Please include:
1. Direct answer to the question
2. Relevant statistics
3. Key insights
4. Actionable recommendations
Format the output with clear headings"""
        optimized_config = openvino_genai.GenerationConfig(
            max_new_tokens=500,
            temperature=0.3,
            top_p=0.9,
            streaming=True
        )
        full_response = ""
        try:
            with self.pipe_lock:
                token_iterator = self.mistral_pipe.generate(prompt, optimized_config, streaming=True)
                for token in token_iterator:
                    full_response += token
                    yield full_response
        except Exception as e:
            yield f"❌ Error during analysis: {str(e)}"
    def _prepare_data_summary(self, df):
        """Summarize the uploaded data"""
        summary = f"Student performance data with {len(df)} rows and {len(df.columns)} columns.\n"
        summary += "Columns: " + ", ".join(df.columns) + "\n"
        summary += "First 3 rows:\n" + df.head(3).to_string(index=False)
        return summary
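    # For a hypothetical DataFrame with columns ["name", "grade", "score"],
    # the summary handed to the model would begin:
    #   Student performance data with 120 rows and 3 columns.
    #   Columns: name, grade, score
    #   First 3 rows:
    #   <first three rows as rendered by to_string(index=False)>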
    def analyze_image(self, image, url, prompt):
        """Analyze an image with the InternVL model"""
        try:
            if image is not None:
                # Force 3 channels; uploads may arrive as RGBA or grayscale
                image_source = image.convert("RGB")
            elif url and url.startswith(("http://", "https://")):
                response = requests.get(url, timeout=15)
                image_source = Image.open(BytesIO(response.content)).convert("RGB")
            else:
                return "⚠️ Please upload an image or enter a valid URL"
            # Convert to an OpenVINO tensor in 1xHxWx3 (NHWC) layout, as in the
            # OpenVINO GenAI visual-language samples
            image_data = np.array(image_source.getdata()).reshape(
                1, image_source.size[1], image_source.size[0], 3
            ).astype(np.byte)
            image_tensor = ov.Tensor(image_data)
            # Lazily initialize InternVL on first use
            if self.internvl_pipe is None:
                self.internvl_pipe = openvino_genai.VLMPipeline("internvl-ov", device="CPU")
            with self.pipe_lock:
                self.internvl_pipe.start_chat()
                output = self.internvl_pipe.generate(prompt, image=image_tensor, max_new_tokens=100)
                self.internvl_pipe.finish_chat()
            return output
        except Exception as e:
            return f"❌ Error: {str(e)}"
    def process_audio(self, data, sr):
        """Prepare raw audio for speech recognition"""
        try:
            # Convert to mono by averaging channels
            if data.ndim > 1:
                data = np.mean(data, axis=1)
            # Convert to float32 and normalize
            data = data.astype(np.float32)
            max_val = np.max(np.abs(data)) + 1e-7
            data /= max_val
            # Simple noise reduction by clipping extreme peaks
            data = np.clip(data, -0.5, 0.5)
            # Trim leading and trailing silence
            energy = np.abs(data)
            threshold = np.percentile(energy, 25)  # simple energy threshold
            mask = energy > threshold
            indices = np.where(mask)[0]
            if len(indices) > 0:
                start = max(0, indices[0] - 1000)
                end = min(len(data), indices[-1] + 1000)
                data = data[start:end]
            # Resample to 16 kHz if needed, using linear interpolation
            if sr != 16000:
                new_length = int(len(data) * 16000 / sr)
                data = np.interp(
                    np.linspace(0, len(data) - 1, new_length),
                    np.arange(len(data)),
                    data
                )
            return data
        except Exception as e:
            print(f"Audio processing error: {e}")
            return np.array([], dtype=np.float32)
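    # A quick sanity check of the resampler (a sketch with made-up values):
    # one second of a 440 Hz tone at 44.1 kHz should come back as roughly
    # 16,000 samples.
    #   tone = np.sin(2 * np.pi * 440 * np.arange(44100) / 44100).astype(np.float32)
    #   out = ai_system.process_audio(tone, 44100)
    #   assert abs(len(out) - 16000) < 50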
    def transcribe(self, audio):
        """Transcribe audio using the Whisper model"""
        if audio is None:
            return ""
        sr, data = audio
        # Skip clips shorter than 0.5 seconds
        if len(data) / sr < 0.5:
            return ""
        try:
            processed = self.process_audio(data, sr)
            # Skip if still too short after trimming (0.5 s at 16 kHz)
            if len(processed) < 8000:
                return ""
            # The Whisper pipeline expects 16 kHz mono float32 samples, which
            # process_audio() guarantees; str() yields the decoded text
            result = self.whisper_pipe.generate(processed)
            return str(result)
        except Exception as e:
            print(f"Transcription error: {e}")
            return "❌ Transcription failed - please try again"
    def generate_lesson_plan(self, topic, duration, additional_instructions=""):
        """Generate a lesson plan based on the uploaded document"""
        if not self.current_document_text:
            return "⚠️ Please upload and process a document first"
prompt = f"""As an expert educator, create a focused lesson plan using the provided content. | |
**Core Requirements:** | |
1. TOPIC: {topic} | |
2. TOTAL DURATION: {duration} periods | |
3. ADDITIONAL INSTRUCTIONS: {additional_instructions or 'None'} | |
**Content Summary:** | |
{self.current_document_text[:2500]}... [truncated] | |
**Output Structure:** | |
1. PERIOD ALLOCATION (Break topic into {duration} logical segments): | |
- Period 1: [Subtopic 1] | |
- Period 2: [Subtopic 2] | |
... | |
2. LEARNING OBJECTIVES (Max 3 bullet points) | |
3. TEACHING ACTIVITIES (One engaging method per period) | |
4. RESOURCES (Key materials from document) | |
5. ASSESSMENT (Simple checks for understanding) | |
6. PAGE REFERENCES (Specific source pages) | |
**Key Rules:** | |
- Strictly divide content into exactly {duration} periods | |
- Prioritize document content over creativity | |
- Keep objectives measurable | |
- Use only document resources | |
- Make page references specific""" | |
optimized_config = openvino_genai.GenerationConfig( | |
max_new_tokens=1200, | |
temperature=0.4, | |
top_p=0.85 | |
) | |
try: | |
with self.pipe_lock: | |
return self.mistral_pipe.generate(prompt, optimized_config) | |
except Exception as e: | |
return f"❌ Error generating lesson plan: {str(e)}" | |
    def fetch_images(self, query: str, num: int = DEFAULT_NUM_IMAGES) -> list:
        """Fetch unique images by requesting different result pages"""
        if num <= 0:
            return []
        try:
            service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
            image_links = []
            seen_urls = set()  # track unique URLs
            # Start from different result positions to collect distinct images
            for start_index in range(1, num * 2, 2):
                if len(image_links) >= num:
                    break
                res = service.cse().list(
                    q=query,
                    cx=GOOGLE_CSE_ID,
                    searchType="image",
                    num=1,
                    start=start_index
                ).execute()
                if "items" in res and res["items"]:
                    item = res["items"][0]
                    # Skip duplicates
                    if item["link"] not in seen_urls:
                        image_links.append(item["link"])
                        seen_urls.add(item["link"])
            return image_links[:num]
        except Exception as e:
            print(f"Error in image fetching: {e}")
            return []
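    # The Custom Search API's "start" parameter is 1-indexed, so stepping by 2
    # (1, 3, 5, ...) pulls one result from each of several offsets instead of
    # the same top hit repeatedly. Note that each fetched image costs one API
    # request against the daily quota.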
    def stream_answer(self, message: str, max_tokens: int):
        """Stream tokens for the chat response (a generator, not -> str)"""
        optimized_config = openvino_genai.GenerationConfig(
            max_new_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            streaming=True
        )
        full_response = ""
        try:
            with self.pipe_lock:
                token_iterator = self.mistral_pipe.generate(message, optimized_config, streaming=True)
                for token in token_iterator:
                    full_response += token
                    yield full_response
                    # Periodic garbage collection to keep memory steady
                    if len(full_response) % 20 == 0:
                        gc.collect()
        except Exception as e:
            yield f"❌ Error: {str(e)}"
# Initialize the global system instance
ai_system = UnifiedAISystem()
# CSS styles with an improved output box
css = """
.gradio-container {
    background-color: #121212;
    color: #fff;
}
.user-msg, .bot-msg {
    padding: 12px 16px;
    border-radius: 18px;
    margin: 8px 0;
    line-height: 1.5;
    border: none;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.user-msg {
    background: linear-gradient(135deg, #4a5568, #2d3748);
    color: white;
    margin-left: 20%;
    border-bottom-right-radius: 5px;
}
.bot-msg {
    background: linear-gradient(135deg, #2d3748, #1a202c);
    color: white;
    margin-right: 20%;
    border-bottom-left-radius: 5px;
}
/* Remove top border from chat messages */
.user-msg, .bot-msg {
    border-top: none !important;
}
/* Remove borders from chat container */
.chatbot > div {
    border: none !important;
}
.chatbot .message {
    border: none !important;
}
/* Improve scrollbar */
.chatbot::-webkit-scrollbar {
    width: 8px;
}
.chatbot::-webkit-scrollbar-track {
    background: #2a2a2a;
    border-radius: 4px;
}
.chatbot::-webkit-scrollbar-thumb {
    background: #4a5568;
    border-radius: 4px;
}
.chatbot::-webkit-scrollbar-thumb:hover {
    background: #5a6578;
}
.upload-box {
    background-color: #333;
    border-radius: 8px;
    padding: 16px;
    margin-bottom: 16px;
}
#question-input {
    background-color: #333;
    color: #fff;
    border-radius: 8px;
    padding: 12px;
    border: 1px solid #555;
}
.mode-checkbox {
    background-color: #333;
    color: #fff;
    border: 1px solid #555;
    border-radius: 8px;
    padding: 10px;
    margin: 5px;
}
.slider-container {
    margin-top: 20px;
    padding: 15px;
    border-radius: 10px;
    background-color: #2a2a2a;
}
.system-info {
    background-color: #7B9BDB;
    padding: 15px;
    border-radius: 8px;
    margin: 15px 0;
    border-left: 4px solid #1890ff;
}
.chat-image {
    cursor: pointer;
    transition: transform 0.2s;
    max-height: 100px;
    margin: 4px;
    border-radius: 8px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.chat-image:hover {
    transform: scale(1.05);
    box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
.modal {
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background: rgba(0,0,0,0.8);
    display: none;
    z-index: 1000;
    cursor: zoom-out;
}
.modal-content {
    position: absolute;
    top: 50%;
    left: 50%;
    transform: translate(-50%, -50%);
    max-width: 90%;
    max-height: 90%;
    background: white;
    padding: 10px;
    border-radius: 12px;
}
.modal-img {
    width: auto;
    height: auto;
    max-width: 100%;
    max-height: 100%;
    border-radius: 8px;
}
.typing-indicator {
    display: inline-block;
    position: relative;
    width: 40px;
    height: 20px;
}
.typing-dot {
    display: inline-block;
    width: 6px;
    height: 6px;
    border-radius: 50%;
    background-color: #fff;
    position: absolute;
    animation: typing 1.4s infinite ease-in-out;
}
.typing-dot:nth-child(1) {
    left: 0;
    animation-delay: 0s;
}
.typing-dot:nth-child(2) {
    left: 12px;
    animation-delay: 0.2s;
}
.typing-dot:nth-child(3) {
    left: 24px;
    animation-delay: 0.4s;
}
@keyframes typing {
    0%, 60%, 100% { transform: translateY(0); }
    30% { transform: translateY(-5px); }
}
.lesson-plan {
    background: linear-gradient(135deg, #1a202c, #2d3748);
    padding: 15px;
    border-radius: 12px;
    margin: 10px 0;
    border-left: 4px solid #4a9df0;
}
.lesson-section {
    margin-bottom: 15px;
    padding-bottom: 10px;
    border-bottom: 1px solid #4a5568;
}
.lesson-title {
    font-size: 1.2em;
    font-weight: bold;
    color: #4a9df0;
    margin-bottom: 8px;
}
.page-ref {
    background-color: #4a5568;
    padding: 3px 8px;
    border-radius: 4px;
    font-size: 0.9em;
    display: inline-block;
    margin: 3px;
}
"""
# Create the Gradio interface
with gr.Blocks(css=css, title="Unified EDU Assistant") as demo:
    gr.Markdown("# 🤖 Unified EDU Assistant by Phanindra Reddy K")

    # System info banner
    gr.HTML("""
    <div class="system-info">
        <strong>Multi-Modal AI Assistant</strong>
        <ul>
            <li>Text & Voice Chat with Mistral-7B</li>
            <li>Image Understanding with InternVL</li>
            <li>Student Data Analysis</li>
            <li>Visual Search with Google Images</li>
            <li>Lesson Planning from Documents</li>
        </ul>
    </div>
    """)

    # Modal for image preview
    modal_html = """
    <div class="modal" id="imageModal" onclick="this.style.display='none'">
        <div class="modal-content">
            <img class="modal-img" id="expandedImg">
        </div>
    </div>
    <script>
    function showImage(url) {
        document.getElementById('expandedImg').src = url;
        document.getElementById('imageModal').style.display = 'block';
    }
    </script>
    """
    gr.HTML(modal_html)

    chat_state = gr.State([])
    with gr.Column(scale=2, elem_classes="chat-container"):
        chatbot = gr.Chatbot(label="Conversation", height=500, bubble_full_width=False,
                             avatar_images=("user.png", "bot.png"), show_label=False)
    # Mode selection
    with gr.Row():
        chat_mode = gr.Checkbox(label="💬 General Chat", value=True, elem_classes="mode-checkbox")
        student_mode = gr.Checkbox(label="🎓 Student Analytics", value=False, elem_classes="mode-checkbox")
        image_mode = gr.Checkbox(label="🖼️ Image Analysis", value=False, elem_classes="mode-checkbox")
        lesson_mode = gr.Checkbox(label="📝 Lesson Planning", value=False, elem_classes="mode-checkbox")

    # Dynamic input fields
    with gr.Column() as chat_inputs:
        include_images = gr.Checkbox(label="Include Visuals", value=True)
        user_input = gr.Textbox(
            placeholder="Type your question here...",
            label="Your Question",
            container=False,
            elem_id="question-input"
        )
        with gr.Row():
            max_tokens = gr.Slider(
                minimum=10,
                maximum=MAX_TOKENS_LIMIT,
                value=DEFAULT_MAX_TOKENS,
                step=10,
                label="Response Length (Tokens)"
            )
            num_images = gr.Slider(
                minimum=0,
                maximum=5,
                value=DEFAULT_NUM_IMAGES,
                step=1,
                label="Number of Images",
                visible=True
            )
    with gr.Column(visible=False) as student_inputs:
        file_upload = gr.File(label="CSV/Excel File", file_types=[".csv", ".xlsx"], type="filepath")
        student_question = gr.Textbox(
            placeholder="Ask questions about student data...",
            label="Your Question",
            elem_id="question-input"
        )
        student_status = gr.Markdown("No file loaded")

    with gr.Column(visible=False) as image_inputs:
        image_upload = gr.Image(type="pil", label="Upload Image")
        image_url = gr.Textbox(
            label="OR Enter Image URL",
            placeholder="https://example.com/image.jpg",
            elem_id="question-input"
        )
        image_question = gr.Textbox(
            placeholder="Ask questions about the image...",
            label="Your Question",
            elem_id="question-input"
        )

    # Lesson planning section
    with gr.Column(visible=False) as lesson_inputs:
        gr.Markdown("### 📚 Lesson Planning")
        doc_upload = gr.File(
            label="Upload Curriculum Document (PDF/DOCX)",
            file_types=[".pdf", ".docx"],
            type="filepath"
        )
        doc_status = gr.Markdown("No document uploaded")
        with gr.Row():
            topic_input = gr.Textbox(
                label="Lesson Topic",
                placeholder="Enter the main topic for the lesson plan"
            )
            duration_input = gr.Number(
                label="Total Periods",
                value=5,
                minimum=1,
                maximum=20,
                step=1
            )
        additional_instructions = gr.Textbox(
            label="Additional Requirements (optional)",
            placeholder="Specific teaching methods, resources, or special considerations..."
        )
        generate_btn = gr.Button("Generate Lesson Plan", variant="primary")

    # Common controls
    with gr.Row():
        submit_btn = gr.Button("Send", variant="primary")
        mic_btn = gr.Button("Transcribe Voice", variant="secondary")
    mic = gr.Audio(sources=["microphone"], type="numpy", label="Voice Input")
    processing = gr.HTML("""
    <div style="display: none;">
        <div class="processing">🔮 Processing your request...</div>
    </div>
    """)
    # Event handlers
    def toggle_modes(chat, student, image, lesson):
        return [
            gr.update(visible=chat),
            gr.update(visible=student),
            gr.update(visible=image),
            gr.update(visible=lesson)
        ]

    def load_student_file(file_path):
        success, message = ai_system.load_data(file_path)
        return message

    def process_document(file_path):
        if not file_path:
            return "⚠️ Please select a document first"
        success, message = ai_system.extract_text_from_document(file_path)
        return message

    def render_history(history):
        """Render chat history with images and proper formatting"""
        rendered = []
        for user_msg, bot_msg, image_links in history:
            # Apply styling to the messages
            user_html = f"<div class='user-msg'>{user_msg}</div>"
            # Special formatting for lesson plans
            if "Lesson Plan:" in bot_msg:
                bot_html = f"<div class='lesson-plan'>{bot_msg}</div>"
            else:
                bot_html = f"<div class='bot-msg'>{bot_msg}</div>"
            # Add images if available
            if image_links:
                images_html = "".join(
                    f"<img src='{url}' class='chat-image' onclick='showImage(\"{url}\")' />"
                    for url in image_links
                )
                bot_html += f"<br><br><b>📸 Related Visuals:</b><br><div style='display: flex; flex-wrap: wrap;'>{images_html}</div>"
            rendered.append((user_html, bot_html))
        return rendered
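    # History is stored as (user_text, bot_text, image_links) triples in
    # chat_state, while gr.Chatbot only accepts (user, bot) pairs, so
    # render_history() collapses each triple into styled HTML before display.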
    def respond(message, chat_hist, chat, student, image, lesson,
                tokens, student_q, image_q, image_upload, image_url,
                include_visuals, num_imgs):
        # Lesson planning has its own handler; since this is a generator, the
        # early exit must yield (a bare "return value" would be discarded)
        if lesson:
            yield render_history(chat_hist), message
            return
        # Determine the actual question based on the active mode
        if chat:
            actual_question = message
        elif student:
            actual_question = student_q
        elif image:
            actual_question = image_q
        else:
            actual_question = message
        # Immediately show the user question with a typing indicator
        typing_html = "<div class='typing-indicator'><div class='typing-dot'></div><div class='typing-dot'></div><div class='typing-dot'></div></div>"
        chat_hist.append((actual_question, typing_html, []))
        yield render_history(chat_hist), ""
        if chat:
            # General chat mode
            full_response = ""
            for chunk in ai_system.stream_answer(message, tokens):
                full_response = chunk
                # Update with the current partial response
                chat_hist[-1] = (actual_question, full_response, [])
                yield render_history(chat_hist), ""
            # Fetch images if requested
            image_links = []
            if include_visuals and num_imgs > 0:
                image_links = ai_system.fetch_images(message, num_imgs)
            # Final update with the response and images
            chat_hist[-1] = (actual_question, full_response, image_links)
            yield render_history(chat_hist), ""
        elif student:
            # Student analytics mode
            if ai_system.current_df is None:
                chat_hist[-1] = (actual_question, "⚠️ Please upload a student data file first", [])
                yield render_history(chat_hist), ""
            else:
                response = ""
                for chunk in ai_system.analyze_student_data(student_q):
                    response = chunk
                    chat_hist[-1] = (actual_question, response, [])
                    yield render_history(chat_hist), ""
        elif image:
            # Image analysis mode
            if not image_upload and not image_url:
                chat_hist[-1] = (actual_question, "⚠️ Please upload an image or enter a URL", [])
                yield render_history(chat_hist), ""
            else:
                try:
                    result = ai_system.analyze_image(image_upload, image_url, image_q)
                    chat_hist[-1] = (actual_question, result, [])
                    yield render_history(chat_hist), ""
                except Exception as e:
                    error_msg = f"❌ Error analyzing image: {str(e)}"
                    chat_hist[-1] = (actual_question, error_msg, [])
                    yield render_history(chat_hist), ""
        # Trim history in place so the gr.State list keeps its identity
        # (rebinding chat_hist to a slice would not persist across calls)
        if len(chat_hist) > MAX_HISTORY_TURNS:
            del chat_hist[:-MAX_HISTORY_TURNS]
            yield render_history(chat_hist), ""
    def generate_lesson_plan(topic, duration, instructions, chat_hist):
        if not topic:
            # This is a generator, so the warning must be yielded, not returned
            yield render_history(chat_hist), "⚠️ Please enter a lesson topic"
            return
        # Show a processing indicator
        processing_msg = "<div class='typing-indicator'><div class='typing-dot'></div><div class='typing-dot'></div><div class='typing-dot'></div></div>"
        chat_hist.append((f"Generate lesson plan for: {topic}", processing_msg, []))
        yield render_history(chat_hist), ""
        # Generate the plan
        plan = ai_system.generate_lesson_plan(topic, duration, instructions)
        # Wrap the plan with styled headings
        formatted_plan = f"""
        <div class='lesson-plan'>
            <div class='lesson-title'>📝 Lesson Plan: {topic} ({duration} periods)</div>
            {plan}
        </div>
        """
        # Replace the indicator with the final plan
        chat_hist[-1] = (
            f"Generate lesson plan for: {topic}",
            formatted_plan,
            []
        )
        yield render_history(chat_hist), ""
    # Mode toggles
    chat_mode.change(fn=toggle_modes, inputs=[chat_mode, student_mode, image_mode, lesson_mode],
                     outputs=[chat_inputs, student_inputs, image_inputs, lesson_inputs])
    student_mode.change(fn=toggle_modes, inputs=[chat_mode, student_mode, image_mode, lesson_mode],
                        outputs=[chat_inputs, student_inputs, image_inputs, lesson_inputs])
    image_mode.change(fn=toggle_modes, inputs=[chat_mode, student_mode, image_mode, lesson_mode],
                      outputs=[chat_inputs, student_inputs, image_inputs, lesson_inputs])
    lesson_mode.change(fn=toggle_modes, inputs=[chat_mode, student_mode, image_mode, lesson_mode],
                       outputs=[chat_inputs, student_inputs, image_inputs, lesson_inputs])
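    # Each checkbox drives the visibility of all four input panels; checking
    # one mode does not uncheck the others, so several panels can be visible
    # at once and respond() resolves which question field to read.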
    # File upload handler
    file_upload.change(fn=load_student_file, inputs=file_upload, outputs=student_status)
    # Document upload handler
    doc_upload.change(fn=process_document, inputs=doc_upload, outputs=doc_status)

    # Voice transcription
    def transcribe_audio(audio):
        return ai_system.transcribe(audio)
    mic_btn.click(fn=transcribe_audio, inputs=mic, outputs=user_input)
    # Submit handler
    submit_btn.click(
        fn=respond,
        inputs=[
            user_input, chat_state, chat_mode, student_mode, image_mode, lesson_mode,
            max_tokens, student_question, image_question, image_upload, image_url,
            include_images, num_images
        ],
        outputs=[chatbot, user_input]
    )
    # Lesson plan generation button
    generate_btn.click(
        fn=generate_lesson_plan,
        inputs=[topic_input, duration_input, additional_instructions, chat_state],
        outputs=[chatbot, topic_input]
    )
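# Note: respond() and generate_lesson_plan() are generator handlers. Recent
# Gradio releases queue events by default, but on older versions streaming
# updates may not appear unless demo.queue() is called before launch (an
# assumption about the deployed Gradio version).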
if __name__ == "__main__":
    demo.launch(share=True, debug=True)