File size: 4,336 Bytes
54e76cb 6975a6c 54e76cb 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 54e76cb 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 54e76cb 22ddfde 6975a6c 22ddfde 54e76cb 22ddfde 6975a6c 22ddfde 54e76cb 22ddfde 54e76cb 22ddfde 6975a6c 54e76cb 22ddfde 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c 22ddfde 6975a6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
import streamlit as st
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
# Set up Google API Key
os.environ["GOOGLE_API_KEY"] = "AIzaSyDOBd0_yNLckwsZJrpb9-CqTHFUx0Ah3R8" # Replace with your actual API key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names
# Constants for ROI detection
cx1 = 491
offset = 8
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
if not os.path.exists(crop_folder):
os.makedirs(crop_folder)
processed_track_ids = set()
def encode_image_to_base64(image):
_, img_buffer = cv2.imencode('.jpg', image)
return base64.b64encode(img_buffer).decode('utf-8')
def analyze_image_with_gemini(current_image):
"""Send image to Gemini API for analysis."""
if current_image is None:
return "No image available for analysis."
current_image_data = encode_image_to_base64(current_image)
message = HumanMessage(
content=[
{"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
]
)
try:
response = gemini_model.invoke([message])
return response.content
except Exception as e:
return f"Error processing image: {e}"
def save_crop_image(crop, track_id):
"""Save cropped image of detected bottle."""
filename = f"{crop_folder}/{track_id}.jpg"
cv2.imwrite(filename, crop)
return filename
def process_crop_image(crop, track_id):
"""Process image asynchronously using Gemini AI."""
response = analyze_image_with_gemini(crop)
st.session_state["responses"].append((track_id, response))
def process_video(uploaded_file):
"""Process uploaded video, detect objects, and create an output video."""
if not uploaded_file:
return None
video_bytes = uploaded_file.read()
video_path = "uploaded_video.mp4"
with open(video_path, "wb") as f:
f.write(video_bytes)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
st.error("Error: Could not open video file.")
return None
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
output_path = "output_video.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
results = yolo_model.track(frame, persist=True)
if results[0].boxes is not None:
boxes = results[0].boxes.xyxy.int().cpu().tolist()
track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
for box, track_id in zip(boxes, track_ids):
if track_id not in processed_track_ids:
x1, y1, x2, y2 = box
crop = frame[y1:y2, x1:x2]
save_crop_image(crop, track_id)
threading.Thread(target=process_crop_image, args=(crop, track_id)).start()
processed_track_ids.add(track_id)
out.write(frame)
cap.release()
out.release()
return output_path
# Streamlit UI
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a Video")
uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])
if "responses" not in st.session_state:
st.session_state["responses"] = []
if uploaded_file:
st.sidebar.write("Processing video, please wait...")
output_video_path = process_video(uploaded_file)
if output_video_path:
st.sidebar.success("Processing completed!")
st.video(output_video_path)
st.subheader("AI Analysis Results")
for track_id, response in st.session_state["responses"]:
st.write(f"**Track ID {track_id}:** {response}")
|