from flask import Flask, render_template, request, jsonify, send_from_directory, url_for
from flask_cors import CORS
import cv2
import torch
import numpy as np
import os
from werkzeug.utils import secure_filename
import sys
import traceback
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
import time

# Add bodybuilding_pose_analyzer to path
sys.path.append('.')  # Assuming app.py is at the root of cv.github.io
from bodybuilding_pose_analyzer.src.movenet_analyzer import MoveNetAnalyzer
from bodybuilding_pose_analyzer.src.pose_analyzer import PoseAnalyzer

# Add YOLOv7 to path
sys.path.append('yolov7')
from yolov7.models.experimental import attempt_load
from yolov7.utils.general import check_img_size, non_max_suppression_kpt, scale_coords
from yolov7.utils.torch_utils import select_device
from yolov7.utils.plots import plot_skeleton_kpts


def wrap_text(text: str, font_face: int, font_scale: float, thickness: int, max_width: int) -> list[str]:
    """Wrap text into lines that fit within max_width pixels."""
    if not text:
        return []
    lines = []
    words = text.split(' ')
    current_line = ''
    for word in words:
        # Check whether current_line + word still fits
        test_line = current_line + word + ' '
        (text_width, _), _ = cv2.getTextSize(test_line.strip(), font_face, font_scale, thickness)
        if text_width <= max_width:
            current_line = test_line
        else:
            # The word doesn't fit, so current_line (without the new word) is a complete line
            if current_line.strip():
                lines.append(current_line.strip())
            # Start a new line with the current word
            current_line = word + ' '
            # If a single word is too long it will still overflow; breaking the word is a future enhancement
            (single_word_width, _), _ = cv2.getTextSize(word.strip(), font_face, font_scale, thickness)
            if single_word_width > max_width:
                # For now, add the long word as its own line and let it overflow (or truncate it later)
                lines.append(word.strip())
                current_line = ''  # Reset current_line as the long word is handled
    if current_line.strip():  # Add the last line
        lines.append(current_line.strip())
    return lines if lines else [text]  # Ensure at least the original text is returned if no wrapping happens


app = Flask(__name__, static_url_path='/static', static_folder='static')
CORS(app, resources={r"/*": {"origins": "*"}})
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Initialize YOLOv7 model
device = select_device('')
yolo_model = None  # Initialize as None
stride = None
imgsz = None
try:
    yolo_model = attempt_load('yolov7-w6-pose.pt', map_location=device)
    stride = int(yolo_model.stride.max())
    imgsz = check_img_size(640, s=stride)
    print("YOLOv7 model loaded successfully")
except Exception as e:
    print(f"Error loading YOLOv7 model: {e}")
    traceback.print_exc()
    # Not raising here to allow the app to run if only MoveNet is used.
    # The error is surfaced if YOLOv7 is selected.
# YOLOv7 pose model expects 17 keypoints with (x, y, conf) each
kpt_shape = (17, 3)

# Load CNN model for bodybuilding pose classification
cnn_model_path = 'external/BodybuildingPoseClassifier/bodybuilding_pose_classifier.h5'
cnn_model = load_model(cnn_model_path)
cnn_class_labels = ['side_chest', 'front_double_biceps', 'back_double_biceps', 'front_lat_spread', 'back_lat_spread']


def predict_pose_cnn(img_path):
    img = image.load_img(img_path, target_size=(150, 150))
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0) / 255.0
    predictions = cnn_model.predict(img_array)
    predicted_class = np.argmax(predictions, axis=1)
    confidence = float(np.max(predictions))
    return cnn_class_labels[predicted_class[0]], confidence


@app.route('/static/uploads/<filename>')
def serve_video(filename):
    response = send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=False)
    # Ensure correct content type, especially for Safari/iOS if issues arise
    if filename.lower().endswith('.mp4'):
        response.headers['Content-Type'] = 'video/mp4'
    return response


@app.after_request
def after_request(response):
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization,X-Requested-With,Accept')
    response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
    return response


