Spaces:
Running
Running
# components/review_dashboard_page.py | |
import gradio as gr | |
import datetime | |
import sentry_sdk | |
from sqlalchemy import orm | |
from components.header import Header | |
from utils.logger import Logger | |
from utils.cloud_server_audio_loader import CloudServerAudioLoader | |
from config import conf | |
from utils.database import get_db | |
from data.models import Annotation, TTSData, Annotator, Validation, AnnotationInterval | |
from data.repository.annotator_workload_repo import AnnotatorWorkloadRepo | |
from sqlalchemy import and_ | |
# Module-level logger instance.
log = Logger()
# Audio loader shared by this module, constructed once from conf.FTP_URL.
LOADER = CloudServerAudioLoader(conf.FTP_URL)
class ReviewDashboardPage: | |
def __init__(self) -> None:
    """Build the review dashboard widgets and the state objects they share.

    Everything is created inside ``self.container``, a column that starts
    hidden (``visible=False``).

    NOTE(review): the ``with`` nesting below was reconstructed from the
    section comments because the original indentation was not available;
    confirm it against the real file before relying on the exact layout.
    """
    with gr.Column(visible=False) as self.container:
        self.header = Header()
        # Hidden numeric component; presumably bumped elsewhere to trigger a load - TODO confirm.
        self.load_trigger = gr.Number(value=0, visible=False)  # Add this hidden trigger
        # Review info banner
        with gr.Row():
            self.review_info = gr.Markdown("", elem_classes="review-banner")
        with gr.Row():
            # Left Column - Review Content
            with gr.Column(scale=3):
                with gr.Row():
                    self.tts_id = gr.Textbox(label="ID", interactive=False, scale=1)
                    self.filename = gr.Textbox(label="Filename", interactive=False, scale=3)
                # Read-only, right-to-left text boxes for the original and annotated sentence.
                self.sentence = gr.Textbox(
                    label="Original Sentence", interactive=False, max_lines=5, rtl=True
                )
                self.ann_sentence = gr.Textbox(
                    label="Annotated Sentence (by Original Annotator)",
                    interactive=False, max_lines=5, rtl=True
                )
                with gr.Row():
                    # self.annotator_name = gr.Textbox(label="Original Annotator", interactive=False, scale=1) # Removed for anonymization
                    self.annotated_at = gr.Textbox(label="Annotated At", interactive=False, scale=2)
                # Review Actions
                with gr.Row():
                    self.btn_approve = gr.Button("β Approve", variant="primary", min_width=120)
                    self.btn_reject = gr.Button("β Reject", variant="stop", min_width=120)
                    self.btn_skip = gr.Button("βοΈ Skip (No Decision)", min_width=150)
                # Navigation
                with gr.Row():
                    self.btn_prev = gr.Button("β¬ οΈ Previous", min_width=120)
                    self.btn_next = gr.Button("Next β‘οΈ", min_width=120)
                # Jump controls
                with gr.Row():
                    self.jump_data_id_input = gr.Number(
                        label="Jump to ID",
                        value=None,
                        precision=0,
                        interactive=True,
                        min_width=120
                    )
                    self.btn_jump = gr.Button("Go to ID", min_width=70)
            # Right Column - Audio
            with gr.Column(scale=2):
                self.btn_load_voice = gr.Button("Load Audio & Play", min_width=150)
                self.audio = gr.Audio(
                    label="π Audio", interactive=False, autoplay=True
                )
                # Review status display
                with gr.Group():
                    gr.Markdown("### Review Status")
                    self.current_validation_status = gr.Textbox(
                        label="Current Status", interactive=False
                    )
                    # Starts hidden; the callbacks toggle its visibility via gr.update.
                    self.rejection_reason_input = gr.Textbox(
                        label="Rejection Reason",
                        placeholder="Enter reason and press Enter or click away...",
                        interactive=True,
                        visible=False,
                        max_lines=3,
                        elem_id="rejection_reason_input"  # Added elem_id for clarity
                    )
        # State variables
        self.items_state = gr.State([])  # list of loaded review item dicts
        self.idx_state = gr.State(0)  # index of the current item within items_state
        self.original_audio_state = gr.State(None)
        self.rejection_mode_active = gr.State(False)  # Track if waiting for rejection reason
        # List of interactive UI elements for enabling/disabling
        self.interactive_ui_elements = [
            self.btn_prev, self.btn_next, self.btn_approve, self.btn_reject,
            self.btn_skip, self.btn_jump, self.jump_data_id_input, self.btn_load_voice
        ]
def register_callbacks(self, login_page, session_state: gr.State, root_blocks: gr.Blocks): | |
self.header.register_callbacks(login_page, self, session_state) | |
def update_ui_interactive_state(is_interactive: bool):
    """Return one ``gr.update`` per entry of ``self.interactive_ui_elements``.

    Every component is switched to ``interactive=is_interactive``. The
    "load voice" button additionally gets its label swapped between the
    normal caption and a loading caption.
    """
    if is_interactive:
        voice_button_kwargs = {"value": "Load Audio & Play", "interactive": True}
    else:
        voice_button_kwargs = {"value": "β³ Loading Audio...", "interactive": False}

    def _update_for(component):
        # The audio button is the only element whose caption changes.
        if component == self.btn_load_voice:
            return gr.update(**voice_button_kwargs)
        return gr.update(interactive=is_interactive)

    return [_update_for(component) for component in self.interactive_ui_elements]
def download_voice_fn(filename_to_load):
    """Fetch the audio for ``filename_to_load`` through the shared ``LOADER``.

    Returns a 3-tuple: ``(sr, wav)``, a second ``(sr, wav.copy())`` pair,
    and a ``gr.update`` that puts the audio into the player with autoplay
    on. For a falsy filename the result is ``None, None`` plus an update
    that clears the player.

    No exception handling happens here: whatever ``LOADER.load_audio``
    raises propagates to the caller.
    """
    if filename_to_load:
        log.info(f"Downloading voice for review: {filename_to_load}")
        sample_rate, samples = LOADER.load_audio(filename_to_load)
        log.info(f"Successfully loaded audio: {filename_to_load} (SR: {sample_rate}, Length: {len(samples)} samples)")
        playable = (sample_rate, samples)
        # Independent copy of the samples so the second output does not alias the first.
        untouched = (sample_rate, samples.copy())
        return playable, untouched, gr.update(value=playable, autoplay=True)

    return None, None, gr.update(value=None, autoplay=False)
