Spaces:
Sleeping
Sleeping
| import os | |
| from flask import Blueprint, request, render_template, jsonify, current_app | |
| # Import services | |
| from .services.tokenizer_service import tokenizer_service | |
| from .services.file_service import file_service | |
| from .utils.validators import validators, ValidationError | |
# Blueprint grouping the main application routes (index, tokenizer info,
# health/readiness probes); registered on the app elsewhere.
main_bp = Blueprint('main', __name__)
def tokenizer_info():
    """Return tokenizer metadata for a model without tokenizing any text.

    Query parameters:
        model_id: predefined model key or custom model path/ID (required).
        is_custom: "true" to treat model_id as a custom model path that
            must pass path validation before loading.

    Returns:
        JSON tokenizer info on success; ``{"error": ...}`` with HTTP 400 on
        missing/invalid input or load failure, HTTP 500 on unexpected errors.
    """
    model_id = request.args.get('model_id', '')
    is_custom = request.args.get('is_custom', 'false').lower() == 'true'
    if not model_id:
        return jsonify({"error": "No model ID provided"}), 400
    try:
        # Custom model paths must pass validation before we attempt a load.
        if is_custom:
            try:
                validators.validate_model_path(model_id)
            except ValidationError as e:
                return jsonify({"error": str(e)}), 400
        # Both predefined and custom models resolve to the same identifier
        # (the original if/else assigned the same value on both branches);
        # the service layer distinguishes them internally.
        tokenizer, info, error = tokenizer_service.load_tokenizer(model_id)
        if error:
            return jsonify({"error": error}), 400
        return jsonify(info)
    except Exception as e:
        return jsonify({"error": f"Failed to get tokenizer info: {str(e)}"}), 500
def index():
    """Main page: tokenize pasted text or an uploaded file.

    Handles GET (render the empty form) and POST (file upload or raw text).
    AJAX callers (``X-Requested-With: XMLHttpRequest``) receive JSON; regular
    requests receive the rendered ``index.html`` template.

    Model selection arrives via query string or form body: ``model`` (a
    predefined key), ``custom_model`` (a path/ID), and ``model_type``
    selecting between them.
    """
    text = ""
    token_data = None
    error_message = ""
    selected_model = request.args.get('model', request.form.get('model', 'qwen3'))
    custom_model = request.args.get('custom_model', request.form.get('custom_model', ''))
    model_type = request.args.get('model_type', request.form.get('model_type', 'predefined'))
    # Determine which model identifier to use based on model_type.
    model_to_use = selected_model if model_type == 'predefined' else custom_model

    # Hoisted: the AJAX check is identical on every response path.
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def _render_error(err, shown_text):
        # Every error path renders the same template with token_data cleared;
        # deduplicated from four identical render_template calls.
        return render_template(
            'index.html',
            text=shown_text,
            token_data=None,
            models=tokenizer_service.TOKENIZER_MODELS,
            selected_model=selected_model,
            custom_model=custom_model,
            model_type=model_type,
            error=err
        )

    if request.method == 'POST':
        # File-upload branch.
        if 'file' in request.files and request.files['file'].filename:
            uploaded_file = request.files['file']
            try:
                # Validate before anything touches disk.
                validators.validate_filename(uploaded_file.filename)
                validators.validate_file_extension(uploaded_file.filename, file_service.ALLOWED_EXTENSIONS)
                if model_type == 'custom' and custom_model:
                    validators.validate_model_path(custom_model)
                # Save file securely; from here on the temp file must be
                # cleaned up on every path.
                file_path = file_service.save_uploaded_file(uploaded_file, current_app.config['UPLOAD_FOLDER'])
                preview_char_limit = current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
                try:
                    # BUGFIX: the preview read previously sat outside this
                    # try, so a read failure leaked the saved file and
                    # escaped as an unhandled 500. Encoding pinned to utf-8
                    # instead of the platform default.
                    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                        text = f.read(preview_char_limit)
                    token_data = file_service.process_file_for_tokenization(
                        file_path=file_path,
                        model_id_or_name=model_to_use,
                        preview_char_limit=preview_char_limit,
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                        chunk_size=current_app.config.get('CHUNK_SIZE', 1024 * 1024)
                    )
                    # Clean up the temp file after successful processing.
                    file_service.cleanup_file(file_path)
                    if is_ajax:
                        return jsonify(token_data)
                except Exception as e:
                    error_message = str(e)
                    file_service.cleanup_file(file_path)
                    if is_ajax:
                        return jsonify({"error": error_message}), 400
                    return _render_error(error_message, text)
            except ValidationError as e:
                error_message = str(e)
                if is_ajax:
                    return jsonify({"error": error_message}), 400
                # Validation failed before any text was read: render empty.
                return _render_error(error_message, "")
        # Regular text-processing branch.
        else:
            text = request.form.get('text', '')
            if text:
                try:
                    validators.validate_text_input(text)
                    if model_type == 'custom' and custom_model:
                        validators.validate_model_path(custom_model)
                    token_data = file_service.process_text_for_tokenization(
                        text=text,
                        model_id_or_name=model_to_use,
                        preview_char_limit=current_app.config.get('PREVIEW_CHAR_LIMIT', 8096),
                        max_display_tokens=current_app.config.get('MAX_DISPLAY_TOKENS', 50000)
                    )
                    if is_ajax:
                        return jsonify(token_data)
                except ValidationError as e:
                    error_message = str(e)
                    if is_ajax:
                        return jsonify({"error": error_message}), 400
                    return _render_error(error_message, text)
                except Exception as e:
                    error_message = str(e)
                    if is_ajax:
                        return jsonify({"error": error_message}), 400
                    return _render_error(error_message, text)

    # Success path (GET, or non-AJAX POST that produced token_data).
    return render_template(
        'index.html',
        text=text,
        token_data=token_data,
        models=tokenizer_service.TOKENIZER_MODELS,
        selected_model=selected_model,
        custom_model=custom_model,
        model_type=model_type,
        error=error_message
    )
