import gradio as gr
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

# Set up Google API Key
os.environ["GOOGLE_API_KEY"] = "YOUR_GOOGLE_API_KEY"  # Replace with your API key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names

# Constants for ROI detection
cx1 = 491
offset = 8

# Folder for cropped detections, one per day
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
os.makedirs(crop_folder, exist_ok=True)

# Track processed IDs to avoid duplicate processing
processed_track_ids = set()
lock = threading.Lock()  # Ensure thread-safe access to processed_track_ids


def encode_image_to_base64(image):
    """Encode an OpenCV image (BGR ndarray) as a base64 JPEG string."""
    _, img_buffer = cv2.imencode('.jpg', image)
    return base64.b64encode(img_buffer).decode('utf-8')


def analyze_image_with_gemini(current_image):
    """Send a cropped bottle image to Gemini and return its analysis text."""
    if current_image is None or current_image.size == 0:
        return "No image available for analysis."
    current_image_data = encode_image_to_base64(current_image)
    message = HumanMessage(
        content=[
            {
                "type": "text",
                "text": "Analyze this image and check if the label is present on the bottle. "
                        "Return results in a structured format.",
            },
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"},
                "description": "Detected product",
            },
        ]
    )
    try:
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"


def save_crop_image(crop, track_id):
    """Save a cropped detection to disk and return its path."""
    filename = f"{crop_folder}/{track_id}.jpg"
    cv2.imwrite(filename, crop)
    return filename


def process_crop_image(crop, track_id, responses):
    """Worker: analyze one crop with Gemini and collect the result."""
    response = analyze_image_with_gemini(crop)
    responses.append((track_id, response))


def process_video(video_path):
    """Run YOLO tracking over the video, crop each new object once, and analyze it with Gemini."""
    cap = cv2.VideoCapture(video_path)
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, 20.0, (1020, 500))

    responses = []
    workers = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (1020, 500))

        results = yolo_model.track(frame, persist=True)
        if results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            track_ids = (
                results[0].boxes.id.int().cpu().tolist()
                if results[0].boxes.id is not None
                else [-1] * len(boxes)
            )

            for box, track_id in zip(boxes, track_ids):
                with lock:  # Prevent race conditions on the shared ID set
                    if track_id in processed_track_ids:
                        continue
                    processed_track_ids.add(track_id)

                x1, y1, x2, y2 = box
                crop = frame[y1:y2, x1:x2]
                save_crop_image(crop, track_id)
                worker = threading.Thread(
                    target=process_crop_image, args=(crop, track_id, responses)
                )
                worker.start()
                workers.append(worker)

        out.write(frame)

    cap.release()
    out.release()

    # Wait for all Gemini calls to finish so the results list is complete
    for worker in workers:
        worker.join()

    return output_path, responses


def process_and_return(video_file):
    if not video_file:
        return None, "No video uploaded."
    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(video_file)
    output_video_path, analysis_results = process_video(video_path)
    results_text = "\n".join(
        f"**Track ID {track_id}:** {response}" for track_id, response in analysis_results
    )
    return output_video_path, results_text


# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# Bottle Label Checking using YOLO & Gemini AI")
    with gr.Row():
        video_input = gr.File(label="Upload a video", type="binary")
        process_button = gr.Button("Process Video")
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        download_button = gr.File(label="Download Processed Video")
    analysis_results = gr.Markdown(label="AI Analysis Results")

    process_button.click(
        fn=process_and_return,
        inputs=video_input,
        outputs=[video_output, analysis_results],
    )
    # Mirror the processed video into the download component once it is available
    video_output.change(
        fn=lambda x: x if x else None,
        inputs=video_output,
        outputs=download_button,
    )

demo.launch()
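# Optional sanity check (a minimal sketch): before launching the full app, you can verify the
# Gemini call on a single saved crop. The filename "sample_crop.jpg" is an assumption — use any
# image written to the crop_<date>/ folder. Uncomment and place above demo.launch() to run it.
# test_img = cv2.imread("sample_crop.jpg")
# print(analyze_image_with_gemini(test_img))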