#!/usr/bin/env python3
"""
Phase 2 Rejected/Unreviewed Items Report Script

This script lists all rejected or unreviewed items from Phase 2 review process,
showing TTS data indices, rejection reasons, and detailed information.
"""

import argparse
import csv
import sys
import os
from datetime import datetime
from sqlalchemy import and_, or_
from sqlalchemy.orm import joinedload

# Add project root to Python path (must happen before the project imports below)
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from utils.database import get_db
from data.models import Annotator, Annotation, Validation, TTSData
from utils.logger import Logger
from config import conf

log = Logger()

# Column order shared by both CSV exports.
_CSV_FIELDNAMES = [
    'tts_id', 'status', 'rejection_reason', 'annotator', 'reviewer',
    'filename', 'original_sentence', 'annotated_sentence', 'annotated_at',
]


def _format_timestamp(value):
    """Format a datetime as 'YYYY-MM-DD HH:MM:SS', or return "N/A" when it is unset."""
    return value.strftime('%Y-%m-%d %H:%M:%S') if value else "N/A"


def _preview(text, limit, placeholder=""):
    """Return `text` cut to `limit` characters plus "...", or `placeholder` when it is None/empty."""
    if not text:
        return placeholder
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def _export_csv(items, prefix):
    """Write `items` (dicts keyed by _CSV_FIELDNAMES) to '<prefix>_<timestamp>.csv'.

    The file is created in the current working directory.

    Returns:
        str: The name of the file that was written.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    csv_filename = f"{prefix}_{timestamp}.csv"
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(items)
    return csv_filename


def list_rejected_unreviewed_items(status_filter="all", reviewer_filter=None, annotator_filter=None, export_csv=False):
    """
    Lists rejected or unreviewed items from Phase 2 review process.

    For every (annotator, reviewer) pair in conf.REVIEW_MAPPING that passes the
    name filters, each non-empty annotation is classified as "Unreviewed" (the
    assigned reviewer has no Validation row for it) or "Rejected" (a Validation
    row exists and its `validated` flag is falsy). Approved annotations are
    skipped. Results are printed; errors are logged and printed, not raised.

    Args:
        status_filter (str): Filter by status - "rejected", "unreviewed", or "all".
            Applies to the detailed listing and to the CSV export; the summary
            counters always report both categories.
        reviewer_filter (str): Filter by specific reviewer name
        annotator_filter (str): Filter by specific annotator whose work is being reviewed
        export_csv (bool): Export results to CSV file
    """
    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 REJECTED/UNREVIEWED ITEMS REPORT")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Status filter: {status_filter.upper()}")
            if reviewer_filter:
                print(f"Reviewer filter: {reviewer_filter}")
            if annotator_filter:
                print(f"Annotator filter: {annotator_filter}")
            print()

            # Get review mapping pairs
            review_pairs = []
            for annotator_name, reviewer_name in conf.REVIEW_MAPPING.items():
                # Apply filters
                if reviewer_filter and reviewer_name != reviewer_filter:
                    continue
                if annotator_filter and annotator_name != annotator_filter:
                    continue

                # Get annotator and reviewer objects
                annotator = db.query(Annotator).filter_by(name=annotator_name).first()
                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()

                if annotator and reviewer:
                    review_pairs.append((annotator, reviewer))
                else:
                    print(f"⚠️  Warning: Missing annotator ({annotator_name}) or reviewer ({reviewer_name}) in database")

            if not review_pairs:
                print("No valid review pairs found with current filters.")
                return

            show_rejected = status_filter in ("rejected", "all")
            show_unreviewed = status_filter in ("unreviewed", "all")

            export_items = []  # only the items matching status_filter
            total_rejected = 0
            total_unreviewed = 0

            # Process each review pair
            for annotator, reviewer in review_pairs:
                print(f"\n📋 REVIEWER: {reviewer.name} → ANNOTATOR: {annotator.name}")
                print("-" * 60)

                # Get all annotations by this annotator
                annotations = (
                    db.query(Annotation)
                    .join(TTSData)
                    .filter(
                        Annotation.annotator_id == annotator.id,
                        # Only include annotations that have actual content
                        Annotation.annotated_sentence.isnot(None),
                        Annotation.annotated_sentence != "",
                    )
                    .options(joinedload(Annotation.tts_data))
                    .order_by(TTSData.id)
                    .all()
                )

                if not annotations:
                    print(" No annotations found for this annotator.")
                    continue

                print(f" Total annotations to review: {len(annotations)}")

                # Load this reviewer's validations once instead of issuing one
                # query per annotation. setdefault keeps the first row seen
                # should duplicates exist for the same annotation.
                validations_by_annotation = {}
                for validation in db.query(Validation).filter_by(validator_id=reviewer.id).all():
                    validations_by_annotation.setdefault(validation.annotation_id, validation)

                rejected_items = []
                unreviewed_items = []

                for annotation in annotations:
                    validation = validations_by_annotation.get(annotation.id)

                    if validation is None:
                        status = "Unreviewed"
                        rejection_reason = ""
                        bucket = unreviewed_items
                        wanted = show_unreviewed
                    elif not validation.validated:
                        status = "Rejected"
                        rejection_reason = validation.description or "No reason provided"
                        bucket = rejected_items
                        wanted = show_rejected
                    else:
                        # Approved by the assigned reviewer: not part of this report.
                        continue

                    item_data = {
                        "tts_id": annotation.tts_data.id,
                        "filename": annotation.tts_data.filename,
                        "original_sentence": annotation.tts_data.sentence,
                        "annotated_sentence": annotation.annotated_sentence,
                        "annotator": annotator.name,
                        "reviewer": reviewer.name,
                        "annotated_at": _format_timestamp(annotation.annotated_at),
                        "status": status,
                        "rejection_reason": rejection_reason,
                    }
                    bucket.append(item_data)
                    if wanted:
                        export_items.append(item_data)

                # Print summary for this pair
                pair_rejected = len(rejected_items)
                pair_unreviewed = len(unreviewed_items)
                total_rejected += pair_rejected
                total_unreviewed += pair_unreviewed

                print(f" ❌ Rejected: {pair_rejected}")
                print(f" ⏳ Unreviewed: {pair_unreviewed}")

                # Show detailed items based on filter
                items_to_show = []
                if show_rejected:
                    items_to_show.extend(rejected_items)
                if show_unreviewed:
                    items_to_show.extend(unreviewed_items)

                if items_to_show:
                    print(f"\n 📝 Detailed Items ({len(items_to_show)}):")
                    for item in sorted(items_to_show, key=lambda x: x["tts_id"]):
                        status_icon = "❌" if item["status"] == "Rejected" else "⏳"
                        print(f" {status_icon} ID: {item['tts_id']} | Status: {item['status']}")
                        if item["status"] == "Rejected" and item["rejection_reason"]:
                            print(f" Reason: {item['rejection_reason']}")

                        # Show truncated sentences for context (None-safe)
                        orig_preview = _preview(item["original_sentence"], 80)
                        ann_preview = _preview(item["annotated_sentence"], 80)
                        print(f" Original: {orig_preview}")
                        print(f" Annotated: {ann_preview}")
                        print(f" Annotated at: {item['annotated_at']}")
                        print()

            # Overall summary (counts cover both categories regardless of the filter)
            print("\n" + "=" * 80)
            print(" OVERALL SUMMARY")
            print("=" * 80)
            print(f"📊 Total items found: {total_rejected + total_unreviewed}")
            print(f"❌ Total rejected: {total_rejected}")
            print(f"⏳ Total unreviewed: {total_unreviewed}")

            # Export to CSV if requested
            if export_csv and export_items:
                csv_filename = _export_csv(
                    sorted(export_items, key=lambda x: x["tts_id"]),
                    "phase2_rejected_unreviewed",
                )
                print(f"\n💾 Results exported to: {csv_filename}")

        except Exception as e:
            # Top-level boundary of a CLI report: log and show the error instead of a traceback.
            log.error(f"Error generating rejected/unreviewed items report: {e}")
            print(f"❌ Error: {e}")


def list_by_ids(ids_list, export_csv=False):
    """
    Lists specific TTS data items by their IDs and shows their Phase 2 review status.

    For each ID the status is "Approved", "Rejected" or "Unreviewed", based on
    the Validation row written by the reviewer that conf.REVIEW_MAPPING assigns
    to the annotation's annotator. IDs that cannot be resolved are reported and
    skipped. Errors are logged and printed, not raised.

    Args:
        ids_list (list): List of TTS data IDs to look up
        export_csv (bool): Export results to CSV file
    """
    with get_db() as db:
        try:
            print("=" * 80)
            print(" PHASE 2 STATUS FOR SPECIFIC IDS")
            print("=" * 80)
            print(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Requested IDs: {', '.join(map(str, ids_list))}")
            print()

            found_items = []
            not_found = []

            for tts_id in ids_list:
                # Find the TTS data
                tts_data = db.query(TTSData).filter_by(id=tts_id).first()
                if not tts_data:
                    not_found.append(tts_id)
                    continue

                # Find the annotation for this TTS data
                # NOTE(review): only the first annotation is inspected; confirm a
                # TTS item can never carry annotations from several annotators.
                annotation = db.query(Annotation).filter_by(tts_data_id=tts_id).first()
                if not annotation:
                    print(f"⚠️  ID {tts_id}: No annotation found")
                    continue

                # Find the assigned reviewer for this annotator
                annotator = db.query(Annotator).filter_by(id=annotation.annotator_id).first()
                if not annotator:
                    print(f"⚠️  ID {tts_id}: Annotator not found")
                    continue

                reviewer_name = conf.REVIEW_MAPPING.get(annotator.name)
                if not reviewer_name:
                    print(f"⚠️  ID {tts_id}: No reviewer assigned for annotator {annotator.name}")
                    continue

                reviewer = db.query(Annotator).filter_by(name=reviewer_name).first()
                if not reviewer:
                    print(f"⚠️  ID {tts_id}: Reviewer {reviewer_name} not found in database")
                    continue

                # Check validation status
                validation = db.query(Validation).filter_by(
                    annotation_id=annotation.id,
                    validator_id=reviewer.id
                ).first()

                status = "Unreviewed"
                rejection_reason = ""
                if validation:
                    if validation.validated:
                        status = "Approved"
                    else:
                        status = "Rejected"
                        rejection_reason = validation.description or "No reason provided"

                item_data = {
                    "tts_id": tts_id,
                    "status": status,
                    "rejection_reason": rejection_reason,
                    "annotator": annotator.name,
                    "reviewer": reviewer.name,
                    "filename": tts_data.filename,
                    "original_sentence": tts_data.sentence,
                    "annotated_sentence": annotation.annotated_sentence or "[No annotation]",
                    "annotated_at": _format_timestamp(annotation.annotated_at),
                }
                found_items.append(item_data)

                # Display the item
                if status == "Approved":
                    status_icon = "✅"
                elif status == "Rejected":
                    status_icon = "❌"
                else:
                    status_icon = "⏳"
                print(f"{status_icon} ID: {tts_id} | Status: {status} | Annotator: {annotator.name} | Reviewer: {reviewer.name}")
                if status == "Rejected" and rejection_reason:
                    print(f" Rejection Reason: {rejection_reason}")

                # None-safe previews: a missing sentence must not abort the whole lookup
                orig_preview = _preview(tts_data.sentence, 100)
                ann_preview = _preview(annotation.annotated_sentence, 100, "[No annotation]")
                print(f" Original: {orig_preview}")
                print(f" Annotated: {ann_preview}")
                print(f" Annotated at: {item_data['annotated_at']}")
                print()

            if not_found:
                print(f"⚠️  IDs not found: {', '.join(map(str, not_found))}")

            # Export to CSV if requested
            if export_csv and found_items:
                csv_filename = _export_csv(found_items, "phase2_specific_ids")
                print(f"💾 Results exported to: {csv_filename}")

        except Exception as e:
            # Top-level boundary of a CLI report: log and show the error instead of a traceback.
            log.error(f"Error looking up specific IDs: {e}")
            print(f"❌ Error: {e}")


def main():
    """Parse the command line and run the `list` or `ids` sub-command (prints help if none is given)."""
    parser = argparse.ArgumentParser(description="List rejected or unreviewed items from Phase 2 review process.")
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # List command
    list_parser = subparsers.add_parser('list', help='List rejected/unreviewed items')
    list_parser.add_argument(
        "--status",
        choices=["rejected", "unreviewed", "all"],
        default="all",
        help="Filter by status (default: all)"
    )
    list_parser.add_argument(
        "--reviewer",
        type=str,
        help="Filter by specific reviewer name"
    )
    list_parser.add_argument(
        "--annotator",
        type=str,
        help="Filter by specific annotator whose work is being reviewed"
    )
    list_parser.add_argument(
        "--csv",
        action="store_true",
        help="Export results to CSV file"
    )

    # IDs command
    ids_parser = subparsers.add_parser('ids', help='Check status of specific TTS data IDs')
    ids_parser.add_argument(
        "ids",
        nargs='+',
        type=int,
        help="TTS data IDs to check"
    )
    ids_parser.add_argument(
        "--csv",
        action="store_true",
        help="Export results to CSV file"
    )

    args = parser.parse_args()

    if args.command == 'list':
        list_rejected_unreviewed_items(
            status_filter=args.status,
            reviewer_filter=args.reviewer,
            annotator_filter=args.annotator,
            export_csv=args.csv
        )
    elif args.command == 'ids':
        list_by_ids(args.ids, export_csv=args.csv)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()