def get_validation_status_for_item(db, annotation_id, user_id, annotation_obj):
    """Resolve the status label of one annotation for one validator.

    Looks up the ``Validation`` row matching ``annotation_id`` and
    ``user_id`` and returns ``(status, is_deleted)``. ``is_deleted`` is
    True when the annotation's ``annotated_sentence`` is missing or
    whitespace-only.
    """
    record = (
        db.query(Validation)
        .filter_by(annotation_id=annotation_id, validator_id=user_id)
        .first()
    )

    text = annotation_obj.annotated_sentence
    is_deleted = not (text and text.strip())

    if not record:
        # Only unreviewed items get the "(Deleted)" suffix.
        status = "Not Reviewed (Deleted)" if is_deleted else "Not Reviewed"
    elif record.validated:
        status = "Approved"
    elif record.description:
        status = f"Rejected ({record.description})"
    else:
        status = "Rejected"

    return status, is_deleted
def get_review_progress_fn(session):
    """Render the reviewer's progress as an HTML snippet.

    Progress is the number of the target annotator's annotations (inside
    that annotator's assigned intervals) that already have a validation by
    the current user, divided by the total number of such annotations.

    Returns an empty string when the session has no user or the user is not
    a reviewer in ``conf.REVIEW_MAPPING``; a short Markdown message when
    data is missing or a query fails; otherwise the HTML progress block.
    """
    user_id = session.get("user_id")
    username = session.get("username")
    if not (user_id and username):
        return ""

    # conf.REVIEW_MAPPING is iterated as annotator name -> reviewer name.
    if username not in conf.REVIEW_MAPPING.values():
        return ""
    target_annotator = next(
        (annotator for annotator, reviewer in conf.REVIEW_MAPPING.items() if reviewer == username),
        None,
    )
    if not target_annotator:
        return ""

    with get_db() as db:
        try:
            annotator_row = db.query(Annotator).filter_by(name=target_annotator).first()
            if not annotator_row:
                return f"β οΈ **Error:** Annotator '{target_annotator}' not found"

            intervals = db.query(AnnotationInterval).filter(
                AnnotationInterval.annotator_id == annotator_row.id
            ).all()
            if not intervals:
                return f"β οΈ **Error:** No assigned intervals for annotator '{target_annotator}'"

            # Intervals with an open bound are ignored by both counts.
            bounded = [
                iv for iv in intervals
                if iv.start_index is not None and iv.end_index is not None
            ]

            def _count(iv, reviewed_only):
                query = db.query(Annotation).join(
                    TTSData, Annotation.tts_data_id == TTSData.id
                )
                conditions = [
                    Annotation.annotator_id == annotator_row.id,
                    TTSData.id >= iv.start_index,
                    TTSData.id <= iv.end_index,
                ]
                if reviewed_only:
                    query = query.join(
                        Validation, Annotation.id == Validation.annotation_id
                    )
                    conditions.append(Validation.validator_id == user_id)
                return query.filter(*conditions).count()

            total_count = sum(_count(iv, False) for iv in bounded)
            reviewed_count = sum(_count(iv, True) for iv in bounded)

            if not total_count > 0:
                return f"π **No items found for {target_annotator}**"

            percentage = (reviewed_count / total_count) * 100

            # Text progress bar, 30 characters wide.
            bar_width = 30
            filled = int((percentage / 100) * bar_width)
            empty = bar_width - filled

            # (exclusive upper bound, icon, CSS class); anything >= 100 is "complete".
            tiers = (
                (25, "π΄", "progress-bar-low"),
                (50, "π‘", "progress-bar-medium-low"),
                (75, "π ", "progress-bar-medium"),
                (100, "π’", "progress-bar-high"),
            )
            color, bar_color = "β ", "progress-bar-complete"
            for upper_bound, icon, css_class in tiers:
                if percentage < upper_bound:
                    color, bar_color = icon, css_class
                    break

            progress_bar = "β" * filled + "β" * empty
            remaining = total_count - reviewed_count

            # Assembled line by line; the leading/trailing "" reproduce the
            # newline right after the opening and before the closing quotes
            # of the triple-quoted template this replaces.
            # NOTE(review): any leading whitespace the template lines carried
            # is not visible in the reviewed source; lines are emitted flush left.
            html_lines = [
                "",
                '<div class="progress-container">',
                '<div class="progress-header">',
                f'<span class="progress-icon">{color}</span>',
                "<strong>Review Progress</strong>",
                "</div>",
                '<div class="progress-bar-container">',
                f'<span class="progress-percentage">{percentage:.1f}%</span>',
                f'<div class="progress-bar {bar_color}">',
                f'<span class="progress-fill" style="width: {percentage:.1f}%"></span>',
                "</div>",
                f'<span class="progress-stats">{reviewed_count}/{total_count}</span>',
                "</div>",
                '<div class="progress-details">',
                f"π <code>{progress_bar}</code>",
                f'<span class="remaining-items">({remaining} remaining)</span>',
                "</div>",
                "</div>",
                "",
            ]
            return "\n".join(html_lines)
        except Exception as e:
            log.error(f"Error calculating review progress for user {user_id}: {e}")
            return "β οΈ **Error calculating progress**"