def process_video_yolov7(video_path):  # Renamed from process_video
    global yolo_model, imgsz, stride  # Ensure the global model is used
    if yolo_model is None:
        raise RuntimeError("YOLOv7 model failed to load. Cannot process video.")
    try:
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"Processing video: {width}x{height} @ {fps}fps")

        # Create output video writer
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], 'output.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'avc1')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            print(f"Processing frame {frame_count}")

            # Prepare image
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (imgsz, imgsz))
            img = img.transpose((2, 0, 1))  # HWC to CHW
            img = np.ascontiguousarray(img)
            img = torch.from_numpy(img).to(device)
            img = img.float() / 255.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            # Inference
            with torch.no_grad():
                pred = yolo_model(img)[0]
                pred = non_max_suppression_kpt(pred, conf_thres=0.25, iou_thres=0.45, nc=yolo_model.yaml['nc'], kpt_label=True)

            # Draw results
            output_frame = frame.copy()
            poses_detected = False
            for det in pred:
                if len(det):
                    poses_detected = True
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], frame.shape).round()
                    for row in det:
                        xyxy = row[:4]
                        conf = row[4]
                        cls = row[5]
                        kpts = row[6:]
                        # plot_skeleton_kpts expects a flat (x, y, conf) keypoint sequence and draws in place
                        plot_skeleton_kpts(output_frame, kpts, steps=3, orig_shape=output_frame.shape[:2])

            if not poses_detected:
                print(f"No poses detected in frame {frame_count}")

            out.write(output_frame)

        cap.release()
        out.release()

        if frame_count == 0:
            raise ValueError("No frames were processed from the video")

        print(f"Video processing completed. Processed {frame_count} frames")
        # Return a URL for the client, using the 'serve_video' endpoint
        output_filename = 'output.mp4'
        return url_for('serve_video', filename=output_filename, _external=False)
    except Exception as e:
        print('Error in process_video_yolov7:', e)
        traceback.print_exc()
        raise


