Spaces:
Runtime error
Runtime error
import os
import subprocess
import sys

# Install flash-attn at startup, before anything that may depend on it is imported.
subprocess.run(
    # Argument list (no shell) and ``python -m pip`` so the package is installed
    # into the interpreter that is running this script.
    [sys.executable, "-m", "pip", "install", "flash-attn", "--no-build-isolation"],
    # Extend the inherited environment instead of replacing it: a bare dict
    # passed as ``env`` would drop PATH, HOME, proxy settings, and so on.
    env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
    check=True,  # raise CalledProcessError if the install fails
)
| # Rest of your app.py code | |
| import gradio as gr | |
| import asyncio | |
| import os | |
| import thinkingframes | |
| import soundfile as sf | |
| import numpy as np | |
| import logging | |
| from transformers import pipeline | |
| from dotenv import load_dotenv | |
| from policy import user_acceptance_policy | |
| from styles import theme | |
| from thinkingframes import generate_prompt, strategy_options, questions | |
| from utils import get_image_html, collect_student_info | |
| from database_functions import add_user_privacy, add_submission | |
| from tab_teachers_dashboard import create_teachers_dashboard_tab | |
| from config import CLASS_OPTIONS | |
| from concurrent.futures import ThreadPoolExecutor | |
| import spaces | |
| from streaming_stt_nemo import Model | |
| import edge_tts | |
| import tempfile | |
# Load settings from a .env file, if one is present.
load_dotenv()

# Speech-to-text engines keyed by language code; only the default language is loaded.
default_lang = "en"
engines = {}
engines[default_lang] = Model(default_lang)

# For maintaining user session (to keep track of userID).
# NOTE(review): this State is created at module level, outside the gr.Blocks
# context, and is read/written through ``.value`` elsewhere in this file --
# it looks like a single value shared by every visitor; confirm.
user_state = gr.State("")
def transcribe(audio):
    """Transcribe an audio file with the English speech-to-text engine.

    Args:
        audio: Forwarded unchanged to the engine's ``stt_file`` method; the
            caller in this file passes the path of a WAV file.

    Returns:
        The first element of the ``stt_file`` result.
    """
    return engines["en"].stt_file(audio)[0]
# Text-generation pipeline used to produce the written feedback.
# NOTE(review): the model string starts with a "models/" segment; confirm it
# resolves (local directory vs. Hub repo id) in the deployment environment.
llm = pipeline(task="text-generation", model="models/meta-llama/Meta-Llama-3-8B")

# HTML for the picture; it is embedded in the UI through gr.Markdown.
image_path = "picturePerformance.jpg"
img_html = get_image_html(image_path)

# Thread pool that runs the blocking transcription and generation calls
# off the event loop.
executor = ThreadPoolExecutor(max_workers=None)
def generate_feedback(user_id, question_choice, strategy_choice, message, feedback_level):
    """Generate written feedback for a student's answer and record the submission.

    Args:
        user_id: ID stored alongside the submission.
        question_choice: The selected question; must be an element of ``questions``.
        strategy_choice: The selected thinking frame; must be a key of
            ``strategy_options``.
        message: The student's answer, sent to the model as the user turn.
        feedback_level: Forwarded to ``thinkingframes.generate_system_message``.

    Returns:
        The feedback text produced by the model, as a string.

    Raises:
        ValueError: If ``question_choice`` is not in ``questions``.
        KeyError: If ``strategy_choice`` is not in ``strategy_options``.
    """
    current_question_index = questions.index(question_choice)
    # The strategy is not part of the prompt; the membership check is kept
    # only so that an unknown choice still fails fast with KeyError.
    if strategy_choice not in strategy_options:
        raise KeyError(strategy_choice)

    conversation = [
        {
            "role": "system",
            "content": thinkingframes.generate_system_message(current_question_index, feedback_level),
        },
        {
            "role": "user",
            "content": message,
        },
    ]
    generated = llm(conversation, max_length=1000, num_return_sequences=1)[0]["generated_text"]
    # With chat-style input the pipeline returns the whole conversation (a
    # list of message dicts) with the assistant's reply appended last; with
    # plain-text input it returns a string. Always reduce to the reply text so
    # the database and the UI receive a string.
    if isinstance(generated, str):
        feedback = generated
    else:
        feedback = generated[-1]["content"]

    # Questions are stored 1-based.
    add_submission(user_id, message, feedback, 0, "", current_question_index + 1)
    return feedback
