Spaces:
Runtime error
Runtime error
import subprocess | |
# Install flash attention | |
subprocess.run( | |
"pip install flash-attn --no-build-isolation", | |
env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"}, | |
shell=True, | |
check=True # This will raise an exception if the command fails | |
) | |
# Rest of your app.py code | |
import gradio as gr | |
import asyncio | |
import os | |
import thinkingframes | |
import soundfile as sf | |
import numpy as np | |
import logging | |
from transformers import pipeline | |
from dotenv import load_dotenv | |
from policy import user_acceptance_policy | |
from styles import theme | |
from thinkingframes import generate_prompt, strategy_options, questions | |
from utils import get_image_html, collect_student_info | |
from database_functions import add_user_privacy, add_submission | |
from tab_teachers_dashboard import create_teachers_dashboard_tab | |
from config import CLASS_OPTIONS | |
from concurrent.futures import ThreadPoolExecutor | |
import spaces | |
from streaming_stt_nemo import Model | |
import edge_tts | |
import tempfile | |
load_dotenv() | |
# Load CSS from external file | |
with open('styles.css', 'r') as file: | |
css = file.read() | |
# For maintaining user session (to keep track of userID) | |
user_state = gr.State(value="") | |
load_dotenv() | |
client = OpenAI() | |
image_path = "picturePerformance.jpg" | |
img_html = get_image_html(image_path) | |
# Create a thread pool executor | |
executor = ThreadPoolExecutor() | |
def transcribe_audio(audio_path): | |
with open(audio_path, "rb") as audio_file: | |
transcript = client.audio.transcriptions.create(file=audio_file, model="whisper-1", language="en") | |
return transcript.text | |
async def generate_feedback(user_id, question_choice, strategy_choice, message, feedback_level): | |
current_question_index = thinkingframes.questions.index(question_choice) | |
strategy, explanation = thinkingframes.strategy_options[strategy_choice] | |
conversation = [{ | |
"role": "system", | |
"content": f"You are an expert Primary 6 English Language Teacher in a Singapore Primary school, " | |
f"directly guiding a Primary 6 student in Singapore in their oral responses. " | |
f"Format the feedback in Markdown so that it can be easily read. " | |
f"Address the student directly in the second person in your feedback. " | |
f"The student is answering the question: '{thinkingframes.questions[current_question_index]}'. " | |
f"For Question 1, consider the picture description: '{thinkingframes.description}'. " | |
f"For Questions 2 and 3, the picture is not relevant, so the student should not refer to it in their response. " | |
f"Analyze the student's response using the following step-by-step approach: " | |
f"1. Evaluate the response against the {strategy} thinking frame. " | |
f"2. Assess how well the student's response addresses each component of the {strategy} thinking frame: " | |
f" - Assign emoticon scores based on how well the student comprehensively covered each component: " | |
f" - 😊😊😊 (three smiling faces) for a good coverage " | |
f" - 😊😊 (two smiling faces) for an average coverage " | |
f" - 😊 (one smiling face) for a poor coverage " | |
f" - Provide a clear, direct, and concise explanation of how well the answer addresses each component. " | |
f" - Identify specific areas for improvement in students responses, and provide targeted suggestions for improvement. " | |
f"3. Identify overall strengths and areas for improvement in the student's response using the {strategy} to format and provide targeted areas for improvement. " | |
f"4. Provide specific feedback on grammar, vocabulary, and sentence structure. " | |
f" Suggest age-appropriate enhancements that are one level higher than the student's current response. " | |
f"5. Conclude with follow-up questions for reflection. " | |
f"If the student's response deviates from the question, provide clear and concise feedback to help them refocus and try again. " | |
f"Ensure that the vocabulary and sentence structure recommendations are achievable for Primary 6 students in Singapore. " | |
f"Example Feedback Structure for Each Component: " | |
f"Component: [Component Name] " | |
f"Score: [Smiling emoticons] " | |
f"Explanation: [Clear, direct, and concise explanation of how well the answer addresses the component. Identify specific areas for improvement, and provide targeted suggestions for improvement.] " | |
f"{thinkingframes.generate_prompt(feedback_level)}" | |
}, { | |
"role": "user", | |
"content": message | |
}] | |
response = client.chat.completions.create( | |
model='gpt-4o-2024-05-13', | |
messages=conversation, | |
temperature=0.6, | |
max_tokens=1000, | |
stream=True | |
) | |
chat_history = [] # Initialize chat history outside the loop | |
full_feedback = "" # Accumulate the entire feedback message | |
try: | |
for chunk in response: | |
if chunk.choices[0].delta and chunk.choices[0].delta.content: | |
feedback_chunk = chunk.choices[0].delta.content | |
full_feedback += feedback_chunk # Accumulate the feedback | |
await asyncio.sleep(0) | |
# Append the complete feedback to the chat history | |
chat_history.append(("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", full_feedback)) | |
yield chat_history # Yield the chat history only once | |
except Exception as e: | |
logging.error(f"An error occurred during feedback generation: {str(e)}") | |
questionNo = current_question_index + 1 | |
# Save complete feedback after streaming | |
add_submission(user_id, message, full_feedback, int(0), "", questionNo) | |
async def generate_audio_feedback(feedback_buffer): | |
try: | |
response = client.audio.speech.create( | |
model="tts-1", | |
voice="alloy", | |
input=feedback_buffer, | |
response_format="wav" | |
) | |
audio_data = np.frombuffer(response.read(), dtype=np.int16) | |
sample_rate = 24000 # Default sample rate for OpenAI's WAV output | |
return (sample_rate, audio_data) | |
except Exception as e: | |
logging.error(f"An error occurred during speech generation: {str(e)}") | |
return None # Return None in case of an error | |
async def predict(question_choice, strategy_choice, feedback_level, audio): | |
current_audio_output = None # Initialize current_audio_output to None | |
final_feedback = "" # Store only the assistant's feedback | |
if audio is None: | |
yield [("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "No audio data received. Please try again.")], current_audio_output | |
return | |
sample_rate, audio_data = audio | |
if audio_data is None or len(audio_data) == 0: | |
yield [("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "No audio data received. Please try again.")], current_audio_output | |
return | |
audio_path = "audio.wav" | |
if not isinstance(audio_data, np.ndarray): | |
raise ValueError("audio_data must be a numpy array") | |
sf.write(audio_path, audio_data, sample_rate) | |
chat_history = [("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "Transcribing your audio, please listen to your oral response while waiting ...")] | |
yield chat_history, current_audio_output | |
try: | |
transcription_future = executor.submit(transcribe_audio, audio_path) | |
student_response = await asyncio.wrap_future(transcription_future) | |
if not student_response.strip(): | |
yield [("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "Transcription failed. Please try again or seek assistance.")], current_audio_output | |
return | |
chat_history.append(("Student", student_response)) # Add student's transcript | |
yield chat_history, current_audio_output | |
chat_history.append(("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "Transcription complete. Generating feedback. Please continue listening to your oral response while waiting ...")) | |
yield chat_history, current_audio_output | |
moderation_response = client.moderations.create(input=student_response) | |
flagged = any(result.flagged for result in moderation_response.results) | |
if flagged: | |
moderated_message = "The message has been flagged. Please see your teacher to clarify." | |
questionNo = thinkingframes.questions.index(question_choice) + 1 | |
add_submission(int(user_state.value), moderated_message, "", int(0), "", questionNo) | |
yield chat_history, current_audio_output | |
return | |
async for chat_update in generate_feedback(int(user_state.value), question_choice, strategy_choice, student_response, feedback_level): | |
# Append the assistant's feedback to the existing chat_history | |
chat_history.extend(chat_update) | |
final_feedback = chat_history[-1][1] # Update final_feedback with the latest chunk | |
yield chat_history, current_audio_output # Yield audio output | |
feedback_buffer = final_feedback # Use final_feedback for TTS | |
audio_task = asyncio.create_task(generate_audio_feedback(feedback_buffer)) | |
current_audio_output = await audio_task # Store audio output | |
yield chat_history, current_audio_output # Yield audio output | |
except Exception as e: | |
logging.error(f"An error occurred: {str(e)}", exc_info=True) | |
yield [("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", "An error occurred. Please try again or seek assistance.")], current_audio_output | |
with gr.Blocks(title="Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", theme=theme, css=css) as app: | |
with gr.Tab("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡"): | |
gr.Markdown("## Student Information") | |
class_name = gr.Dropdown(label="Class", choices=CLASS_OPTIONS) | |
index_no = gr.Dropdown(label="Index No", choices=[f"{i:02}" for i in range(1, 46)]) | |
policy_text = gr.Markdown(user_acceptance_policy) | |
policy_checkbox = gr.Checkbox(label="I have read and agree to the Things to Note When using the Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", value=False) | |
submit_info_btn = gr.Button("Submit Info") | |
info_output = gr.Text() | |
with gr.Column(visible=False) as oral_coach_content: | |
gr.Markdown("## English Language Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡") | |
gr.Markdown(img_html) # Display the image | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Step 1: Choose a Question") | |
question_choice = gr.Radio(thinkingframes.questions, label="Questions", value=thinkingframes.questions[0]) | |
gr.Markdown("### Step 2: Choose a Thinking Frame") | |
strategy_choice = gr.Dropdown(list(strategy_options.keys()), label="Thinking Frame", value=list(strategy_options.keys())[0]) | |
gr.Markdown("### Step 3: Choose Feedback Level") | |
feedback_level = gr.Radio(["Brief Feedback", "Moderate Feedback", "Comprehensive Feedback"], label="Feedback Level") | |
feedback_level.value = "Brief Feedback" | |
with gr.Column(scale=1): | |
gr.Markdown("### Step 4: Record Your Answer") | |
audio_input = gr.Audio(type="numpy", sources=["microphone"], label="Record") | |
submit_answer_btn = gr.Button("Submit Oral Response") | |
gr.Markdown("### Step 5: Review your personalised feedback") | |
feedback_output = gr.Chatbot( | |
label="Feedback", | |
scale=4, | |
height=700, | |
show_label=True | |
) | |
#audio | |
#submit_answer_here | |
audio_output = gr.Audio(type="numpy", label="Audio Playback", format="wav", autoplay="True") | |
submit_answer_btn.click( | |
predict, | |
inputs=[question_choice, strategy_choice, feedback_level, audio_input], | |
outputs=[feedback_output, audio_output], | |
api_name="predict" | |
) | |
def toggle_oral_coach_visibility(class_name, index_no, policy_checked): | |
if not policy_checked: | |
return "Please agree to the Things to Note When using the Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡ before submitting.", gr.update(visible=False) | |
validation_passed, message, userid = collect_student_info(class_name, index_no) | |
if not validation_passed: | |
return message, gr.update(visible=False) | |
user_state.value = userid | |
return message, gr.update(visible=True) | |
submit_info_btn.click( | |
toggle_oral_coach_visibility, | |
inputs=[class_name, index_no, policy_checkbox], | |
outputs=[info_output, oral_coach_content] | |
) | |
# Define other tabs like Teacher's Dashboard | |
create_teachers_dashboard_tab() | |
app.queue(max_size=20).launch( | |
debug=True, | |
server_port=int(os.environ.get("PORT", 10000)), | |
favicon_path="favicon.ico" | |
) | |