def process_video_movenet(video_path, model_variant='lightning', pose_type='front_double_biceps'):
    try:
        print(f"[PROCESS_VIDEO_MOVENET] Called with video_path: {video_path}, model_variant: {model_variant}, pose_type: {pose_type}")
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        analyzer = MoveNetAnalyzer(model_name=model_variant)
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Add panel width to total width
        panel_width = 300
        total_width = width + panel_width

        print(f"Processing video with MoveNet ({model_variant}): {width}x{height} @ {fps}fps")
        print(f"Output dimensions will be: {total_width}x{height}")

        output_filename = f'output_movenet_{model_variant}.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        print(f"Output path: {output_path}")

        fourcc = cv2.VideoWriter_fourcc(*'avc1')
        out = cv2.VideoWriter(output_path, fourcc, fps, (total_width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")

        frame_count = 0
        current_pose = pose_type
        segment_length = 4 * fps if fps > 0 else 120
        cnn_pose = None
        last_valid_landmarks = None
        landmarks_analysis = {'error': 'Processing not started'}  # Initialize landmarks_analysis

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 30 == 0:
                print(f"Processing frame {frame_count}")

            # Process frame
            processed_frame, current_landmarks_analysis, landmarks = analyzer.process_frame(frame, current_pose, last_valid_landmarks=last_valid_landmarks)
            landmarks_analysis = current_landmarks_analysis  # Update with the latest analysis
            if frame_count % 30 == 0:  # Log every 30 frames
                print(f"[MOVENET_DEBUG] Frame {frame_count} - landmarks_analysis: {landmarks_analysis}")
            if landmarks:
                last_valid_landmarks = landmarks

            # CNN prediction (every 4 seconds)
            if (frame_count - 1) % segment_length == 0:
                temp_img_path = f'temp_frame_for_cnn_{frame_count}.jpg'  # Unique temp name
                cv2.imwrite(temp_img_path, frame)
                try:
                    cnn_pose_pred, cnn_conf = predict_pose_cnn(temp_img_path)
                    print(f"[CNN] Frame {frame_count}: Pose: {cnn_pose_pred}, Conf: {cnn_conf:.2f}")
                    if cnn_conf >= 0.3:
                        current_pose = cnn_pose_pred  # Update current_pose for the analyzer
                except Exception as e:
                    print(f"[CNN] Error predicting pose on frame {frame_count}: {e}")
                finally:
                    if os.path.exists(temp_img_path):
                        os.remove(temp_img_path)

            # Create side panel
            panel = np.zeros((height, panel_width, 3), dtype=np.uint8)

            # --- Dynamic Text Parameter Calculations ---
            current_font = cv2.FONT_HERSHEY_DUPLEX
            # Base font scale and reference video height for scaling.
            # Adjust base_font_scale_at_ref_height if text is generally too large or too small.
            base_font_scale_at_ref_height = 0.6
            reference_height_for_font_scale = 640.0  # e.g., a common video height like 480p or 720p
            # Calculate dynamic font_scale
            font_scale = (height / reference_height_for_font_scale) * base_font_scale_at_ref_height
            # Clamp font_scale to a min/max range to avoid extremes
            font_scale = max(0.4, min(font_scale, 1.2))
            # Calculate dynamic thickness
            thickness = 1 if font_scale < 0.7 else 2
            # Calculate dynamic line_height from the actual text height,
            # using a sample string like "Ag" which has ascenders and descenders
            (_, text_actual_height), _ = cv2.getTextSize("Ag", current_font, font_scale, thickness)
            line_spacing_factor = 1.8  # Adjust for more or less space between lines
            line_height = int(text_actual_height * line_spacing_factor)
            line_height = max(line_height, 15)  # Ensure a minimum line height
            # Initial y_offset for the first line of text, allowing for a top margin and text height
            y_offset_panel = max(line_height, 20)
            # --- End of Dynamic Text Parameter Calculations ---

            display_model_name = f"Gladiator {model_variant.capitalize()}"
            cv2.putText(panel, f"Model: {display_model_name}", (10, y_offset_panel), current_font, font_scale, (0, 255, 255), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += line_height

            if 'error' not in landmarks_analysis:
                cv2.putText(panel, "Angles:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                for joint, angle in landmarks_analysis.get('angles', {}).items():
                    text_to_display = f"{joint.capitalize()}: {angle:.1f} deg"
                    cv2.putText(panel, text_to_display, (20, y_offset_panel), current_font, font_scale, (0, 255, 0), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height

                # Define the width available for text within the panel, accounting for padding
                text_area_x_start = 20
                panel_padding = 10  # Padding from the right edge of the panel
                text_area_width = panel_width - text_area_x_start - panel_padding

                if landmarks_analysis.get('corrections'):
                    y_offset_panel += int(line_height * 0.5)  # Smaller gap before section title
                    cv2.putText(panel, "Corrections:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for correction_text in landmarks_analysis.get('corrections', []):
                        wrapped_lines = wrap_text(correction_text, current_font, font_scale, thickness, text_area_width)
                        for line in wrapped_lines:
                            cv2.putText(panel, line, (text_area_x_start, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
                            y_offset_panel += line_height

                # Display notes if any
                if landmarks_analysis.get('notes'):
                    y_offset_panel += int(line_height * 0.5)  # Smaller gap before section title
                    cv2.putText(panel, "Notes:", (10, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for note_text in landmarks_analysis.get('notes', []):
                        wrapped_lines = wrap_text(note_text, current_font, font_scale, thickness, text_area_width)
                        for line in wrapped_lines:
                            cv2.putText(panel, line, (text_area_x_start, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                            y_offset_panel += line_height
            else:
                cv2.putText(panel, "Error:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                # Also wrap the error message, since it can be long
                error_text = landmarks_analysis.get('error', 'Unknown error')
                text_area_x_start = 20  # The error message also starts at x=20
                panel_padding = 10
                text_area_width = panel_width - text_area_x_start - panel_padding
                wrapped_error_lines = wrap_text(error_text, current_font, font_scale, thickness, text_area_width)
                for line in wrapped_error_lines:
                    cv2.putText(panel, line, (text_area_x_start, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height

            combined_frame = np.hstack((processed_frame, panel))
            out.write(combined_frame)

        cap.release()
        out.release()

        if frame_count == 0:
            raise ValueError("No frames were processed from the video by MoveNet")

        print(f"MoveNet video processing completed. Processed {frame_count} frames. Output: {output_path}")
        print(f"Output file size: {os.path.getsize(output_path)} bytes")
        return url_for('serve_video', filename=output_filename, _external=False)
    except Exception as e:
        print(f'Error in process_video_movenet: {e}')
        traceback.print_exc()
        raise


def process_video_mediapipe(video_path):
    try:
        print(f"[PROCESS_VIDEO_MEDIAPIPE] Called with video_path: {video_path}")
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        analyzer = PoseAnalyzer()
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Add panel width to total width
        panel_width = 300
        total_width = width + panel_width

        print(f"Processing video with MediaPipe: {width}x{height} @ {fps}fps")

        output_filename = 'output_mediapipe.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        fourcc = cv2.VideoWriter_fourcc(*'avc1')
        out = cv2.VideoWriter(output_path, fourcc, fps, (total_width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")

        frame_count = 0
        current_pose = 'Uncertain'  # Initial pose label for MediaPipe
        segment_length = 4 * fps if fps > 0 else 120
        cnn_pose = None
        last_valid_landmarks = None
        analysis_results = {'error': 'Processing not started'}  # Initialize analysis_results

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 30 == 0:
                print(f"Processing frame {frame_count}")

            # Process frame with MediaPipe
            processed_frame, current_analysis_results, landmarks = analyzer.process_frame(frame, last_valid_landmarks=last_valid_landmarks)
            analysis_results = current_analysis_results  # Update with the latest analysis
            if landmarks:
                last_valid_landmarks = landmarks

            # CNN prediction (every 4 seconds)
            if (frame_count - 1) % segment_length == 0:
                temp_img_path = f'temp_frame_for_cnn_{frame_count}.jpg'  # Unique temp name
                cv2.imwrite(temp_img_path, frame)
                try:
                    cnn_pose_pred, cnn_conf = predict_pose_cnn(temp_img_path)
                    print(f"[CNN] Frame {frame_count}: Pose: {cnn_pose_pred}, Conf: {cnn_conf:.2f}")
                    if cnn_conf >= 0.3:
                        current_pose = cnn_pose_pred  # Update current_pose to be displayed
                except Exception as e:
                    print(f"[CNN] Error predicting pose on frame {frame_count}: {e}")
                finally:
                    if os.path.exists(temp_img_path):
                        os.remove(temp_img_path)

            # Create side panel
            panel = np.zeros((height, panel_width, 3), dtype=np.uint8)

            # --- Dynamic Text Parameter Calculations ---
            current_font = cv2.FONT_HERSHEY_DUPLEX
            # Base font scale and reference video height for scaling.
            # Adjust base_font_scale_at_ref_height if text is generally too large or too small.
            base_font_scale_at_ref_height = 0.6
            reference_height_for_font_scale = 640.0  # e.g., a common video height like 480p or 720p
            # Calculate dynamic font_scale
            font_scale = (height / reference_height_for_font_scale) * base_font_scale_at_ref_height
            # Clamp font_scale to a min/max range to avoid extremes
            font_scale = max(0.4, min(font_scale, 1.2))
            # Calculate dynamic thickness
            thickness = 1 if font_scale < 0.7 else 2
            # Calculate dynamic line_height from the actual text height,
            # using a sample string like "Ag" which has ascenders and descenders
            (_, text_actual_height), _ = cv2.getTextSize("Ag", current_font, font_scale, thickness)
            line_spacing_factor = 1.8  # Adjust for more or less space between lines
            line_height = int(text_actual_height * line_spacing_factor)
            line_height = max(line_height, 15)  # Ensure a minimum line height
            # Initial y_offset for the first line of text, allowing for a top margin and text height
            y_offset_panel = max(line_height, 20)
            # --- End of Dynamic Text Parameter Calculations ---

            cv2.putText(panel, "Model: Gladiator SupaDot", (10, y_offset_panel), current_font, font_scale, (0, 255, 255), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += line_height

            if frame_count % 30 == 0:  # Print every 30 frames to avoid flooding the console
                print(f"[MEDIAPIPE_PANEL] Frame {frame_count} - Current Pose for Panel: {current_pose}")
            cv2.putText(panel, f"Pose: {current_pose}", (10, y_offset_panel), current_font, font_scale, (255, 0, 0), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += int(line_height * 1.5)

            if 'error' not in analysis_results:
                cv2.putText(panel, "Angles:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                for joint, angle in analysis_results.get('angles', {}).items():
                    text_to_display = f"{joint.capitalize()}: {angle:.1f} deg"
                    cv2.putText(panel, text_to_display, (20, y_offset_panel), current_font, font_scale, (0, 255, 0), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height

                if analysis_results.get('corrections'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Corrections:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for correction in analysis_results.get('corrections', []):
                        cv2.putText(panel, f"• {correction}", (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height

                # Display notes if any
                if analysis_results.get('notes'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Notes:", (10, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)  # Grey color for notes
                    y_offset_panel += line_height
                    for note in analysis_results.get('notes', []):
                        cv2.putText(panel, f"• {note}", (20, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height
            else:
                cv2.putText(panel, "Error:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                cv2.putText(panel, analysis_results.get('error', 'Unknown error'), (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)

            combined_frame = np.hstack((processed_frame, panel))  # Use the processed_frame returned by the analyzer
            out.write(combined_frame)

        cap.release()
        out.release()

        if frame_count == 0:
            raise ValueError("No frames were processed from the video by MediaPipe")

        print(f"MediaPipe video processing completed. Processed {frame_count} frames. Output: {output_path}")
Output: {output_path}") return url_for('serve_video', filename=output_filename, _external=False) except Exception as e: print(f'Error in process_video_mediapipe: {e}') traceback.print_exc() raise @app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): try: if 'video' not in request.files: print("[UPLOAD] No video file in request") return jsonify({'error': 'No video file provided'}), 400 file = request.files['video'] if file.filename == '': print("[UPLOAD] Empty filename") return jsonify({'error': 'No selected file'}), 400 if file: allowed_extensions = {'mp4', 'avi', 'mov', 'mkv'} if '.' not in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions: print(f"[UPLOAD] Invalid file format: {file.filename}") return jsonify({'error': 'Invalid file format. Allowed formats: mp4, avi, mov, mkv'}), 400 # Ensure the filename is properly sanitized filename = secure_filename(file.filename) print(f"[UPLOAD] Original filename: {file.filename}") print(f"[UPLOAD] Sanitized filename: {filename}") # Create a unique filename to prevent conflicts base, ext = os.path.splitext(filename) unique_filename = f"{base}_{int(time.time())}{ext}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename) print(f"[UPLOAD] Saving file to: {filepath}") file.save(filepath) if not os.path.exists(filepath): print(f"[UPLOAD] File not found after save: {filepath}") return jsonify({'error': 'Failed to save uploaded file'}), 500 print(f"[UPLOAD] File saved successfully. Size: {os.path.getsize(filepath)} bytes") try: model_choice = request.form.get('model_choice', 'Gladiator SupaDot') print(f"[UPLOAD] Processing with model: {model_choice}") if model_choice == 'movenet': movenet_variant = request.form.get('movenet_variant', 'lightning') print(f"[UPLOAD] Using MoveNet variant: {movenet_variant}") output_path_url = process_video_movenet(filepath, model_variant=movenet_variant) else: output_path_url = process_video_mediapipe(filepath) print(f"[UPLOAD] Processing complete. Output URL: {output_path_url}") if not os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], os.path.basename(output_path_url))): print(f"[UPLOAD] Output file not found: {output_path_url}") return jsonify({'error': 'Output video file not found'}), 500 return jsonify({ 'message': f'Video processed successfully with {model_choice}', 'output_path': output_path_url }) except Exception as e: print(f"[UPLOAD] Error processing video: {str(e)}") traceback.print_exc() return jsonify({'error': f'Error processing video: {str(e)}'}), 500 finally: try: if os.path.exists(filepath): os.remove(filepath) print(f"[UPLOAD] Cleaned up input file: {filepath}") except Exception as e: print(f"[UPLOAD] Error cleaning up file: {str(e)}") except Exception as e: print(f"[UPLOAD] Unexpected error: {str(e)}") traceback.print_exc() return jsonify({'error': 'Internal server error'}), 500 if __name__ == '__main__': # Ensure the port is 7860 and debug is False for HF Spaces deployment app.run(host='0.0.0.0', port=7860, debug=False)