File size: 3,806 Bytes
54e76cb 6975a6c 54e76cb 6975a6c 0eafc36 6975a6c 54e76cb 6975a6c 54e76cb 6975a6c 54e76cb 6975a6c 54e76cb 6975a6c 54e76cb 6975a6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import streamlit as st
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
# Set up Google API Key
# The key is read from the environment rather than embedded in this file: a key
# committed to source is readable by anyone who can see the file and has to be
# treated as leaked. Export GOOGLE_API_KEY (or configure it in the hosting
# platform's secret store) before launching the app.
if not os.environ.get("GOOGLE_API_KEY"):
    st.error("GOOGLE_API_KEY is not set. Configure it in the environment before starting the app.")
    st.stop()
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
# Load YOLO model
yolo_model = YOLO("best.pt")
# NOTE(review): presumably the class-index -> label mapping of the loaded
# weights; it is not referenced elsewhere in the visible code.
names = yolo_model.names
# Constants for ROI detection
# NOTE(review): cx1/offset look like an ROI line position and tolerance in
# pixels, but nothing in the visible code reads them — confirm before removing.
cx1 = 491
offset = 8
# Crops are grouped into one folder per calendar day (local time at start-up).
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
# exist_ok=True replaces the exists()/makedirs() pair, which could raise
# FileExistsError if another run created the folder between the two calls.
os.makedirs(crop_folder, exist_ok=True)
# Track IDs that have already been cropped and sent for analysis.
processed_track_ids = set()
def encode_image_to_base64(image):
    """Encode an image as JPEG and return the bytes as a base64 string.

    Args:
        image: Image accepted by ``cv2.imencode`` (the callers in this file
            pass crops sliced from video frames).

    Returns:
        The JPEG data, base64-encoded and decoded to a UTF-8 ``str``.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding did not succeed.
    """
    ok, img_buffer = cv2.imencode('.jpg', image)
    # imencode signals failure through its first return value; without this
    # check an unusable buffer would be base64-encoded and sent downstream.
    if not ok:
        raise ValueError("cv2.imencode failed to encode the image as JPEG")
    return base64.b64encode(img_buffer).decode('utf-8')
def analyze_image_with_gemini(current_image):
    """Ask the Gemini model whether the bottle in the image carries a label.

    Args:
        current_image: Image to analyze, or ``None``.

    Returns:
        The model's response content on success; the fixed string
        ``"No image available for analysis."`` when ``current_image`` is
        ``None``; otherwise a string starting with
        ``"Error processing image:"`` describing the failure.
    """
    if current_image is None:
        return "No image available for analysis."
    # Encoding and message construction live inside the try so that every
    # failure (e.g. an image OpenCV cannot encode) is reported through the same
    # "Error processing image" string instead of escaping to the caller.
    try:
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"
def save_crop_image(crop, track_id):
    """Write a crop to the crop folder, named after its track id.

    Args:
        crop: Image data handed to ``cv2.imwrite``.
        track_id: Identifier used as the file stem.

    Returns:
        The path string ``"<crop_folder>/<track_id>.jpg"`` that was passed to
        ``cv2.imwrite``.
    """
    target_path = "/".join((crop_folder, f"{track_id}.jpg"))
    cv2.imwrite(target_path, crop)
    return target_path
def process_crop_image(crop, track_id):
    """Analyze one crop and record ``(track_id, response)`` in session state.

    The analysis result is appended to ``st.session_state["responses"]``.

    NOTE(review): this touches ``st.session_state``; if it is invoked from a
    worker thread, confirm that Streamlit has a script-run context there.
    """
    result_entry = (track_id, analyze_image_with_gemini(crop))
    st.session_state["responses"].append(result_entry)
def process_video(uploaded_file):
    """Track objects in an uploaded video and analyze each new track once.

    The upload is written to ``uploaded_video.mp4``, every frame is resized to
    1020x500, passed through ``yolo_model.track`` and written to
    ``output_video.mp4``. The first time a track id is seen, its crop is saved
    with ``save_crop_image`` and submitted for Gemini analysis on a small
    worker pool. Once the frame loop finishes, all analyses are awaited and
    the ``(track_id, response)`` pairs are appended to
    ``st.session_state["responses"]`` from the calling thread.

    Args:
        uploaded_file: File-like object with ``read()``; falsy means no upload.

    Returns:
        The output video path, or ``None`` when ``uploaded_file`` is falsy.
    """
    # Local import: keeps this function self-contained without touching the
    # file-level import block.
    from concurrent.futures import ThreadPoolExecutor

    if not uploaded_file:
        return None

    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(uploaded_file.read())

    output_path = "output_video.mp4"
    frame_size = (1020, 500)
    frame_w, frame_h = frame_size

    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, 20.0, frame_size)

    # (track_id, future) pairs, in the order tracks were first seen.
    pending = []

    # A bounded pool replaces one-thread-per-crop; leaving the ``with`` block
    # waits for every submitted analysis, so results exist before returning.
    with ThreadPoolExecutor(max_workers=4) as pool:
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, frame_size)
                results = yolo_model.track(frame, persist=True)
                detections = results[0].boxes
                # Detections without tracker ids cannot be de-duplicated, so
                # they are skipped until the tracker assigns them an id
                # (previously they all collapsed onto the placeholder id -1).
                if detections is not None and detections.id is not None:
                    boxes = detections.xyxy.int().cpu().tolist()
                    track_ids = detections.id.int().cpu().tolist()
                    for (x1, y1, x2, y2), track_id in zip(boxes, track_ids):
                        if track_id in processed_track_ids:
                            continue
                        # Clamp to the frame: a negative coordinate would be
                        # read as a from-the-end index by the slice below.
                        x1, y1 = max(x1, 0), max(y1, 0)
                        x2, y2 = min(x2, frame_w), min(y2, frame_h)
                        if x2 <= x1 or y2 <= y1:
                            # Empty crop; leave the id unprocessed so a later
                            # frame with a usable box can still be analyzed.
                            continue
                        crop = frame[y1:y2, x1:x2].copy()
                        save_crop_image(crop, track_id)
                        pending.append(
                            (track_id, pool.submit(analyze_image_with_gemini, crop))
                        )
                        processed_track_ids.add(track_id)
                out.write(frame)
        finally:
            # Release the video handles even if tracking or writing fails.
            cap.release()
            out.release()

    # Session state is only touched here, on the thread running the script.
    for track_id, future in pending:
        try:
            response = future.result()
        except Exception as e:
            response = f"Error processing image: {e}"
        st.session_state["responses"].append((track_id, response))

    return output_path
# Page layout: title, sidebar uploader, processed video and analysis results.
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a video")
uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])
if "responses" not in st.session_state:
    st.session_state["responses"] = []
if uploaded_file:
    # Session state survives script reruns, so results from a previously
    # uploaded video would otherwise be listed under the new one. Start a
    # fresh list whenever a different file (by name and size) is selected.
    file_key = (uploaded_file.name, uploaded_file.size)
    if st.session_state.get("responses_source") != file_key:
        st.session_state["responses"] = []
        st.session_state["responses_source"] = file_key
    st.sidebar.write("Processing...")
    output_video_path = process_video(uploaded_file)
    st.sidebar.success("Processing completed!")
    st.video(output_video_path)
    st.subheader("AI Analysis Results")
    # Every rerun re-processes the same video and appends again; keep only the
    # latest response per track id so each track is shown once.
    latest_by_track = dict(st.session_state["responses"])
    for track_id, response in latest_by_track.items():
        st.write(f"**Track ID {track_id}:** {response}")
|