import gradio as gr import numpy as np import datetime import sentry_sdk from sqlalchemy import orm, func # Added func for count from components.header import Header from utils.logger import Logger # Changed from get_logger to Logger from utils.cloud_server_audio_loader import CloudServerAudioLoader from config import conf from utils.database import get_db from data.models import Annotation, AudioTrim, TTSData, AnnotationInterval # Added AnnotationInterval from data.repository.annotator_workload_repo import AnnotatorWorkloadRepo # For progress log = Logger() # Changed from get_logger() to Logger() LOADER = CloudServerAudioLoader(conf.FTP_URL) class DashboardPage: def __init__(self) -> None: with gr.Column(visible=False) as self.container: self.header = Header() # Header now includes progress_display with gr.Row(): # Left Column with gr.Column(scale=3): with gr.Row(): self.tts_id = gr.Textbox(label="ID", interactive=False, scale=1) self.filename = gr.Textbox(label="Filename", interactive=False, scale=3) self.sentence = gr.Textbox( label="Original Sentence", interactive=False, max_lines=5, rtl=True ) with gr.Row(): with gr.Column(scale=1, min_width=10): # Left spacer column pass self.btn_copy_sentence = gr.Button("📋 Copy to Annotated", min_width=150) with gr.Column(scale=1, min_width=10): # Right spacer column pass self.ann_sentence = gr.Textbox( label="Annotated Sentence", interactive=True, max_lines=5, rtl=True, ) with gr.Row(): self.btn_prev = gr.Button("âŦ…ī¸ Previous", min_width=120) self.btn_next_no_save = gr.Button("Next âžĄī¸ (No Save)", min_width=150) self.btn_save_next = gr.Button("Save & Next âžĄī¸", variant="primary", min_width=120) # Combined row for Delete button and Jump controls with gr.Row(): # Removed style argument to fix TypeError with gr.Column(scale=1): self.btn_delete = gr.Button("đŸ—‘ī¸ Delete Annotation") with gr.Column(scale=2): pass # Spacer column to push jump controls to the right. # # This column will expand to fill available space. # with gr.Column(scale=1, min_width=10): # pass # Jump controls, grouped in a nested Row, appearing on the right. # 'scale=0' for this nested Row and its children makes them take minimal/intrinsic space. with gr.Row(scale=0, variant='compact'): # Added variant='compact' self.jump_data_id_input = gr.Number( # show_label=False, # Remove label to reduce height label="Jump to ID (e.g. 123)", # Use placeholder for instruction value=None, # Ensure placeholder shows initially precision=0, interactive=True, min_width=120, # Adjusted for longer placeholder # scale=0 ) self.btn_jump = gr.Button("Go to data ID", min_width=70) # Compact Go button # Removed the old separate rows for delete and jump controls # Right Column with gr.Column(scale=2): self.btn_load_voice = gr.Button("Load Audio & Play", min_width=150) self.audio = gr.Audio( label="🔊 Audio", interactive=False, autoplay=True ) with gr.Group(): # Grouping trim controls gr.Markdown("### Audio Trimming") self.trim_start_sec = gr.Number( label="Trim Start (s)", value=None, # Ensure placeholder shows precision=3, interactive=True, min_width=150 ) self.trim_end_sec = gr.Number( label="Trim End (s)", value=None, # Ensure placeholder shows precision=3, interactive=True, min_width=150 ) with gr.Row(): self.btn_trim = gr.Button("➕ Add Trim", min_width=150) self.btn_undo_trim = gr.Button("â†Šī¸ Delete Last Trim", min_width=150) self.trims_display = gr.DataFrame( headers=["Start (s)", "End (s)"], col_count=(2, "fixed"), interactive=False, label="Applied Trims", wrap=True ) # State variables self.items_state = gr.State([]) self.idx_state = gr.State(0) self.original_audio_state = gr.State(None) self.applied_trims_list_state = gr.State([]) # List of all interactive UI elements for enabling/disabling self.interactive_ui_elements = [ self.btn_prev, self.btn_save_next, self.btn_next_no_save, self.btn_delete, self.btn_jump, self.jump_data_id_input, self.trim_start_sec, self.trim_end_sec, self.btn_trim, self.btn_undo_trim, self.btn_load_voice, self.ann_sentence, self.btn_copy_sentence ] # ---------------- wiring ---------------- # def register_callbacks( self, login_page, session_state: gr.State, root_blocks: gr.Blocks, review_dashboard_page=None ): self.header.register_callbacks(login_page, self, session_state, review_dashboard_page) def update_ui_interactive_state(is_interactive: bool): updates = [] for elem in self.interactive_ui_elements: if elem == self.btn_load_voice and not is_interactive: updates.append(gr.update(value="âŗ Loading Audio...", interactive=False)) elif elem == self.btn_load_voice and is_interactive: updates.append(gr.update(value="Load Audio (Autoplay)", interactive=True)) elif elem == self.btn_save_next and not is_interactive: updates.append(gr.update(value="💾 Saving...", interactive=False)) elif elem == self.btn_save_next and is_interactive: updates.append(gr.update(value="Save & Next âžĄī¸", interactive=True)) # Add similar handling for btn_next_no_save if needed for text change during processing else: updates.append(gr.update(interactive=is_interactive)) return updates def get_user_progress_fn(session): user_id = session.get("user_id") if not user_id: return "Annotation Progress: N/A" # Added label with get_db() as db: # try: # Total items assigned to the user total_assigned_query = db.query(func.sum(AnnotationInterval.end_index - AnnotationInterval.start_index + 1)).filter(AnnotationInterval.annotator_id == user_id) total_assigned_result = total_assigned_query.scalar() total_assigned = total_assigned_result if total_assigned_result is not None else 0 # Count of non-empty annotations by this user within their assigned intervals completed_count_query = db.query(func.count(Annotation.id)).join( TTSData, Annotation.tts_data_id == TTSData.id ).join( AnnotationInterval, (AnnotationInterval.annotator_id == user_id) & (TTSData.id >= AnnotationInterval.start_index) & (TTSData.id <= AnnotationInterval.end_index) ).filter( Annotation.annotator_id == user_id, Annotation.annotated_sentence != None, Annotation.annotated_sentence != "" ) completed_count_result = completed_count_query.scalar() completed_count = completed_count_result if completed_count_result is not None else 0 if total_assigned > 0: percent = (completed_count / total_assigned) * 100 bar_length = 20 # Length of the progress bar filled_length = int(bar_length * completed_count // total_assigned) bar = '█' * filled_length + '░' * (bar_length - filled_length) return f"Progress: {bar} {completed_count}/{total_assigned} ({percent:.1f}%)" elif total_assigned == 0 and completed_count == 0: # Handles case where user has 0 assigned items initially return "Progress: No items assigned yet." else: # Should ideally not happen if logic is correct (e.g. completed > total_assigned) return f"Annotation Progress: {completed_count}/{total_assigned} labeled" # except Exception as e: # log.error(f"Error fetching progress for user {user_id}: {e}") # sentry_sdk.capture_exception(e) # raise def download_voice_fn(filename_to_load, autoplay_on_load=True): # Autoplay here is for the btn_load_voice click if not filename_to_load: return None, None, gr.update(value=None, autoplay=False) # try: log.info(f"Downloading voice: {filename_to_load}, Autoplay: {autoplay_on_load}") sr, wav = LOADER.load_audio(filename_to_load) return (sr, wav), (sr, wav.copy()), gr.update(value=(sr, wav), autoplay=autoplay_on_load) # except Exception as e: # log.error(f"GDrive download failed for {filename_to_load}: {e}") # sentry_sdk.capture_exception(e) # raise def save_annotation_db_fn(current_tts_id, session, ann_text_to_save, applied_trims_list): annotator_id = session.get("user_id") if not current_tts_id or not annotator_id: gr.Error("Cannot save: Missing TTS ID or User ID.") return # Modified: No return value with get_db() as db: # try: annotation_obj = db.query(Annotation).filter_by( tts_data_id=current_tts_id, annotator_id=annotator_id ).options(orm.joinedload(Annotation.audio_trims)).first() if not annotation_obj: annotation_obj = Annotation( tts_data_id=current_tts_id, annotator_id=annotator_id ) db.add(annotation_obj) annotation_obj.annotated_sentence = ann_text_to_save annotation_obj.annotated_at = datetime.datetime.utcnow() # --- Multi-trim handling --- # 1. Delete existing trims for this annotation if annotation_obj.audio_trims: for old_trim in annotation_obj.audio_trims: db.delete(old_trim) annotation_obj.audio_trims = [] # Clear the collection # db.flush() # Ensure deletes are processed before adds if issues arise # 2. Add new trims from applied_trims_list if applied_trims_list: if annotation_obj.id is None: # If new annotation, flush to get ID db.flush() if annotation_obj.id is None: gr.Error("Failed to get annotation ID for saving new trims.") db.rollback(); return # Modified: No return value for trim_info in applied_trims_list: start_to_save_ms = trim_info['start_sec'] * 1000.0 end_to_save_ms = trim_info['end_sec'] * 1000.0 original_data_id_for_trim = current_tts_id new_trim_db_obj = AudioTrim( annotation_id=annotation_obj.id, original_tts_data_id=original_data_id_for_trim, start=start_to_save_ms, end=end_to_save_ms, ) db.add(new_trim_db_obj) # No need to append to annotation_obj.audio_trims if cascade is working correctly # but can be done explicitly: annotation_obj.audio_trims.append(new_trim_db_obj) log.info(f"Saved {len(applied_trims_list)} trims for annotation {annotation_obj.id} (TTS ID: {current_tts_id}).") else: log.info(f"No trims applied for {current_tts_id}, any existing DB trims were cleared.") db.commit() gr.Info(f"Annotation for ID {current_tts_id} saved.") # Removed 'return True' # except Exception as e: # db.rollback() # log.error(f"Failed to save annotation for {current_tts_id}: {e}") # Removed exc_info=True # sentry_sdk.capture_exception(e) # raise def show_current_item_fn(items, idx, session): initial_trims_list_sec = [] initial_trims_df_data = self._convert_trims_to_df_data([]) # Empty by default ui_trim_start_sec = None # Changed from 0.0 to None ui_trim_end_sec = None # Changed from 0.0 to None if not items or idx >= len(items) or idx < 0: return ("", "", "", "", None, ui_trim_start_sec, ui_trim_end_sec, initial_trims_list_sec, initial_trims_df_data, gr.update(value=None, autoplay=False)) current_item = items[idx] tts_data_id = current_item.get("id") annotator_id = session.get("user_id") ann_text = "" if tts_data_id and annotator_id: with get_db() as db: # try: existing_annotation = db.query(Annotation).filter_by( tts_data_id=tts_data_id, annotator_id=annotator_id ).options(orm.joinedload(Annotation.audio_trims)).first() # Changed to audio_trims if existing_annotation: ann_text = existing_annotation.annotated_sentence or "" if existing_annotation.audio_trims: # Check the collection initial_trims_list_sec = [ { 'start_sec': trim.start / 1000.0, 'end_sec': trim.end / 1000.0 } for trim in existing_annotation.audio_trims # Iterate over the collection ] initial_trims_df_data = self._convert_trims_to_df_data(initial_trims_list_sec) # except Exception as e: # log.error(f"DB error in show_current_item_fn for TTS ID {tts_data_id}: {e}") # Removed exc_info=True # sentry_sdk.capture_exception(e) # raise return ( current_item.get("id", ""), current_item.get("filename", ""), current_item.get("sentence", ""), ann_text, None, ui_trim_start_sec, ui_trim_end_sec, initial_trims_list_sec, initial_trims_df_data, gr.update(value=None, autoplay=False) # Ensure audio does not autoplay on item change ) def navigate_idx_fn(items, current_idx, direction): if not items: return 0 new_idx = min(current_idx + 1, len(items) - 1) if direction == "next" else max(current_idx - 1, 0) return new_idx def load_all_items_fn(sess): user_id = sess.get("user_id") # Use user_id for consistency with other functions user_name = sess.get("username") # Changed from "user_name" to "username" to match session key items_to_load = [] initial_idx = 0 # Default to 0 if not user_id: # No user logged in - return empty state without warning # This is normal on initial app load before login log.info("load_all_items_fn: user_id not found in session. Dashboard will display default state until login completes and data is refreshed.") empty_item_display_tuple = ("", "", "", "", None, None, None, [], self._convert_trims_to_df_data([]), gr.update(value=None, autoplay=False)) return [[], 0] + list(empty_item_display_tuple) + [""] if user_id: with get_db() as db: # try: repo = AnnotatorWorkloadRepo(db) raw_items = repo.get_tts_data_with_annotations_for_user_id(user_id, annotator_name_for_log=user_name) items_to_load = [ { "id": item["tts_data"].id, "filename": item["tts_data"].filename, "sentence": item["tts_data"].sentence, "annotated": item["annotation"] is not None and (item["annotation"].annotated_sentence is not None and item["annotation"].annotated_sentence != "") } for item in raw_items ] log.info(f"Loaded {len(items_to_load)} items for user {user_name} (ID: {user_id})") # --- Resume Logic: Find first unannotated or last item --- if items_to_load: first_unannotated_idx = -1 for i, item_data in enumerate(items_to_load): if not item_data["annotated"]: first_unannotated_idx = i break if first_unannotated_idx != -1: initial_idx = first_unannotated_idx log.info(f"Resuming at first unannotated item, index: {initial_idx} (ID: {items_to_load[initial_idx]['id']})") else: # All items are annotated initial_idx = len(items_to_load) - 1 log.info(f"All items annotated, starting at last item, index: {initial_idx} (ID: {items_to_load[initial_idx]['id']})") else: # No items assigned initial_idx = 0 log.info("No items assigned to user, starting at index 0.") # except Exception as e: # log.error(f"Failed to load items or determine resume index for user {user_name}: {e}") # sentry_sdk.capture_exception(e) # raise initial_ui_values_tuple = show_current_item_fn(items_to_load, initial_idx, sess) progress_str = get_user_progress_fn(sess) return [items_to_load, initial_idx] + list(initial_ui_values_tuple) + [progress_str] def jump_by_data_id_fn(items, target_data_id_str, current_idx): if not target_data_id_str: return current_idx # try: target_id = int(target_data_id_str) for i, item_dict in enumerate(items): if item_dict.get("id") == target_id: return i gr.Warning(f"Data ID {target_id} not found in your assigned items.") # except ValueError: # sentry_sdk.capture_exception() # gr.Warning(f"Invalid Data ID format: {target_data_id_str}") return current_idx def delete_db_and_ui_fn(items, current_idx, session, original_audio_data_state): new_ann_sentence = "" new_trim_start_sec_ui = None # Changed from 0.0 new_trim_end_sec_ui = None # Changed from 0.0 new_applied_trims_list = [] new_trims_df_data = self._convert_trims_to_df_data([]) audio_to_display_after_delete = None audio_update_obj_after_delete = gr.update(value=None, autoplay=False) if original_audio_data_state: audio_to_display_after_delete = original_audio_data_state audio_update_obj_after_delete = gr.update(value=original_audio_data_state, autoplay=False) if not items or current_idx >= len(items) or current_idx < 0: progress_str_err = get_user_progress_fn(session) return (items, current_idx, "", "", "", new_ann_sentence, audio_to_display_after_delete, new_trim_start_sec_ui, new_trim_end_sec_ui, new_applied_trims_list, new_trims_df_data, audio_update_obj_after_delete, progress_str_err) current_item = items[current_idx] tts_id_val = current_item.get("id", "") filename_val = current_item.get("filename", "") sentence_val = current_item.get("sentence", "") tts_data_id_to_clear = tts_id_val annotator_id_for_clear = session.get("user_id") if tts_data_id_to_clear and annotator_id_for_clear: with get_db() as db: # try: annotation_obj = db.query(Annotation).filter_by( tts_data_id=tts_data_id_to_clear, annotator_id=annotator_id_for_clear ).options(orm.joinedload(Annotation.audio_trims)).first() # Ensure audio_trims are loaded if annotation_obj: db.delete(annotation_obj) db.commit() gr.Info(f"Annotation and associated trims for ID {tts_data_id_to_clear} deleted from DB.") else: gr.Warning(f"No DB annotation found to delete for ID {tts_data_id_to_clear}.") # except Exception as e: # db.rollback() # log.error(f"Error deleting annotation from DB for {tts_data_id_to_clear}: {e}") # Removed exc_info=True # sentry_sdk.capture_exception(e) # raise else: gr.Error("Cannot clear/delete annotation from DB: Missing TTS ID or User ID.") progress_str = get_user_progress_fn(session) return (items, current_idx, tts_id_val, filename_val, sentence_val, new_ann_sentence, audio_to_display_after_delete, new_trim_start_sec_ui, new_trim_end_sec_ui, new_applied_trims_list, new_trims_df_data, audio_update_obj_after_delete, progress_str) # ---- New Trim Callbacks ---- def add_trim_and_reprocess_ui_fn(start_s, end_s, current_trims_list, original_audio_data): if start_s is None or end_s is None or not (end_s > start_s and start_s >= 0): gr.Warning("Invalid trim times. Start must be >= 0 and End > Start.") # Return current states without change if trim is invalid, also return original start/end for UI return (current_trims_list, self._convert_trims_to_df_data(current_trims_list), original_audio_data, gr.update(value=original_audio_data, autoplay=False), start_s, end_s) new_trim = {'start_sec': float(start_s), 'end_sec': float(end_s)} updated_trims_list = current_trims_list + [new_trim] processed_audio_data, audio_update = self._apply_multiple_trims_fn(original_audio_data, updated_trims_list) # Reset input fields after adding trim ui_trim_start_sec_reset = None # Changed from 0.0 ui_trim_end_sec_reset = None # Changed from 0.0 return (updated_trims_list, self._convert_trims_to_df_data(updated_trims_list), processed_audio_data, audio_update, ui_trim_start_sec_reset, ui_trim_end_sec_reset) def undo_last_trim_and_reprocess_ui_fn(current_trims_list, original_audio_data): if not current_trims_list: gr.Info("No trims to undo.") return (current_trims_list, self._convert_trims_to_df_data(current_trims_list), original_audio_data, gr.update(value=original_audio_data, autoplay=False)) updated_trims_list = current_trims_list[:-1] processed_audio_data, audio_update = self._apply_multiple_trims_fn(original_audio_data, updated_trims_list) return (updated_trims_list, self._convert_trims_to_df_data(updated_trims_list), processed_audio_data, audio_update) # ---- Callback Wiring ---- # outputs_for_display_item: Defines what `show_current_item_fn` and similar full display updates will populate. # It expects 10 values from show_current_item_fn: # (tts_id, filename, sentence, ann_text, audio_placeholder, # trim_start_sec_ui, trim_end_sec_ui, # applied_trims_list_state_val, trims_display_val, audio_update_obj) outputs_for_display_item = [ self.tts_id, self.filename, self.sentence, self.ann_sentence, self.audio, # This will receive the audio data (sr, wav) or None self.trim_start_sec, self.trim_end_sec, # UI fields for new trim self.applied_trims_list_state, self.trims_display, self.audio # This will receive the gr.update object for autoplay etc. ] # Initial Load # Chain: Disable UI -> Load Data (items, idx, initial UI values including trims list & df, progress) -> # Update UI -> Enable UI # Audio is NOT loaded here anymore. root_blocks.load( fn=lambda: update_ui_interactive_state(False), outputs=self.interactive_ui_elements ).then( fn=load_all_items_fn, inputs=[session_state], # Outputs: items_state, idx_state, tts_id, filename, sentence, ann_sentence, # audio (None), trim_start_sec, trim_end_sec, applied_trims_list_state, # trims_display, audio (update obj), progress_display outputs=[self.items_state, self.idx_state] + outputs_for_display_item + [self.header.progress_display], ).then( # Explicitly set original_audio_state to None and clear audio display as it's not loaded. # show_current_item_fn already sets self.audio to (None, gr.update(value=None, autoplay=False)) # We also need to ensure original_audio_state is None if no audio is loaded. lambda: (None, gr.update(value=None), gr.update(value=None)), # original_audio_state, audio data, audio component outputs=[self.original_audio_state, self.audio, self.audio] ).then( fn=lambda: update_ui_interactive_state(True), outputs=self.interactive_ui_elements ) # Navigation (Prev/Save & Next/Next No Save) # Audio will now be loaded automatically by these buttons. for btn_widget, direction_str, performs_save in [ (self.btn_prev, "prev", False), (self.btn_save_next, "next", True), (self.btn_next_no_save, "next", False) ]: event_chain = btn_widget.click( fn=lambda: update_ui_interactive_state(False), outputs=self.interactive_ui_elements ) if performs_save: event_chain = event_chain.then( fn=save_annotation_db_fn, inputs=[self.tts_id, session_state, self.ann_sentence, self.applied_trims_list_state], outputs=[] # save_annotation_db_fn does not return UI updates ).then( # updating progress after save fn=get_user_progress_fn, inputs=[session_state], outputs=self.header.progress_display ) event_chain = event_chain.then( fn=navigate_idx_fn, inputs=[self.items_state, self.idx_state, gr.State(direction_str)], outputs=self.idx_state, ).then( fn=show_current_item_fn, inputs=[self.items_state, self.idx_state, session_state], outputs=outputs_for_display_item, ).then( # Auto-load audio fn=download_voice_fn, inputs=[self.filename, gr.State(True)], # Autoplay TRUE outputs=[self.audio, self.original_audio_state, self.audio], ).then( fn=self._apply_multiple_trims_fn, inputs=[self.original_audio_state, self.applied_trims_list_state], outputs=[self.audio, self.audio] ).then( lambda: gr.update(value=None), # Clear jump input outputs=self.jump_data_id_input ).then( fn=lambda: update_ui_interactive_state(True), outputs=self.interactive_ui_elements ) # Jump Button # Audio will now be loaded automatically by this button. self.btn_jump.click( fn=lambda: update_ui_interactive_state(False), outputs=self.interactive_ui_elements ).then( fn=jump_by_data_id_fn, inputs=[self.items_state, self.jump_data_id_input, self.idx_state], outputs=self.idx_state ).then( fn=show_current_item_fn, inputs=[self.items_state, self.idx_state, session_state], outputs=outputs_for_display_item ).then( # Auto-load audio fn=download_voice_fn, inputs=[self.filename, gr.State(True)], # Autoplay TRUE outputs=[self.audio, self.original_audio_state, self.audio], ).then( fn=self._apply_multiple_trims_fn, inputs=[self.original_audio_state, self.applied_trims_list_state], outputs=[self.audio, self.audio] ).then( lambda: gr.update(value=None), # Clear jump input outputs=self.jump_data_id_input ).then( fn=lambda: update_ui_interactive_state(True), outputs=self.interactive_ui_elements ) # Load Audio Button - This is now the ONLY place audio is downloaded and processed. self.btn_load_voice.click( fn=lambda: update_ui_interactive_state(False), outputs=self.interactive_ui_elements ).then( fn=download_voice_fn, inputs=[self.filename, gr.State(True)], # Autoplay TRUE outputs=[self.audio, self.original_audio_state, self.audio], ).then( fn=self._apply_multiple_trims_fn, inputs=[self.original_audio_state, self.applied_trims_list_state], outputs=[self.audio, self.audio] ).then( fn=lambda: update_ui_interactive_state(True), outputs=self.interactive_ui_elements ) # Copy Sentence Button self.btn_copy_sentence.click( fn=lambda s: s, inputs=self.sentence, outputs=self.ann_sentence ) # Trim Button self.btn_trim.click( fn=add_trim_and_reprocess_ui_fn, inputs=[self.trim_start_sec, self.trim_end_sec, self.applied_trims_list_state, self.original_audio_state], outputs=[self.applied_trims_list_state, self.trims_display, self.audio, self.audio, self.trim_start_sec, self.trim_end_sec] ) # Undo Trim Button self.btn_undo_trim.click( fn=undo_last_trim_and_reprocess_ui_fn, inputs=[self.applied_trims_list_state, self.original_audio_state], outputs=[self.applied_trims_list_state, self.trims_display, self.audio, self.audio] ) # Delete Button outputs_for_delete = [ self.items_state, self.idx_state, self.tts_id, self.filename, self.sentence, self.ann_sentence, self.audio, self.trim_start_sec, self.trim_end_sec, self.applied_trims_list_state, self.trims_display, self.audio, self.header.progress_display ] self.btn_delete.click( fn=lambda: update_ui_interactive_state(False), outputs=self.interactive_ui_elements ).then( fn=delete_db_and_ui_fn, inputs=[self.items_state, self.idx_state, session_state, self.original_audio_state], outputs=outputs_for_delete ).then( fn=lambda: update_ui_interactive_state(True), outputs=self.interactive_ui_elements ) return self.container def _apply_multiple_trims_fn(self, original_audio_data, trims_list_sec): if not original_audio_data: log.warning("apply_multiple_trims_fn: No original audio data.") return None, gr.update(value=None, autoplay=False) # Keep False if no audio sr, wav_orig = original_audio_data current_wav = wav_orig.copy() # Start with a copy of the original waveform if not trims_list_sec: # No trims to apply log.info("apply_multiple_trims_fn: No trims in list, returning original audio with autoplay.") # Autoplay is True here as this function is called after audio load return (sr, current_wav), gr.update(value=(sr, current_wav), autoplay=True) log.info(f"Applying {len(trims_list_sec)} trims sequentially. Initial shape: {current_wav.shape}, Initial duration: {len(current_wav)/sr:.3f}s") for i, trim_info in enumerate(trims_list_sec): start_s = trim_info.get('start_sec') end_s = trim_info.get('end_sec') # Validate trim times for the current audio segment if not (start_s is not None and end_s is not None and end_s > start_s and start_s >= 0): log.warning(f"Trim {i+1}/{len(trims_list_sec)}: Invalid trim definition skipped: {trim_info}") continue if len(current_wav) == 0: log.warning(f"Trim {i+1}/{len(trims_list_sec)}: Audio is already empty, skipping remaining trims.") break # No more audio to trim current_duration_s = len(current_wav) / sr log.info(f"Trim {i+1}: Processing trim {trim_info} on audio of current duration {current_duration_s:.3f}s.") # Convert seconds to sample indices for the current waveform start_sample = int(sr * start_s) end_sample = int(sr * end_s) current_len_samples = len(current_wav) # Clamp sample indices to the bounds of the current waveform # Ensure start_sample is not past the end of the current audio start_sample = max(0, min(start_sample, current_len_samples)) # Ensure end_sample is not past the end, and not before start_sample end_sample = max(start_sample, min(end_sample, current_len_samples)) if start_sample < end_sample: # If there's a segment to remove log.info(f"Trim {i+1}: Applying samples {start_sample}-{end_sample} to current audio (length {current_len_samples} samples). Shape before: {current_wav.shape}") part1 = current_wav[:start_sample] part2 = current_wav[end_sample:] if len(part1) == 0 and len(part2) == 0: # The entire current audio segment was trimmed current_wav = np.array([], dtype=wav_orig.dtype) elif len(part1) == 0: # Trimmed from the beginning of the current segment current_wav = part2 elif len(part2) == 0: # Trimmed to the end of the current segment current_wav = part1 else: # Trimmed from the middle of the current segment current_wav = np.concatenate((part1, part2)) log.info(f"Trim {i+1}: Shape after: {current_wav.shape}, New duration: {len(current_wav)/sr:.3f}s") else: log.info(f"Trim {i+1}: No effective change for trim {trim_info} on current audio (start_sample >= end_sample after clamping or trim times out of bounds for current audio).") log.info(f"Finished sequential trimming. Final shape: {current_wav.shape}, Final duration: {len(current_wav)/sr:.3f}s") # Autoplay is True here as this function is called after audio load and processing return (sr, current_wav), gr.update(value=(sr, current_wav), autoplay=True) def _convert_trims_to_df_data(self, trims_list_sec): if not trims_list_sec: return None # For gr.DataFrame, None clears it return [[f"{t['start_sec']:.3f}", f"{t['end_sec']:.3f}"] for t in trims_list_sec]