def load_review_items_fn(session):
    """Load the first batch of annotations the logged-in reviewer should see.

    The reviewer's target annotator is looked up in ``conf.REVIEW_MAPPING``
    (iterated as annotator name -> reviewer name). If the reviewer still has
    unreviewed annotations inside the annotator's assigned intervals, a
    window of up to 10 items starting 5 IDs before the first unreviewed one
    is loaded and that item is selected. Otherwise the last 10 items of the
    interval with the highest ``end_index`` are loaded and the last one is
    selected.

    Validation status is not resolved here: every item starts with
    ``"Loading..."`` / ``validation_loaded=False`` and is filled in later,
    on demand.

    Returns:
        A 14-tuple, in this order: items, current index, review banner
        text, TTS id, filename, original sentence, annotated sentence,
        annotated-at timestamp, validation status, annotator-name
        placeholder (always ""), audio update, rejection-reason input
        update, rejection-mode flag, reject-button update.
    """

    def _nothing_to_show(banner=""):
        # Single definition of the 14-value "empty dashboard" result so the
        # arity cannot drift between the early-exit paths.
        return (
            [],
            0,
            banner,
            "", "", "", "", "", "", "",
            gr.update(value=None, autoplay=False),
            gr.update(visible=False, value=""),
            False,
            gr.update(value="β Reject"),
        )

    def _as_item(annotation, filename, sentence):
        # An annotation with a missing/blank sentence is treated as deleted.
        text = annotation.annotated_sentence
        is_deleted = not text or text.strip() == ""
        return {
            "annotation_id": annotation.id,
            "tts_id": annotation.tts_data_id,
            "filename": filename,
            "sentence": sentence,
            "annotated_sentence": "[DELETED ANNOTATION]" if is_deleted else text,
            "is_deleted": is_deleted,
            "annotated_at": annotation.annotated_at.isoformat() if annotation.annotated_at else "",
            "validation_status": "Loading...",  # Resolved on demand
            "validation_loaded": False,
        }

    user_id = session.get("user_id")
    username = session.get("username")
    if not user_id or not username:
        log.warning("load_review_items_fn: user not found in session")
        return _nothing_to_show()

    # Only users listed as a reviewer may load review items.
    if username not in conf.REVIEW_MAPPING.values():
        log.warning(f"User {username} is not assigned as a reviewer")
        return _nothing_to_show()

    target_annotator = next(
        (
            annotator_name
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items()
            if reviewer_name == username
        ),
        None,
    )
    if not target_annotator:
        log.warning(f"No target annotator found for reviewer {username}")
        return _nothing_to_show()

    with get_db() as db:
        target_annotator_obj = db.query(Annotator).filter_by(name=target_annotator).first()
        if not target_annotator_obj:
            log.error(f"Target annotator {target_annotator} not found in database")
            return _nothing_to_show(
                f"Review Target Error: Annotator '{target_annotator}' not found."
            )
        log.info(f"Found target annotator with ID: {target_annotator_obj.id}")

        INITIAL_BATCH_SIZE = 10  # Items loaded up front
        WINDOW_BEFORE = 5  # Already-reviewed items kept before the first unreviewed one

        assigned_intervals = db.query(AnnotationInterval).filter(
            AnnotationInterval.annotator_id == target_annotator_obj.id
        ).all()
        if not assigned_intervals:
            log.warning(f"No assigned intervals found for annotator {target_annotator}")
            return _nothing_to_show(
                f"Review Target Error: No assigned intervals for annotator '{target_annotator}'."
            )

        # Intervals with an open bound are ignored by the lookups below.
        bounded_intervals = [
            interval for interval in assigned_intervals
            if interval.start_index is not None and interval.end_index is not None
        ]

        # First TTS id (per interval, in interval order) that has no
        # validation by this reviewer.
        first_unreviewed_tts_id = None
        for interval in bounded_intervals:
            first_row = db.query(TTSData.id).join(
                Annotation, Annotation.tts_data_id == TTSData.id
            ).outerjoin(
                Validation,
                (Validation.annotation_id == Annotation.id) & (Validation.validator_id == user_id)
            ).filter(
                Annotation.annotator_id == target_annotator_obj.id,
                TTSData.id >= interval.start_index,
                TTSData.id <= interval.end_index,
                Validation.id.is_(None)
            ).order_by(TTSData.id.asc()).first()
            if first_row:
                first_unreviewed_tts_id = first_row[0]
                break

        # Total annotations inside the assigned intervals, for the banner.
        total_count = sum(
            db.query(Annotation).join(
                TTSData, Annotation.tts_data_id == TTSData.id
            ).filter(
                Annotation.annotator_id == target_annotator_obj.id,
                TTSData.id >= interval.start_index,
                TTSData.id <= interval.end_index
            ).count()
            for interval in bounded_intervals
        )

        # Truthiness (not "is not None") is kept on purpose: a falsy id is
        # handled exactly like "everything reviewed", as before.
        has_unreviewed = bool(first_unreviewed_tts_id)

        if has_unreviewed:
            window_start_id = max(1, first_unreviewed_tts_id - WINDOW_BEFORE)
            initial_results = db.query(
                Annotation,
                TTSData.filename,
                TTSData.sentence
            ).join(
                TTSData, Annotation.tts_data_id == TTSData.id
            ).join(
                AnnotationInterval,
                and_(
                    AnnotationInterval.annotator_id == target_annotator_obj.id,
                    TTSData.id >= AnnotationInterval.start_index,
                    TTSData.id <= AnnotationInterval.end_index
                )
            ).filter(
                Annotation.annotator_id == target_annotator_obj.id,
                TTSData.id >= window_start_id
            ).order_by(TTSData.id).limit(INITIAL_BATCH_SIZE).all()
        elif total_count > 0:
            # Nothing left to review: show the tail of the last interval.
            last_interval = max(assigned_intervals, key=lambda x: x.end_index or 0)
            if last_interval.start_index is not None and last_interval.end_index is not None:
                initial_results = db.query(
                    Annotation,
                    TTSData.filename,
                    TTSData.sentence
                ).join(
                    TTSData, Annotation.tts_data_id == TTSData.id
                ).filter(
                    Annotation.annotator_id == target_annotator_obj.id,
                    TTSData.id >= last_interval.start_index,
                    TTSData.id <= last_interval.end_index
                ).order_by(TTSData.id.desc()).limit(INITIAL_BATCH_SIZE).all()
                initial_results.reverse()  # Restore ascending order
            else:
                initial_results = []
        else:
            initial_results = []

        log.info(f"Fast initial load: {len(initial_results)} annotations out of {total_count} total for target annotator ID {target_annotator_obj.id}")

        items = [
            _as_item(annotation, filename, sentence)
            for annotation, filename, sentence in initial_results
        ]
        if not items:
            return _nothing_to_show(
                "π **Phase 2 Review Mode** - No annotations found for review."
            )

        # Pick the item to display first.
        initial_idx = 0
        if has_unreviewed:
            for position, item in enumerate(items):
                if item["tts_id"] == first_unreviewed_tts_id:
                    initial_idx = position
                    break
        else:
            # Everything reviewed: start at the last item for browsing.
            initial_idx = len(items) - 1

        initial_item = items[initial_idx]
        review_info_text = f"π **Phase 2 Review Mode** - Reviewing assigned annotations. Loaded {len(items)} of {total_count} total items."

        # The item's status is always "Loading..." at this point, so the
        # rejection-reason input is always hidden and empty here.
        return (
            items,
            initial_idx,
            review_info_text,
            str(initial_item["tts_id"]),
            initial_item["filename"],
            initial_item["sentence"],
            initial_item["annotated_sentence"],
            initial_item["annotated_at"],
            initial_item["validation_status"],
            "",  # Placeholder for the original annotator name
            gr.update(value=None, autoplay=False),
            gr.update(visible=False, value=""),
            False,  # Reset rejection mode
            gr.update(value="β Reject"),  # Reset reject button
        )