async def generate_audio_feedback(feedback_buffer):
    """Synthesise speech for the feedback text and save it to a temporary file.

    Args:
        feedback_buffer: The text handed to ``edge_tts.Communicate``.

    Returns:
        Path of the file the audio was saved to. The file is created with
        ``delete=False``, so it is not removed automatically.
    """
    tts = edge_tts.Communicate(feedback_buffer)
    handle = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    with handle:
        output_path = handle.name
        await tts.save(output_path)
    return output_path
async def predict(question_choice, strategy_choice, feedback_level, audio):
    """Transcribe a recorded answer, generate feedback, and stream progress to the UI.

    This is an async generator. Every item it yields is a
    ``(chat_history, audio_output)`` pair: ``chat_history`` is a list of
    ``(speaker, text)`` tuples and ``audio_output`` is ``None`` until the
    spoken feedback is ready, after which it is the path of the audio file.

    Args:
        question_choice: The selected question, forwarded to ``generate_feedback``.
        strategy_choice: The selected thinking frame, forwarded to ``generate_feedback``.
        feedback_level: The selected feedback level, forwarded to ``generate_feedback``.
        audio: ``None`` or a ``(sample_rate, audio_data)`` pair.

    Raises:
        ValueError: If ``audio_data`` is non-empty but not a numpy array.
    """
    coach = "Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡"
    current_audio_output = None

    if audio is None:
        yield [(coach, "No audio data received. Please try again.")], current_audio_output
        return

    sample_rate, audio_data = audio
    if audio_data is None or len(audio_data) == 0:
        yield [(coach, "No audio data received. Please try again.")], current_audio_output
        return

    audio_path = "audio.wav"
    if not isinstance(audio_data, np.ndarray):
        raise ValueError("audio_data must be a numpy array")
    sf.write(audio_path, audio_data, sample_rate)

    chat_history = [(coach, "Transcribing your audio, please listen to your oral response while waiting ...")]
    yield chat_history, current_audio_output

    try:
        # Blocking work runs on the thread pool so the event loop stays free.
        transcription_future = executor.submit(transcribe, audio_path)
        student_response = await asyncio.wrap_future(transcription_future)
        if not student_response.strip():
            yield [(coach, "Transcription failed. Please try again or seek assistance.")], current_audio_output
            return

        chat_history.append(("Student", student_response))
        yield chat_history, current_audio_output

        chat_history.append((coach, "Transcription complete. Generating feedback. Please continue listening to your oral response while waiting ..."))
        yield chat_history, current_audio_output

        # int() raises if no user id has been stored yet; that is reported
        # through the generic error message below.
        feedback_future = executor.submit(
            generate_feedback,
            int(user_state.value),
            question_choice,
            strategy_choice,
            student_response,
            feedback_level,
        )
        feedback = await asyncio.wrap_future(feedback_future)
        chat_history.append((coach, feedback))
        yield chat_history, current_audio_output

        # generate_audio_feedback is a coroutine function: it must be awaited.
        # Submitting it to the thread pool would only create a coroutine
        # object that is never run.
        audio_output_path = await generate_audio_feedback(feedback)
        # Hand the audio component the file path itself; a (rate, data) tuple
        # is only valid when data is a numpy array.
        current_audio_output = audio_output_path
        yield chat_history, current_audio_output
    except Exception as e:
        # Top-level boundary for the UI: log the traceback and show a generic message.
        logging.exception("An error occurred: %s", e)
        yield [(coach, "An error occurred. Please try again or seek assistance.")], current_audio_output
