import cv2
import numpy as np
from ultralytics import YOLO
import cvzone
import base64
import os
import gradio as gr
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
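# Assumed dependency set, inferred from the imports above (versions are not
# pinned anywhere in this Space, so treat this as a sketch):
#   pip install opencv-python numpy ultralytics cvzone gradio langchain-google-genai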
# ✅ Set up the Google API key (read from the environment; never hard-code secrets)
os.environ["GOOGLE_API_KEY"] = os.getenv("GOOGLE_API_KEY", "")

# ✅ Initialize the Gemini model
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# ✅ Load the YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names
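# "best.pt" is assumed to be a custom-trained detection checkpoint bundled with
# this Space; `names` maps its class indices to human-readable labels.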
def encode_image_to_base64(image):
    """Encodes an image to a base64 string."""
    _, img_buffer = cv2.imencode('.jpg', image)
    return base64.b64encode(img_buffer).decode('utf-8')
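# Hypothetical sanity check for the encoder: round-trip any BGR frame into the
# same data-URI shape used below (file name here is illustrative, not from the app):
#   frame = cv2.imread("sample.jpg")
#   uri = f"data:image/jpeg;base64,{encode_image_to_base64(frame)}"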
def analyze_image_with_gemini(image):
    """Sends an image to Gemini AI for analysis."""
    if image is None or image.shape[0] == 0 or image.shape[1] == 0:
        return "Error: Invalid image."
    image_data = encode_image_to_base64(image)
    message = HumanMessage(content=[
        {"type": "text", "text": """
        Analyze this image and determine if the label is present on the bottle.
        Return the result strictly in a structured table format:

        | Label Present | Damage |
        |---------------|--------|
        | Yes/No        | Yes/No |
        """},
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_data}"}, "description": "Detected product"}
    ])
    try:
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"
def process_video(video_path):
    """Processes the uploaded video frame by frame using YOLO and Gemini AI."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return "Error: Could not open video file.", ""

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30  # some containers report 0 fps; fall back to a sane default
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    output_video_path = "output.mp4"
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    vertical_center = width // 2  # x-coordinate of the vertical center line
    analyzed_objects = {}  # track_id -> Gemini response, so each object is analyzed only once
    log_messages = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        results = yolo_model.track(frame, persist=True)
        if results and results[0].boxes is not None and results[0].boxes.xyxy is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            class_ids = results[0].boxes.cls.int().cpu().tolist()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
            for box, track_id, class_id in zip(boxes, track_ids, class_ids):
                x1, y1, x2, y2 = box
                center_x = (x1 + x2) // 2
                # ✅ Process a bottle only after it crosses into the left half of the frame
                if center_x > vertical_center:
                    continue  # Skip objects still on the right side
                # ✅ Analyze each tracked object once, cropping BEFORE drawing so the
                # box graphics don't contaminate the image sent to Gemini
                if track_id not in analyzed_objects:
                    crop = frame[y1:y2, x1:x2].copy()
                    response = analyze_image_with_gemini(crop)
                    analyzed_objects[track_id] = response
                    log_messages.append(f"Object {track_id}: {response}")  # ✅ Add log
                    print(f"Object {track_id}: {response}")  # ✅ Print log for debugging
                # Draw detection box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cvzone.putTextRect(frame, f'ID: {track_id}', (x2, y2), 1, 1)
                cvzone.putTextRect(frame, f'{names[class_id]}', (x1, y1), 1, 1)
                # 🛠️ Keep the analysis text on screen for each analyzed object
                response_text = analyzed_objects[track_id]
                text_x = 50  # Left side
                text_y = height // 2  # Middle of the frame
                cvzone.putTextRect(frame, response_text, (text_x, text_y), 2, 2, colorT=(255, 255, 255), colorR=(0, 0, 255))
        out.write(frame)

    cap.release()
    out.release()
    return output_video_path, "\n".join(log_messages)  # ✅ Return logs along with the processed video
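# process_video can also be used without the UI (file name follows the sample
# bundled with this Space):
#   out_path, logs = process_video("vid4.mp4")
#   print(logs)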
def gradio_interface(video_path):
    """Handles Gradio video input and processes it."""
    if video_path is None:
        return "Error: No video uploaded.", ""
    return process_video(video_path)

# ✅ Sample video file
sample_video_path = "vid4.mp4"  # Make sure this file is available in the working directory

# ✅ Gradio UI setup with sample video
iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(value=sample_video_path, type="filepath", label="Upload Video (Sample Included)"),
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Processing Logs", lines=10, interactive=False)
    ],
    title="YOLO + Gemini AI Video Analysis",
    description="Upload a video to detect objects and analyze them using Gemini AI.\nA sample video is preloaded for quick testing.",
)

if __name__ == "__main__":
    iface.launch(share=True)
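# share=True additionally requests a temporary public gradio.live URL;
# drop it to serve on localhost only.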