def show_current_review_item_fn(items, idx, session):
    """Produce the widget values for ``items[idx]``.

    If the item's validation status has not been loaded yet
    (``validation_loaded`` falsy) and the session has a ``user_id``, the
    status is fetched from the database and cached on the item dict. A
    failure while fetching is logged and shown as "Error loading status".

    Returns:
        An 11-tuple: TTS id, filename, original sentence, annotated
        sentence, annotated-at, validation status, annotator-name
        placeholder (always ""), audio update (cleared), rejection-reason
        input update, rejection-mode flag (always False), reject-button
        update. All text values are "" when ``idx`` is out of range or
        ``items`` is empty.
    """
    if not items or idx >= len(items) or idx < 0:
        return (
            "", "", "", "", "", "", "",
            gr.update(value=None, autoplay=False),
            gr.update(visible=False, value=""),
            False,
            gr.update(value="β Reject"),
        )

    current_item = items[idx]

    # Load validation status on demand if not already loaded.
    if not current_item.get("validation_loaded", False):
        user_id = session.get("user_id")
        if user_id:
            with get_db() as db:
                try:
                    annotation_obj = db.query(Annotation).filter_by(id=current_item["annotation_id"]).first()
                    if annotation_obj:
                        validation_status, is_deleted = get_validation_status_for_item(
                            db, current_item["annotation_id"], user_id, annotation_obj
                        )
                        current_item["validation_status"] = validation_status
                        current_item["is_deleted"] = is_deleted
                        current_item["validation_loaded"] = True
                        if is_deleted:
                            current_item["annotated_sentence"] = "[DELETED ANNOTATION]"
                        log.info(f"Loaded validation status for item {idx}: {validation_status}")
                except Exception as e:
                    log.error(f"Error loading validation status for item {idx}: {e}")
                    current_item["validation_status"] = "Error loading status"

    rejection_reason = ""
    rejection_visible = False
    status = current_item["validation_status"]
    if status.startswith("Rejected"):
        # A rejected item always shows the reason box, even with no stored reason.
        rejection_visible = True
        # Status looks like "Rejected (reason)". Use the LAST ")" so a
        # reason that itself contains parentheses is not cut short.
        open_idx = status.find("(")
        close_idx = status.rfind(")")
        if open_idx != -1 and close_idx > open_idx:
            rejection_reason = status[open_idx + 1:close_idx]

    return (
        str(current_item["tts_id"]),
        current_item["filename"],
        current_item["sentence"],
        current_item["annotated_sentence"],
        current_item["annotated_at"],
        status,
        "",  # Placeholder for annotator_name
        gr.update(value=None, autoplay=False),
        gr.update(visible=rejection_visible, value=rejection_reason),
        False,  # Reset rejection mode
        gr.update(value="β Reject"),  # Reset reject button text
    )
def update_review_info_fn(items, total_count):
    """Build the review banner text for the currently loaded items.

    With a non-empty ``items`` the banner reports ``len(items)`` out of
    ``total_count``; otherwise it reports that nothing was found.
    """
    if not items:
        return "π **Phase 2 Review Mode** - No annotations found for review."
    loaded = len(items)
    return f"π **Phase 2 Review Mode** - Reviewing assigned annotations. Loaded {loaded} of {total_count} total items."
def navigate_and_load_fn(items, current_idx, direction, session):
    """Move one step through ``items`` and fetch more rows at the edges.

    ``direction == "next"`` moves forward; any other value moves backward.
    Returns ``(items, new_index, review_info)``, where ``review_info`` is
    "" unless another batch was loaded.

    Forward: more items are requested only when the new index is the last
    loaded item and the loaded count is a multiple of 5.
    Backward: earlier items are requested only when the user was already on
    index 0; the returned index is shifted by the number of rows prepended.
    """
    if not items:
        return items, 0, ""

    if direction == "next":
        final_position = len(items) - 1
        new_idx = min(current_idx + 1, final_position)
        reached_batch_end = new_idx == final_position and len(items) % 5 == 0
        if not reached_batch_end:
            return items, new_idx, ""

        log.info(f"User reached end of loaded items ({new_idx}/{len(items)}), will load more items")
        updated_items, total_count = load_more_items_fn(items, session, current_batch_size=10)
        return updated_items, new_idx, update_review_info_fn(updated_items, total_count)

    # Every direction other than "next" is treated as "prev".
    new_idx = max(current_idx - 1, 0)
    already_at_start = new_idx == 0 and current_idx == 0
    if not already_at_start:
        return items, new_idx, ""

    log.info("User reached beginning of loaded items, will load previous items")
    updated_items, total_count, loaded_count = load_previous_items_fn(items, session, current_batch_size=5)
    # Keep pointing at the same item after rows were inserted in front of it.
    shifted_idx = new_idx + loaded_count
    return updated_items, shifted_idx, update_review_info_fn(updated_items, total_count)