def health_check():
    """Basic liveness endpoint.

    Returns a small JSON status payload with HTTP 200, or an "unhealthy"
    payload with HTTP 500 if building the response itself fails.
    """
    import time
    from flask import __version__ as flask_version
    # BUGFIX: dropped the unused `import psutil` — a liveness probe should
    # not fail just because an optional metrics package is missing.
    try:
        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            # NOTE(review): this is just the current timestamp, not a real
            # uptime — a true uptime needs an app start time recorded at boot.
            "uptime": int(time.time()),
        }
        return jsonify(status), 200
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500
def detailed_health_check():
    """Detailed health check with system, service, and configuration status.

    Gathers CPU/memory/disk metrics via psutil, probes the tokenizer
    service with a test load of 'gpt2', and verifies the upload directory.

    Returns:
        JSON status payload; HTTP 200 when overall status is "healthy",
        503 when "degraded"/"unhealthy", 500 on unexpected failure.
    """
    import time
    import psutil
    from flask import __version__ as flask_version
    # NOTE: removed the redundant local `import os` — os is imported at
    # module level.
    try:
        # System information (cpu_percent blocks for the 1s sample interval).
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        # Check tokenizer service: cache size plus a smoke-test load.
        tokenizer_status = "healthy"
        tokenizer_cache_size = len(tokenizer_service.tokenizers) + len(tokenizer_service.custom_tokenizers)
        try:
            _, _, error = tokenizer_service.load_tokenizer('gpt2')
            if error:
                tokenizer_status = f"warning: {error}"
        except Exception as e:
            tokenizer_status = f"error: {str(e)}"
        # Check that the upload directory exists and is writable.
        upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
        upload_dir_exists = os.path.exists(upload_folder)
        upload_dir_writable = os.access(upload_folder, os.W_OK) if upload_dir_exists else False
        status = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "version": "1.0.0",
            "flask_version": flask_version,
            "system": {
                "cpu_percent": round(cpu_percent, 1),
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent,
                    "used": memory.used
                },
                "disk": {
                    "total": disk.total,
                    "used": disk.used,
                    "free": disk.free,
                    "percent": round((disk.used / disk.total) * 100, 1)
                }
            },
            "services": {
                "tokenizer_service": {
                    "status": tokenizer_status,
                    "cached_tokenizers": tokenizer_cache_size,
                    "available_models": len(tokenizer_service.TOKENIZER_MODELS)
                },
                "file_service": {
                    "upload_directory": upload_folder,
                    "directory_exists": upload_dir_exists,
                    "directory_writable": upload_dir_writable,
                    "allowed_extensions": list(file_service.ALLOWED_EXTENSIONS)
                }
            },
            "configuration": {
                "max_content_length": current_app.config.get('MAX_CONTENT_LENGTH'),
                "cache_expiration": current_app.config.get('CACHE_EXPIRATION', 3600),
                "max_display_tokens": current_app.config.get('MAX_DISPLAY_TOKENS', 50000),
                "preview_char_limit": current_app.config.get('PREVIEW_CHAR_LIMIT', 8096)
            }
        }
        # Overall status: tokenizer errors dominate; warnings or an
        # unwritable upload dir merely degrade.
        overall_status = "healthy"
        if tokenizer_status.startswith("error"):
            overall_status = "unhealthy"
        elif tokenizer_status.startswith("warning") or not upload_dir_writable:
            overall_status = "degraded"
        status["status"] = overall_status
        return jsonify(status), 200 if overall_status == "healthy" else 503
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": int(time.time())
        }), 500
def readiness_check():
    """Readiness check — is the application ready to serve requests?

    Probes the tokenizer service (test load of 'gpt2'), the upload
    directory, and required configuration keys.

    Returns:
        JSON ``{"ready": bool, "checks": {...}}`` with HTTP 200 when all
        checks pass, 503 otherwise, 500 on unexpected failure.
    """
    # BUGFIX: `time` was never imported (neither here nor at module level),
    # so every call to this endpoint raised NameError at time.time().
    import time
    try:
        checks = {
            "tokenizer_service": False,
            "file_service": False,
            "configuration": False
        }
        # Test tokenizer service with a smoke-test load.
        try:
            _, _, error = tokenizer_service.load_tokenizer('gpt2')
            checks["tokenizer_service"] = error is None
        except Exception:  # narrowed from a bare except
            checks["tokenizer_service"] = False
        # Test file service: upload dir must exist and be writable.
        try:
            upload_folder = current_app.config.get('UPLOAD_FOLDER', '/tmp')
            checks["file_service"] = os.path.exists(upload_folder) and os.access(upload_folder, os.W_OK)
        except Exception:  # narrowed from a bare except
            checks["file_service"] = False
        # Required configuration keys must be present.
        required_configs = ['MAX_CONTENT_LENGTH', 'UPLOAD_FOLDER']
        checks["configuration"] = all(current_app.config.get(config) is not None for config in required_configs)
        all_ready = all(checks.values())
        return jsonify({
            "ready": all_ready,
            "checks": checks,
            "timestamp": int(time.time())
        }), 200 if all_ready else 503
    except Exception as e:
        return jsonify({
            "ready": False,
            "error": str(e),
            "timestamp": int(time.time())
        }), 500