capradeepgujaran's picture
Update app.py
a5f647b verified
raw
history blame
8 kB
import gradio as gr
import cv2
import numpy as np
from groq import Groq
import time
from PIL import Image as PILImage
import io
import base64
import torch
class RobustSafetyMonitor:
    """Workplace-safety analyzer pairing YOLOv5 detection with a vision LLM.

    YOLOv5 supplies object bounding boxes; a Groq-hosted Llama vision model
    supplies a free-text safety assessment. Detected objects whose class
    name is mentioned in a risk/hazard line of that assessment are tagged
    as risks on the annotated output image.
    """

    # Instruction text sent to the vision model together with the image.
    _ANALYSIS_PROMPT = (
        "Analyze this workplace image and identify any potential safety risks.\n"
        "Consider the positioning of workers, the equipment, materials, and environment.\n"
        "Flag risks like improper equipment use, worker proximity to danger zones, unstable materials, and environmental hazards."
    )

    def __init__(self):
        """Initialize the robust safety detection tool with configuration."""
        # NOTE(review): Groq() is constructed without arguments, so the
        # credentials presumably come from the environment -- confirm.
        self.client = Groq()
        self.model_name = "llama-3.2-90b-vision-preview"
        # Upper bound (width, height) for images sent to the vision model.
        self.max_image_size = (800, 800)
        # Palette cycled through for bounding boxes.
        # NOTE(review): tuples are interpreted in the frame's own channel
        # order; if frames arrive as RGB, (0, 0, 255) renders blue, not red.
        self.colors = [(0, 0, 255), (255, 0, 0), (0, 255, 0), (255, 255, 0), (255, 0, 255)]
        # Load YOLOv5 model for general object detection.
        # NOTE(review): torch.hub.load may fetch code/weights on first use.
        self.yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

    def preprocess_image(self, frame):
        """Convert *frame* to three channels and shrink it to the size limit.

        Args:
            frame: Image array; 2-D arrays are treated as grayscale and
                4-channel arrays as RGBA.

        Returns:
            The converted image, resized by :meth:`resize_image`.
        """
        if len(frame.shape) == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif len(frame.shape) == 3 and frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
        return self.resize_image(frame)

    def _fit_within(self, width, height):
        """Return the largest (width, height) fitting ``max_image_size``.

        The aspect ratio is preserved and each side is at least one pixel.
        """
        max_width, max_height = self.max_image_size
        # The side with the smaller scale factor is the one that limits us.
        # Comparing scales (not raw width vs. height) keeps the result
        # inside the bounds even when max_image_size is not square.
        if max_width / width <= max_height / height:
            return max_width, max(1, int(height * max_width / width))
        return max(1, int(width * max_height / height)), max_height

    def resize_image(self, image):
        """Resize image while maintaining aspect ratio.

        Images already within ``max_image_size`` are returned unchanged
        (the same object, not a copy).
        """
        height, width = image.shape[:2]
        max_width, max_height = self.max_image_size
        if height <= max_height and width <= max_width:
            return image
        new_size = self._fit_within(width, height)
        return cv2.resize(image, new_size, interpolation=cv2.INTER_AREA)

    def encode_image(self, frame):
        """Encode *frame* as a JPEG ``data:`` URL (base64 payload)."""
        frame_pil = PILImage.fromarray(frame)
        buffered = io.BytesIO()
        frame_pil.save(buffered, format="JPEG", quality=95)
        img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
        return f"data:image/jpeg;base64,{img_base64}"

    def detect_objects(self, frame):
        """Detect objects using YOLOv5.

        Returns:
            A ``(bbox_data, labels)`` pair. Each row of ``bbox_data`` is
            unpacked by :meth:`draw_bounding_boxes` as
            ``x1, y1, x2, y2, conf, class_id``; ``labels`` maps a class id
            to its name.
        """
        results = self.yolo_model(frame)
        # .cpu() is a no-op on CPU tensors and is required before .numpy()
        # when the model runs on a CUDA device.
        bbox_data = results.xyxy[0].cpu().numpy()
        labels = results.names
        return bbox_data, labels

    def analyze_frame(self, frame):
        """Perform safety analysis on the frame using Llama Vision 3.2.

        Args:
            frame: Image array, or ``None``.

        Returns:
            ``(text, {})`` where ``text`` is the model's answer, or an
            explanatory message when no frame was given or the request
            failed. The second element is always an empty dict.
        """
        if frame is None:
            return "No frame received", {}
        frame = self.preprocess_image(frame)
        # encode_image already returns a complete data URL; it must be
        # passed through as-is (prefixing it again corrupts the URL).
        image_data_url = self.encode_image(frame)
        try:
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": self._ANALYSIS_PROMPT},
                            {
                                "type": "image_url",
                                "image_url": {"url": image_data_url},
                            },
                        ],
                    }
                ],
                temperature=0.7,
                max_tokens=1024,
                stream=False,
            )
            return completion.choices[0].message.content, {}
        except Exception as e:
            # Best-effort boundary: report the failure as the analysis text
            # instead of propagating it to the UI.
            print(f"Analysis error: {str(e)}")
            return f"Analysis Error: {str(e)}", {}

    @staticmethod
    def _label_is_flagged(label, safety_issues):
        """Return True if *label* is mentioned in any of *safety_issues*.

        Matching is a case-insensitive substring test of the (short) class
        label inside each (long) analysis line.
        """
        needle = label.lower()
        if not needle:
            return False
        return any(needle in issue.lower() for issue in safety_issues)

    def draw_bounding_boxes(self, image, bboxes, labels, safety_issues):
        """Draw labelled boxes on *image*, tagging objects flagged as risks.

        Args:
            image: Image array; it is drawn on in place.
            bboxes: Iterable of ``x1, y1, x2, y2, conf, class_id`` rows.
            labels: Mapping from class id to class name.
            safety_issues: Analysis lines; an object whose class name
                appears in any of them is captioned ``Risk: <name>``.

        Returns:
            The same *image* object, now annotated.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        thickness = 2
        for idx, bbox in enumerate(bboxes):
            x1, y1, x2, y2, conf, class_id = bbox
            label = labels[int(class_id)]
            color = self.colors[idx % len(self.colors)]
            left, top = int(x1), int(y1)
            cv2.rectangle(image, (left, top), (int(x2), int(y2)), color, thickness)
            # Keep the caption inside the image for boxes touching the top.
            text_origin = (left, max(top - 10, 15))
            if self._label_is_flagged(label, safety_issues):
                cv2.putText(image, f"Risk: {label}", text_origin, font,
                            font_scale, (0, 0, 255), thickness)
            else:
                cv2.putText(image, f"{label} {conf:.2f}", text_origin, font,
                            font_scale, color, thickness)
        return image

    def process_frame(self, frame):
        """Main processing pipeline for dynamic, robust safety analysis.

        Args:
            frame: Image array, or ``None``.

        Returns:
            ``(annotated_frame, analysis_text)``. On failure the first
            element is ``None`` and the second describes the error.
        """
        if frame is None:
            return None, "No image provided"
        try:
            bbox_data, labels = self.detect_objects(frame)
            # Analyze the untouched frame so the model never sees our own
            # overlay, then annotate a copy exactly once.
            analysis, _ = self.analyze_frame(frame)
            safety_issues = self.parse_safety_analysis(analysis)
            annotated_frame = self.draw_bounding_boxes(
                frame.copy(), bbox_data, labels, safety_issues
            )
            return annotated_frame, analysis
        except Exception as e:
            print(f"Processing error: {str(e)}")
            return None, f"Error processing image: {str(e)}"

    def parse_safety_analysis(self, analysis):
        """Return the stripped lines of *analysis* mentioning a risk or hazard.

        Empty or ``None`` input yields an empty list.
        """
        if not analysis:
            return []
        return [
            line.strip()
            for line in analysis.split('\n')
            if "risk" in line.lower() or "hazard" in line.lower()
        ]
def create_monitor_interface():
    """Build and return the Gradio Blocks app for the safety monitor.

    The UI has an upload image, an annotated output image and a text box
    with the analysis; every change of the uploaded image re-runs the
    monitor's processing pipeline.
    """
    monitor = RobustSafetyMonitor()

    def analyze_image(image):
        # Gradio callback: returns (annotated image, analysis text).
        if image is None:
            return None, "No image provided"
        try:
            annotated, report = monitor.process_frame(image)
        except Exception as e:
            print(f"Processing error: {str(e)}")
            return None, f"Error processing image: {str(e)}"
        else:
            return annotated, report

    instructions_md = (
        "\n"
        "## Instructions:\n"
        "1. Upload any workplace/safety-related image\n"
        "2. View identified hazards and their locations\n"
        "3. Read detailed analysis of safety concerns based on the image\n"
    )

    with gr.Blocks() as demo:
        gr.Markdown("# Robust Safety Analysis System powered by Llama Vision 3.2")

        with gr.Row():
            input_image = gr.Image(label="Upload Image")
            output_image = gr.Image(label="Safety Analysis")

        analysis_text = gr.Textbox(label="Detailed Analysis", lines=5)

        input_image.change(
            fn=analyze_image,
            inputs=input_image,
            outputs=[output_image, analysis_text],
        )

        gr.Markdown(instructions_md)

    return demo
# Script entry point: build the UI and start the Gradio server only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    demo = create_monitor_interface()
    demo.launch()