import gradio as gr
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
# Set up Google API Key
os.environ["GOOGLE_API_KEY"] = "YOUR_GOOGLE_API_KEY" # Replace with your API Key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names
# Constants for ROI detection
cx1 = 491
offset = 8
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
if not os.path.exists(crop_folder):
    os.makedirs(crop_folder)
# Track processed IDs to avoid duplicate processing
processed_track_ids = set()
lock = threading.Lock() # Ensure thread-safe operations
def encode_image_to_base64(image):
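    # Encode a BGR image (NumPy array) as a base64 JPEG string for the data URL sent to Gemini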
    _, img_buffer = cv2.imencode('.jpg', image)
    return base64.b64encode(img_buffer).decode('utf-8')
def analyze_image_with_gemini(current_image):
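    # Ask Gemini whether a label is visible on the cropped bottle; returns the model's text reply or an error message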
    if current_image is None:
        return "No image available for analysis."
    current_image_data = encode_image_to_base64(current_image)
    message = HumanMessage(
        content=[
            {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
        ]
    )
    try:
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"
def save_crop_image(crop, track_id):
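    # Save the cropped detection into the dated crop folder, named by its track ID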
    filename = f"{crop_folder}/{track_id}.jpg"
    cv2.imwrite(filename, crop)
    return filename
def process_crop_image(crop, track_id, responses):
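    # Thread worker: run the Gemini analysis and collect (track_id, response) for later display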
    response = analyze_image_with_gemini(crop)
    responses.append((track_id, response))
def process_video(video_path):
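    # Run YOLO tracking over the video, crop each newly seen track ID once for analysis, and write the resized frames to output_video.mp4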
    cap = cv2.VideoCapture(video_path)
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, 20.0, (1020, 500))
    responses = []
    threads = []  # Keep handles so all Gemini calls can finish before returning
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (1020, 500))
        results = yolo_model.track(frame, persist=True)
        if results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
            for box, track_id in zip(boxes, track_ids):
                with lock:  # Prevent race condition
                    if track_id not in processed_track_ids:
                        x1, y1, x2, y2 = box
                        crop = frame[y1:y2, x1:x2]
                        save_crop_image(crop, track_id)
                        thread = threading.Thread(target=process_crop_image, args=(crop, track_id, responses))
                        thread.start()
                        threads.append(thread)
                        processed_track_ids.add(track_id)
        out.write(frame)
    cap.release()
    out.release()
    for thread in threads:  # Wait for pending analyses so no results are dropped
        thread.join()
    return output_path, responses
def process_and_return(video_file):
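    # Gradio callback: persist the uploaded bytes to disk, process the video, and format per-track results as Markdown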
    if not video_file:
        return None, "No video uploaded."
    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(video_file)
    output_video_path, analysis_results = process_video(video_path)
    results_text = "\n".join([f"**Track ID {track_id}:** {response}" for track_id, response in analysis_results])
    return output_video_path, results_text
# Gradio Interface
with gr.Blocks() as demo:
gr.Markdown("# Bottle Label Checking using YOLO & Gemini AI")
with gr.Row():
video_input = gr.File(label="Upload a video", type="binary")
process_button = gr.Button("Process Video")
with gr.Row():
video_output = gr.Video(label="Processed Video")
download_button = gr.File(label="Download Processed Video")
analysis_results = gr.Markdown(label="AI Analysis Results")
process_button.click(
fn=process_and_return,
inputs=video_input,
outputs=[video_output, analysis_results]
)
    video_output.change(  # Mirror the processed video into the download slot once it is ready
        fn=lambda x: x if x else None,
        inputs=video_output,
        outputs=download_button
    )
demo.launch()