import gradio as gr import cv2 import numpy as np from groq import Groq import time from PIL import Image as PILImage import io import base64 import torch class RobustSafetyMonitor: def __init__(self): """Initialize the robust safety detection tool with configuration.""" self.client = Groq() self.model_name = "llama-3.2-90b-vision-preview" self.max_image_size = (800, 800) self.colors = [(0, 0, 255), (255, 0, 0), (0, 255, 0), (255, 255, 0), (255, 0, 255)] # Load YOLOv5 model for general object detection self.yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s') def preprocess_image(self, frame): """Process image for analysis.""" if len(frame.shape) == 2: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) elif len(frame.shape) == 3 and frame.shape[2] == 4: frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB) return self.resize_image(frame) def resize_image(self, image): """Resize image while maintaining aspect ratio.""" height, width = image.shape[:2] if height > self.max_image_size[1] or width > self.max_image_size[0]: aspect = width / height if width > height: new_width = self.max_image_size[0] new_height = int(new_width / aspect) else: new_height = self.max_image_size[1] new_width = int(new_height * aspect) return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) return image def encode_image(self, frame): """Convert image to base64 encoding without extra formatting.""" frame_pil = PILImage.fromarray(frame) buffered = io.BytesIO() frame_pil.save(buffered, format="JPEG", quality=95) # Ensure JPEG format img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') return img_base64 # Return only the base64 string, no "data:image/jpeg;base64," def detect_objects(self, frame): """Detect objects using YOLOv5.""" results = self.yolo_model(frame) # Extract bounding boxes, class labels, and confidence scores bbox_data = results.xyxy[0].numpy() # Bounding box coordinates labels = results.names # Class names return bbox_data, labels def analyze_frame(self, frame): """Perform safety analysis on the frame using Llama Vision 3.2.""" if frame is None: return "No frame received", {} frame = self.preprocess_image(frame) image_base64 = self.encode_image(frame) try: # Use Llama Vision 3.2 to analyze the context of the image and detect risks completion = self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "user", "content": [ { "type": "text", "text": """Analyze this workplace image and identify any potential safety risks. Consider the positioning of workers, the equipment, materials, and environment. Flag risks like improper equipment use, worker proximity to danger zones, unstable materials, and environmental hazards.""" }, { "type": "image_url", "image_url": { "url": image_base64 # Corrected: Send only the base64 string } } ] } ], temperature=0.7, max_tokens=1024, stream=False ) return completion.choices[0].message.content, {} except Exception as e: print(f"Analysis error: {str(e)}") return f"Analysis Error: {str(e)}", {} def draw_bounding_boxes(self, image, bboxes, labels, safety_issues): """Draw bounding boxes around objects based on safety issues flagged by Llama Vision.""" font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.5 thickness = 2 for idx, bbox in enumerate(bboxes): x1, y1, x2, y2, conf, class_id = bbox label = labels[int(class_id)] color = self.colors[idx % len(self.colors)] # Draw bounding box cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), color, thickness) # Link detected object to potential risks based on Llama Vision analysis if any(safety_issue.lower() in label.lower() for safety_issue in safety_issues): label_text = f"Risk: {label}" cv2.putText(image, label_text, (int(x1), int(y1) - 10), font, font_scale, (0, 0, 255), thickness) else: label_text = f"{label} {conf:.2f}" cv2.putText(image, label_text, (int(x1), int(y1) - 10), font, font_scale, color, thickness) return image def process_frame(self, frame): """Main processing pipeline for dynamic, robust safety analysis.""" if frame is None: return None, "No image provided" try: # Detect objects dynamically in the image using YOLO bbox_data, labels = self.detect_objects(frame) frame_with_boxes = self.draw_bounding_boxes(frame, bbox_data, labels, []) # Get dynamic safety analysis from Llama Vision 3.2 analysis, _ = self.analyze_frame(frame) # Dynamically parse the analysis to identify safety issues flagged safety_issues = self.parse_safety_analysis(analysis) # Update the frame with bounding boxes based on safety issues flagged annotated_frame = self.draw_bounding_boxes(frame_with_boxes, bbox_data, labels, safety_issues) return annotated_frame, analysis except Exception as e: print(f"Processing error: {str(e)}") return None, f"Error processing image: {str(e)}" def parse_safety_analysis(self, analysis): """Dynamically parse the safety analysis to identify contextual issues.""" safety_issues = [] for line in analysis.split('\n'): if "risk" in line.lower() or "hazard" in line.lower(): safety_issues.append(line.strip()) return safety_issues def create_monitor_interface(): monitor = RobustSafetyMonitor() with gr.Blocks() as demo: gr.Markdown("# Robust Safety Analysis System powered by Llama Vision 3.2") with gr.Row(): input_image = gr.Image(label="Upload Image") output_image = gr.Image(label="Safety Analysis") analysis_text = gr.Textbox(label="Detailed Analysis", lines=5) def analyze_image(image): if image is None: return None, "No image provided" try: processed_frame, analysis = monitor.process_frame(image) return processed_frame, analysis except Exception as e: print(f"Processing error: {str(e)}") return None, f"Error processing image: {str(e)}" input_image.change( fn=analyze_image, inputs=input_image, outputs=[output_image, analysis_text] ) gr.Markdown(""" ## Instructions: 1. Upload any workplace/safety-related image 2. View identified hazards and their locations 3. Read detailed analysis of safety concerns based on the image """) return demo if __name__ == "__main__": demo = create_monitor_interface() demo.launch()