def save_validation_fn(items, idx, session, approved: bool, rejection_reason: str = ""):
    """Create or update the current user's validation of ``items[idx]``.

    Args:
        items: Loaded review item dicts; ``items[idx]`` is updated in place
            with the new ``validation_status``.
        idx: Index of the item being validated.
        session: Mapping that provides ``user_id`` (used as validator id).
        approved: True to approve, False to reject.
        rejection_reason: Stored as the validation description when
            rejecting; ignored (stored as None) when approving.

    Returns:
        ``(items, status_text, rejection_input_update)``. For an invalid
        index or a missing user the status text is an error message and the
        rejection input is hidden.

    Database errors are not handled here and propagate to the caller.
    """
    # Negative indexes are rejected too; otherwise they would silently
    # address an item counted from the end of the list.
    if not items or idx < 0 or idx >= len(items):
        # gr.Error only shows up when raised; gr.Warning displays the
        # message while still letting this function return its outputs.
        gr.Warning("Invalid item index")
        return items, "Error: Invalid item index", gr.update(visible=False)

    user_id = session.get("user_id")
    if not user_id:
        gr.Warning("User not logged in")
        return items, "Error: User not logged in", gr.update(visible=False)

    current_item = items[idx]
    annotation_id = current_item["annotation_id"]
    log.info(f"Saving validation for annotation_id: {annotation_id}, validator_id: {user_id}, approved: {approved}, reason: {rejection_reason}")

    # The description is only kept for rejections.
    description = rejection_reason if not approved else None

    with get_db() as db:
        existing_validation = db.query(Validation).filter_by(
            annotation_id=annotation_id,
            validator_id=user_id
        ).first()
        if existing_validation:
            log.info(f"Updating existing validation for annotation_id: {annotation_id}")
            existing_validation.validated = approved
            existing_validation.description = description
            existing_validation.validated_at = datetime.datetime.utcnow()
        else:
            log.info(f"Creating new validation for annotation_id: {annotation_id}")
            new_validation = Validation(
                annotation_id=annotation_id,
                validator_id=user_id,
                validated=approved,
                description=description,
                validated_at=datetime.datetime.utcnow(),
            )
            db.add(new_validation)
        db.commit()
        log.info(f"Validation saved successfully for annotation_id: {annotation_id}")

        if approved:
            status_text = "Approved"
        elif rejection_reason:
            status_text = f"Rejected ({rejection_reason})"
        else:
            status_text = "Rejected"
        items[idx]["validation_status"] = status_text

        # Show the reason box only for rejections; hide and clear it otherwise.
        rejection_input_update = gr.update(
            visible=not approved,
            value="" if approved else rejection_reason,
        )
        return items, status_text, rejection_input_update
def handle_rejection_fn(items, idx, session, rejection_reason, rejection_mode_active):
    """Drive the two-click reject flow.

    First click (``rejection_mode_active`` falsy): reveal an empty reason
    box, switch the button to its "confirm" caption and enter rejection
    mode. Second click: refuse a blank reason with a warning, otherwise
    save the rejection through ``save_validation_fn`` and reset the UI.

    Returns ``(items, status_text, reason_input_update, rejection_mode,
    reject_button_update)``.
    """
    if not items or idx >= len(items):
        return items, "Error: Invalid item", gr.update(visible=False), False, gr.update(value="β Reject")

    current_status = items[idx]["validation_status"]

    if not rejection_mode_active:
        # Step 1: ask for a reason; nothing is saved yet.
        return (
            items,
            current_status,
            gr.update(visible=True, value=""),
            True,
            gr.update(value="β οΈ Confirm Reject"),
        )

    reason_is_blank = not rejection_reason or rejection_reason.strip() == ""
    if reason_is_blank:
        # Step 2 without a reason: stay in rejection mode.
        gr.Warning("Rejection reason cannot be empty. Please provide a reason before confirming rejection.")
        return (
            items,
            current_status,
            gr.update(visible=True, value=rejection_reason),
            True,
            gr.update(value="β οΈ Confirm Reject"),
        )

    # Step 2 with a reason: persist it. The input update returned by the
    # save helper is discarded; the box is always hidden after a save.
    updated_items, validation_status, _unused_input_update = save_validation_fn(
        items, idx, session, approved=False, rejection_reason=rejection_reason.strip()
    )
    return (
        updated_items,
        validation_status,
        gr.update(visible=False, value=""),
        False,
        gr.update(value="β Reject"),
    )