def toggle_oral_coach_visibility(class_name, index_no, policy_checked):
    """Register the student and reveal the Oral Coach section once the policy is accepted.

    Calls ``add_user_privacy`` and, on success, stores the returned user id in
    ``user_state.value``.

    Args:
        class_name: The selected class, forwarded to ``add_user_privacy``.
        index_no: The selected index number, forwarded to ``add_user_privacy``.
        policy_checked: Whether the policy checkbox is ticked.

    Returns:
        A ``(status_message, visibility_update)`` pair; the update makes the
        Oral Coach section visible only when registration succeeded.
    """
    if policy_checked:
        user_id, message = add_user_privacy(class_name, index_no)
        # A message containing "Error" means registration failed.
        succeeded = "Error" not in message
        if succeeded:
            user_state.value = user_id
        return message, gr.update(visible=succeeded)
    return "Please agree to the Things to Note When using the Oral Coach ⚡ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡ before submitting.", gr.update(visible=False)
# Build the UI: a student tab (info form, then the coaching workflow) and the
# teachers' dashboard tab.
# NOTE(review): the nesting below is reconstructed from a flattened listing --
# confirm the layout against the deployed app.
with gr.Blocks(title="Oral Coach powered by Hugging Face", theme=theme) as app:
    with gr.Tab("Oral Coach ⚡ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡"):
        gr.Markdown("## Student Information")
        class_name = gr.Dropdown(label="Class", choices=CLASS_OPTIONS)
        # Index numbers "01" .. "45".
        index_no = gr.Dropdown(label="Index No", choices=[f"{i:02}" for i in range(1, 46)])
        policy_text = gr.Markdown(user_acceptance_policy)
        policy_checkbox = gr.Checkbox(label="I have read and agree to the Things to Note When using the Oral Coach ⚡ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", value=False)
        submit_info_btn = gr.Button("Submit Info")
        info_output = gr.Text()

        # Hidden until toggle_oral_coach_visibility reports a successful registration.
        with gr.Column(visible=False) as oral_coach_content:
            gr.Markdown("## Powered by Hugging Face")
            gr.Markdown(img_html)
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Step 1: Choose a Question")
                    question_choice = gr.Radio(thinkingframes.questions, label="Questions", value=thinkingframes.questions[0])
                    gr.Markdown("### Step 2: Choose a Thinking Frame")
                    strategy_choice = gr.Dropdown(list(strategy_options.keys()), label="Thinking Frame", value=list(strategy_options.keys())[0])
                    gr.Markdown("### Step 3: Choose Feedback Level")
                    # The default is passed to the constructor rather than
                    # assigned to ``.value`` after the component is built.
                    feedback_level = gr.Radio(
                        ["Brief Feedback", "Moderate Feedback", "Comprehensive Feedback"],
                        label="Feedback Level",
                        value="Brief Feedback",
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### Step 4: Record Your Answer")
                    audio_input = gr.Audio(type="numpy", sources=["microphone"], label="Record")
                    submit_answer_btn = gr.Button("Submit Oral Response")
            gr.Markdown("### Step 5: Review your personalised feedback")
            feedback_output = gr.Chatbot(label="Feedback", scale=4, height=700, show_label=True)
            # ``autoplay`` is a boolean flag, so pass True rather than the string "True".
            audio_output = gr.Audio(type="numpy", label="Audio Playback", format="wav", autoplay=True)

        # predict is an async generator, so both outputs update as it yields.
        submit_answer_btn.click(
            predict,
            inputs=[question_choice, strategy_choice, feedback_level, audio_input],
            outputs=[feedback_output, audio_output],
        )
        submit_info_btn.click(
            toggle_oral_coach_visibility,
            inputs=[class_name, index_no, policy_checkbox],
            outputs=[info_output, oral_coach_content],
        )
    create_teachers_dashboard_tab()
# Enable the request queue (max_size=20), then start the server on the port
# given by the PORT environment variable (10000 when it is unset).
queued_app = app.queue(max_size=20)
queued_app.launch(
    debug=True,
    server_port=int(os.getenv("PORT", 10000)),
    favicon_path="favicon.ico",
)