Spaces:
Sleeping
Sleeping
import gradio as gr | |
import cv2 | |
import numpy as np | |
import os | |
import time | |
import threading | |
import base64 | |
from ultralytics import YOLO | |
from langchain_core.messages import HumanMessage | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
# Set up Google API Key.
# setdefault() keeps a key that is already present in the process environment
# (for example one injected as a deployment secret) instead of overwriting it
# with the placeholder below.
os.environ.setdefault("GOOGLE_API_KEY", "YOUR_GOOGLE_API_KEY")  # Replace with your API Key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names

# Constants for ROI detection
# NOTE(review): cx1 and offset are not referenced by the code visible in this
# file section; kept in case other code relies on them.
cx1 = 491
offset = 8

# Crops are written to a per-day folder named after the date at import time.
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(crop_folder, exist_ok=True)

# Track processed IDs to avoid duplicate processing
processed_track_ids = set()
lock = threading.Lock()  # Guards access to processed_track_ids
def encode_image_to_base64(image):
    """Encode an image as JPEG and return the bytes as base64 text.

    Args:
        image: Image passed straight to ``cv2.imencode`` (presumably a NumPy
            array as produced by OpenCV; confirm against callers).

    Returns:
        str: Base64 encoding of the JPEG bytes, decoded as UTF-8.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    success, img_buffer = cv2.imencode('.jpg', image)
    # imencode signals failure through its first return value; without this
    # check a failed encode would surface later as an unrelated error.
    if not success:
        raise ValueError("Failed to encode image as JPEG.")
    return base64.b64encode(img_buffer).decode('utf-8')
def analyze_image_with_gemini(current_image):
    """Ask the Gemini model whether a label is present on the bottle in an image.

    Args:
        current_image: Image to analyse, or ``None``.

    Returns:
        The ``content`` of the model response on success. When
        ``current_image`` is ``None`` a fixed "no image" message is returned.
        When encoding, message construction or the model call raises, a string
        starting with ``"Error processing image: "`` is returned instead.
    """
    if current_image is None:
        return "No image available for analysis."
    try:
        # Encoding and message construction live inside the try block so that
        # every failure is reported through the same error string instead of
        # escaping as an exception from the calling thread.
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"
def save_crop_image(crop, track_id):
    """Write a crop into the module's crop folder and return the file path.

    The file is named after ``track_id`` with a ``.jpg`` extension.
    """
    target_path = f"{crop_folder}/{track_id}.jpg"
    cv2.imwrite(target_path, crop)
    return target_path
def process_crop_image(crop, track_id, responses):
    """Analyse one crop and append ``(track_id, analysis)`` to ``responses``.

    ``responses`` is mutated in place; nothing is returned.
    """
    responses.append((track_id, analyze_image_with_gemini(crop)))
def process_video(video_path):
    """Track objects through a video and analyse each newly seen track once.

    Every frame is resized to 1020x500, passed to the YOLO tracker and written
    to ``output_video.mp4``. The first time a track ID is seen, its crop is
    saved to disk and sent for analysis on a background thread.

    Args:
        video_path: Path of the video file to read.

    Returns:
        tuple: ``(output_path, responses)`` where ``responses`` is a list of
        ``(track_id, analysis)`` pairs. All analysis threads have finished by
        the time this function returns, so the list is complete. Its order
        follows thread completion, not detection order.
    """
    cap = cv2.VideoCapture(video_path)
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    # Keep the source frame rate so the output plays at the original speed;
    # fall back to 20 fps when the capture does not report a usable value
    # (the comparison form also rejects NaN).
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 20.0
    out = cv2.VideoWriter(output_path, fourcc, fps, (1020, 500))
    responses = []
    workers = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (1020, 500))
            results = yolo_model.track(frame, persist=True)
            if results[0].boxes is not None:
                boxes = results[0].boxes.xyxy.int().cpu().tolist()
                # Detections without tracker IDs all share the placeholder -1.
                # NOTE(review): only the first such detection is ever analysed,
                # because -1 is then recorded as processed - confirm intent.
                track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
                for box, track_id in zip(boxes, track_ids):
                    with lock:  # Prevent race condition
                        if track_id in processed_track_ids:
                            continue
                        x1, y1, x2, y2 = box
                        crop = frame[y1:y2, x1:x2]
                        # A degenerate box yields an empty crop that cannot be
                        # written or encoded; skip it without marking the track
                        # as processed so a later frame can still supply one.
                        if crop.size == 0:
                            continue
                        save_crop_image(crop, track_id)
                        worker = threading.Thread(target=process_crop_image, args=(crop, track_id, responses))
                        worker.start()
                        workers.append(worker)
                        processed_track_ids.add(track_id)
            out.write(frame)
    finally:
        # Release the capture and writer even if tracking or writing raised.
        cap.release()
        out.release()
    # Wait for every analysis thread; otherwise responses would be returned
    # before the threads had appended their results.
    for worker in workers:
        worker.join()
    return output_path, responses
def process_and_return(video_file):
    """Persist uploaded video bytes, process them and format the analysis.

    Args:
        video_file: Raw bytes of the uploaded video; a falsy value means
            nothing was uploaded.

    Returns:
        tuple: ``(None, "No video uploaded.")`` when ``video_file`` is falsy;
        otherwise the processed video path and one Markdown line per
        ``(track_id, response)`` pair, joined by newlines.
    """
    if not video_file:
        return None, "No video uploaded."

    upload_path = "uploaded_video.mp4"
    with open(upload_path, "wb") as sink:
        sink.write(video_file)

    rendered_path, per_track = process_video(upload_path)

    lines = []
    for tid, reply in per_track:
        lines.append(f"**Track ID {tid}:** {reply}")
    return rendered_path, "\n".join(lines)
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# Bottle Label Checking using YOLO & Gemini AI")
    with gr.Row():
        video_input = gr.File(label="Upload a video", type="binary")
        process_button = gr.Button("Process Video")
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        download_button = gr.File(label="Download Processed Video")
    analysis_results = gr.Markdown(label="AI Analysis Results")

    # Clicking the button processes the upload and fills the video player and
    # the analysis text.
    process_button.click(
        fn=process_and_return,
        inputs=video_input,
        outputs=[video_output, analysis_results]
    )

    # Mirror the processed video into the download component. The listener is
    # attached to video_output (the component that changes when processing
    # finishes); attaching it to download_button itself would never fire on a
    # new result.
    video_output.change(
        fn=lambda x: x if x else None,
        inputs=video_output,
        outputs=download_button
    )

demo.launch()