def jump_by_data_id_fn(items, target_data_id, current_idx, session):
    """Replace the loaded batch with one centred on a given TTS data ID.

    The ID must belong to an annotation of the reviewer's target annotator
    inside one of that annotator's assigned intervals. On success returns
    ``(new_items, index_of_target, banner_text)``. On any failure (no ID,
    no session, unknown annotator, ID outside the review range, bad ID
    format, database error) a warning is shown where applicable and the
    unchanged ``(items, current_idx, "")`` is returned.
    """
    unchanged = (items, current_idx, "")

    if not target_data_id:
        return unchanged

    user_id = session.get("user_id")
    username = session.get("username")
    if not (user_id and username):
        gr.Warning("User session not found")
        return unchanged

    # conf.REVIEW_MAPPING is iterated as annotator name -> reviewer name.
    target_annotator = next(
        (annotator for annotator, reviewer in conf.REVIEW_MAPPING.items() if reviewer == username),
        None,
    )
    if not target_annotator:
        gr.Warning("Target annotator not found for user")
        return unchanged

    try:
        target_id = int(target_data_id)
        with get_db() as db:
            annotator_row = db.query(Annotator).filter_by(name=target_annotator).first()
            if not annotator_row:
                gr.Warning("Target annotator not found in database")
                return unchanged

            intervals = db.query(AnnotationInterval).filter(
                AnnotationInterval.annotator_id == annotator_row.id
            ).all()
            if not intervals:
                gr.Warning("No assigned intervals found")
                return unchanged

            # Intervals with an open bound are ignored.
            bounded = [
                iv for iv in intervals
                if iv.start_index is not None and iv.end_index is not None
            ]

            def _annotations_in(iv, *extra_conditions):
                return db.query(Annotation).join(
                    TTSData, Annotation.tts_data_id == TTSData.id
                ).filter(
                    Annotation.annotator_id == annotator_row.id,
                    *extra_conditions,
                    TTSData.id >= iv.start_index,
                    TTSData.id <= iv.end_index
                )

            # Stop at the first interval that contains the requested ID.
            target_in_range = any(
                _annotations_in(iv, TTSData.id == target_id).first() for iv in bounded
            )
            if not target_in_range:
                gr.Warning(f"Data ID {target_id} not found in assigned review range")
                return unchanged

            batch_size = 10
            window_start_id = max(1, target_id - batch_size // 2)

            rows = db.query(
                Annotation,
                TTSData.filename,
                TTSData.sentence
            ).join(
                TTSData, Annotation.tts_data_id == TTSData.id
            ).join(
                AnnotationInterval,
                and_(
                    AnnotationInterval.annotator_id == annotator_row.id,
                    TTSData.id >= AnnotationInterval.start_index,
                    TTSData.id <= AnnotationInterval.end_index
                )
            ).filter(
                Annotation.annotator_id == annotator_row.id,
                TTSData.id >= window_start_id
            ).order_by(TTSData.id).limit(batch_size).all()

            new_items = []
            target_idx = 0
            for position, (annotation, filename, sentence) in enumerate(rows):
                text = annotation.annotated_sentence
                # A missing/blank annotated sentence marks a deleted annotation.
                is_deleted = not text or text.strip() == ""
                new_items.append({
                    "annotation_id": annotation.id,
                    "tts_id": annotation.tts_data_id,
                    "filename": filename,
                    "sentence": sentence,
                    "annotated_sentence": "[DELETED ANNOTATION]" if is_deleted else text,
                    "is_deleted": is_deleted,
                    "annotated_at": annotation.annotated_at.isoformat() if annotation.annotated_at else "",
                    "validation_status": "Loading...",
                    "validation_loaded": False
                })
                if annotation.tts_data_id == target_id:
                    target_idx = position

            if not new_items:
                gr.Warning(f"No items loaded around ID {target_id}")
                return unchanged

            # Total annotations across the assigned intervals, for the banner.
            total_count = sum(_annotations_in(iv).count() for iv in bounded)
            review_info = f"π **Phase 2 Review Mode** - Jumped to ID {target_id}. Loaded {len(new_items)} of {total_count} total items."
            log.info(f"Successfully jumped to TTS ID {target_id}, loaded {len(new_items)} items, target at index {target_idx}")
            return new_items, target_idx, review_info
    except ValueError:
        gr.Warning(f"Invalid Data ID format: {target_data_id}")
        return unchanged
    except Exception as e:
        log.error(f"Error jumping to ID {target_data_id}: {e}")
        gr.Warning(f"Error jumping to ID {target_data_id}")
        return unchanged
def load_more_items_fn(items, session, current_batch_size=10):
    """Append the next page of the reviewed annotator's annotations to ``items``.

    The reviewer is identified by ``session["username"]`` and mapped back to
    the annotator under review through ``conf.REVIEW_MAPPING`` (annotator name
    -> reviewer name). Paging is id-based: only annotations whose id is
    greater than the last loaded ``annotation_id`` are fetched, in ascending
    id order, restricted to TTS ids inside the annotator's assigned intervals.

    Args:
        items: Item dicts loaded so far (each has an ``"annotation_id"`` key).
        session: Mapping providing ``"user_id"`` and ``"username"``.
        current_batch_size: Maximum number of additional rows to fetch.

    Returns:
        ``(all_items, total_count)`` where ``total_count`` is the number of
        the annotator's annotations inside the assigned intervals. If the
        session is incomplete, no annotator is mapped to the reviewer, the
        annotator row is missing, or no intervals are assigned, ``(items, 0)``
        is returned with ``items`` untouched.
    """
    reviewer_id = session.get("user_id")
    reviewer_name = session.get("username")
    if not (reviewer_id and reviewer_name):
        return items, 0  # No user session: hand back what we already have

    # REVIEW_MAPPING maps annotator -> reviewer; take the first annotator
    # assigned to this reviewer.
    annotator_name = next(
        (name for name, reviewer in conf.REVIEW_MAPPING.items() if reviewer == reviewer_name),
        None,
    )
    if not annotator_name:
        return items, 0

    def build_item(annotation, filename, sentence):
        # An empty or whitespace-only annotated sentence marks a deleted annotation.
        text = annotation.annotated_sentence
        deleted = not (text and text.strip())
        stamp = annotation.annotated_at
        return {
            "annotation_id": annotation.id,
            "tts_id": annotation.tts_data_id,
            "filename": filename,
            "sentence": sentence,
            "annotated_sentence": "[DELETED ANNOTATION]" if deleted else text,
            "is_deleted": deleted,
            "annotated_at": stamp.isoformat() if stamp else "",
            "validation_status": "Loading...",  # Filled in on demand
            "validation_loaded": False,  # Tracks whether the status was fetched
        }

    with get_db() as db:
        annotator = db.query(Annotator).filter_by(name=annotator_name).first()
        if not annotator:
            return items, 0

        intervals = (
            db.query(AnnotationInterval)
            .filter(AnnotationInterval.annotator_id == annotator.id)
            .all()
        )
        if not intervals:
            return items, 0

        # Progress total: one COUNT per fully bounded interval.
        total_count = sum(
            db.query(Annotation)
            .join(TTSData, Annotation.tts_data_id == TTSData.id)
            .filter(
                Annotation.annotator_id == annotator.id,
                TTSData.id >= interval.start_index,
                TTSData.id <= interval.end_index,
            )
            .count()
            for interval in intervals
            if interval.start_index is not None and interval.end_index is not None
        )

        # Continue right after the last annotation that is already loaded.
        if items:
            last_loaded_id = items[-1]["annotation_id"]
        else:
            last_loaded_id = 0

        inside_assigned_interval = and_(
            AnnotationInterval.annotator_id == annotator.id,
            TTSData.id >= AnnotationInterval.start_index,
            TTSData.id <= AnnotationInterval.end_index,
        )
        rows = (
            db.query(Annotation, TTSData.filename, TTSData.sentence)
            .join(TTSData, Annotation.tts_data_id == TTSData.id)
            .join(AnnotationInterval, inside_assigned_interval)
            .filter(
                Annotation.annotator_id == annotator.id,
                Annotation.id > last_loaded_id,
            )
            .order_by(Annotation.id)
            .limit(current_batch_size)
            .all()
        )

        new_items = [build_item(*row) for row in rows]

        # New page goes after the existing items
        all_items = items + new_items
        log.info(f"Loaded {len(new_items)} more items after id {last_loaded_id}, total now: {len(all_items)}")
        return all_items, total_count
def load_previous_items_fn(items, session, current_batch_size=5):
    """Prepend the page of annotations that precedes the currently loaded items.

    The reviewer is identified by ``session["username"]`` and mapped to the
    annotator under review through ``conf.REVIEW_MAPPING`` (annotator name ->
    reviewer name). Rows with an ``Annotation.id`` smaller than the first
    loaded ``annotation_id`` are fetched in descending id order (so the
    closest predecessors win the ``LIMIT``) and then reversed back to
    ascending order before being placed in front of ``items``.

    Args:
        items: Item dicts loaded so far (each has an ``"annotation_id"`` key).
        session: Mapping providing ``"user_id"`` and ``"username"``.
        current_batch_size: Maximum number of earlier rows to fetch.

    Returns:
        ``(all_items, total_count, loaded_count)``: the combined list, the
        number of the annotator's annotations inside the assigned intervals,
        and how many items were prepended. If the session is incomplete, no
        annotator is mapped to the reviewer, the annotator row is missing, or
        no intervals are assigned, ``(items, 0, 0)`` is returned.
    """
    user_id = session.get("user_id")
    username = session.get("username")
    if not user_id or not username:
        return items, 0, 0  # Return existing items if no user session

    # Find the annotator this reviewer is assigned to (first match wins)
    target_annotator = next(
        (
            annotator_name
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items()
            if reviewer_name == username
        ),
        None,
    )
    if not target_annotator:
        return items, 0, 0

    with get_db() as db:
        target_annotator_obj = db.query(Annotator).filter_by(name=target_annotator).first()
        if not target_annotator_obj:
            return items, 0, 0

        # Get the target annotator's assigned intervals
        assigned_intervals = db.query(AnnotationInterval).filter(
            AnnotationInterval.annotator_id == target_annotator_obj.id
        ).all()
        if not assigned_intervals:
            return items, 0, 0

        # Count total annotations within assigned intervals for progress info;
        # intervals missing either bound are ignored.
        total_count = 0
        for interval in assigned_intervals:
            if interval.start_index is None or interval.end_index is None:
                continue
            total_count += db.query(Annotation).join(
                TTSData, Annotation.tts_data_id == TTSData.id
            ).filter(
                Annotation.annotator_id == target_annotator_obj.id,
                TTSData.id >= interval.start_index,
                TTSData.id <= interval.end_index
            ).count()

        # Upper bound for the page: the first loaded annotation id. With no
        # items loaded there is no bound at all. A float("inf") sentinel is
        # deliberately avoided: it would be sent to the database as a float
        # bind parameter compared against an integer key, which is not
        # portable across DB drivers.
        first_loaded_id = items[0]["annotation_id"] if items else None
        criteria = [Annotation.annotator_id == target_annotator_obj.id]
        if first_loaded_id is not None:
            criteria.append(Annotation.id < first_loaded_id)

        # LOAD ITEMS BEFORE: id-based pagination restricted to assigned intervals
        query = db.query(
            Annotation,
            TTSData.filename,
            TTSData.sentence
        ).join(
            TTSData, Annotation.tts_data_id == TTSData.id
        ).join(
            AnnotationInterval,
            and_(
                AnnotationInterval.annotator_id == target_annotator_obj.id,
                TTSData.id >= AnnotationInterval.start_index,
                TTSData.id <= AnnotationInterval.end_index
            )
        ).filter(*criteria).order_by(Annotation.id.desc()).limit(current_batch_size)
        results = query.all()
        results.reverse()  # Restore ascending order

        # Process new items with minimal data - validation status loaded on-demand
        new_items = []
        for annotation, filename, sentence in results:
            # An empty or whitespace-only annotated sentence marks a deleted annotation
            annotated_sentence = annotation.annotated_sentence
            is_deleted = not annotated_sentence or annotated_sentence.strip() == ""
            new_items.append({
                "annotation_id": annotation.id,
                "tts_id": annotation.tts_data_id,
                "filename": filename,
                "sentence": sentence,
                "annotated_sentence": "[DELETED ANNOTATION]" if is_deleted else annotated_sentence,
                "is_deleted": is_deleted,
                "annotated_at": annotation.annotated_at.isoformat() if annotation.annotated_at else "",
                "validation_status": "Loading...",  # Will be loaded on-demand
                "validation_loaded": False  # Track if validation status has been loaded
            })

        # Combine with existing items (new items go to the front)
        all_items = new_items + items
        loaded_count = len(new_items)
        log.info(f"Loaded {loaded_count} items before id {first_loaded_id}, total now: {len(all_items)}")
        return all_items, total_count, loaded_count
# Output definitions | |
review_display_outputs = [ | |
self.tts_id, self.filename, self.sentence, self.ann_sentence, | |
self.annotated_at, | |
self.current_validation_status, | |
self.header.welcome, # Placeholder for anonymized annotator name | |
self.audio, | |
self.rejection_reason_input, # Added rejection reason input to display outputs | |
self.rejection_mode_active, # Added rejection mode state | |
self.btn_reject # Added reject button to display outputs | |
] | |
# Trigger data loading when load_trigger changes (after successful login for a reviewer) | |
self.load_trigger.change( | |
fn=lambda: update_ui_interactive_state(False), | |
outputs=self.interactive_ui_elements | |
).then( | |
fn=load_review_items_fn, | |
inputs=[session_state], | |
outputs=[self.items_state, self.idx_state, self.review_info] + review_display_outputs | |
).then( | |
fn=get_review_progress_fn, | |
inputs=[session_state], | |
outputs=[self.header.progress_display] | |
).then( | |
fn=lambda: (None, gr.update(value=None)), # Clear audio state | |
outputs=[self.original_audio_state, self.audio] | |
).then( | |
fn=lambda: update_ui_interactive_state(True), | |
outputs=self.interactive_ui_elements | |
) | |
# Audio loading is now manual only via the Load Audio button | |
# Removed automatic filename.change callback to prevent slow loading during initialization | |
# Navigation buttons | |
for btn, direction in [(self.btn_prev, "prev"), (self.btn_next, "next")]: | |
btn.click( | |
fn=lambda: update_ui_interactive_state(False), | |
outputs=self.interactive_ui_elements | |
).then( | |
fn=lambda items, idx, session, dir=direction: navigate_and_load_fn(items, idx, dir, session), | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=[self.items_state, self.idx_state, self.review_info] | |
).then( | |
fn=show_current_review_item_fn, | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=review_display_outputs | |
).then( | |
# Auto-load audio with autoplay for smooth navigation | |
fn=download_voice_fn, | |
inputs=[self.filename], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
).then( | |
lambda: gr.update(value=None), | |
outputs=self.jump_data_id_input | |
).then( | |
fn=lambda: update_ui_interactive_state(True), | |
outputs=self.interactive_ui_elements | |
) | |
# Approve/Reject buttons | |
self.btn_approve.click( | |
fn=lambda items, idx, session: save_validation_fn(items, idx, session, approved=True, rejection_reason=""), # Pass empty rejection_reason | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=[self.items_state, self.current_validation_status, self.rejection_reason_input] | |
).then( | |
fn=get_review_progress_fn, # Update progress after approval | |
inputs=[session_state], | |
outputs=[self.header.progress_display] | |
).then( | |
fn=lambda: False, # Reset rejection mode | |
outputs=[self.rejection_mode_active] | |
).then( | |
fn=lambda: gr.update(value="β Reject"), # Reset reject button | |
outputs=[self.btn_reject] | |
).then( | |
fn=lambda items, idx, session: navigate_and_load_fn(items, idx, "next", session), | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=[self.items_state, self.idx_state, self.review_info] | |
).then( | |
fn=show_current_review_item_fn, | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=review_display_outputs | |
).then( | |
# Auto-load audio with autoplay after moving to next item | |
fn=download_voice_fn, | |
inputs=[self.filename], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
) | |
self.btn_reject.click( | |
fn=handle_rejection_fn, | |
inputs=[self.items_state, self.idx_state, session_state, self.rejection_reason_input, self.rejection_mode_active], | |
outputs=[self.items_state, self.current_validation_status, self.rejection_reason_input, self.rejection_mode_active, self.btn_reject] | |
).then( | |
fn=lambda items, idx, session, rejection_mode: get_review_progress_fn(session) if not rejection_mode else "", # Update progress only after successful rejection | |
inputs=[self.items_state, self.idx_state, session_state, self.rejection_mode_active], | |
outputs=[self.header.progress_display] | |
).then( | |
fn=lambda items, idx, session, rejection_mode: navigate_and_load_fn(items, idx, "next", session) if not rejection_mode else (items, idx, ""), | |
inputs=[self.items_state, self.idx_state, session_state, self.rejection_mode_active], | |
outputs=[self.items_state, self.idx_state, self.review_info] | |
).then( | |
fn=lambda items, idx, session, rejection_mode: show_current_review_item_fn(items, idx, session) if not rejection_mode else ( | |
str(items[idx]["tts_id"]) if items and idx < len(items) else "", | |
items[idx]["filename"] if items and idx < len(items) else "", | |
items[idx]["sentence"] if items and idx < len(items) else "", | |
items[idx]["annotated_sentence"] if items and idx < len(items) else "", | |
items[idx]["annotated_at"] if items and idx < len(items) else "", | |
items[idx]["validation_status"] if items and idx < len(items) else "", | |
"", # annotator placeholder | |
gr.update(value=None, autoplay=False), # audio | |
gr.update(), # rejection_reason_input - don't change | |
rejection_mode, # keep rejection mode as is | |
gr.update() # btn_reject - don't change | |
), | |
inputs=[self.items_state, self.idx_state, session_state, self.rejection_mode_active], | |
outputs=review_display_outputs | |
).then( | |
# Auto-load audio with autoplay only if we moved to next item (not in rejection mode) | |
fn=lambda filename, rejection_mode: download_voice_fn(filename) if not rejection_mode else (None, None, gr.update(value=None, autoplay=False)), | |
inputs=[self.filename, self.rejection_mode_active], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
) | |
# Skip button (just navigate to next) | |
self.btn_skip.click( | |
fn=lambda items, idx, session: navigate_and_load_fn(items, idx, "next", session), | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=[self.items_state, self.idx_state, self.review_info] | |
).then( | |
fn=show_current_review_item_fn, | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=review_display_outputs | |
).then( | |
# Auto-load audio with autoplay after skipping | |
fn=download_voice_fn, | |
inputs=[self.filename], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
) | |
# Jump button | |
self.btn_jump.click( | |
fn=jump_by_data_id_fn, | |
inputs=[self.items_state, self.jump_data_id_input, self.idx_state, session_state], | |
outputs=[self.items_state, self.idx_state, self.review_info] | |
).then( | |
fn=show_current_review_item_fn, | |
inputs=[self.items_state, self.idx_state, session_state], | |
outputs=review_display_outputs | |
).then( | |
# Auto-load audio with autoplay after jumping | |
fn=download_voice_fn, | |
inputs=[self.filename], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
).then( | |
lambda: gr.update(value=None), | |
outputs=self.jump_data_id_input | |
) | |
# Load audio button | |
self.btn_load_voice.click( | |
fn=lambda: update_ui_interactive_state(False), | |
outputs=self.interactive_ui_elements | |
).then( | |
fn=download_voice_fn, | |
inputs=[self.filename], | |
outputs=[self.audio, self.original_audio_state, self.audio] | |
).then( | |
fn=lambda: update_ui_interactive_state(True), | |
outputs=self.interactive_ui_elements | |
) | |
return self.container | |