File size: 3,806 Bytes
54e76cb
 
 
6975a6c
 
 
 
54e76cb
6975a6c
 
 
 
0eafc36
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54e76cb
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54e76cb
6975a6c
 
 
54e76cb
 
 
 
 
6975a6c
 
 
 
 
 
 
 
 
 
 
 
54e76cb
 
 
6975a6c
54e76cb
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import streamlit as st
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

# Set up Google API Key
# The key must be supplied through the GOOGLE_API_KEY environment variable and
# never be written into source code. NOTE: any key that has been committed to
# a repository should be treated as leaked and revoked.
if not os.environ.get("GOOGLE_API_KEY"):
    # Fail early with a readable message instead of an opaque client error
    # on the first model call.
    st.error("GOOGLE_API_KEY is not set. Export it in the environment before starting the app.")
    st.stop()
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model (weights are read from the relative path "best.pt")
yolo_model = YOLO("best.pt")
names = yolo_model.names  # class names reported by the loaded model

# Constants for ROI detection
cx1 = 491  # presumably the x-position of a reference line in the frame — TODO confirm
offset = 8  # presumably the pixel tolerance around cx1 — TODO confirm

# Crops are written to a folder named after the date on which the script runs.
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
# exist_ok=True replaces the check-then-create sequence, which could fail if
# another process created the folder between the check and the makedirs call.
os.makedirs(crop_folder, exist_ok=True)

# Track IDs that have already been cropped and handed off for analysis.
processed_track_ids = set()

def encode_image_to_base64(image):
    """Return *image* as a base64 string of its JPEG encoding.

    Args:
        image: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG-encoded bytes, base64-encoded and decoded as UTF-8 text.
    """
    # imencode returns (success_flag, buffer); only the buffer is used here.
    jpeg_buffer = cv2.imencode('.jpg', image)[1]
    b64_bytes = base64.b64encode(jpeg_buffer)
    return b64_bytes.decode('utf-8')

def analyze_image_with_gemini(current_image):
    """Ask the Gemini model whether the bottle in an image carries a label.

    Args:
        current_image: Image to analyze, or ``None`` when no image exists.

    Returns:
        The content of the model's reply on success; a fixed placeholder
        string when ``current_image`` is ``None``; or a string starting with
        ``"Error processing image:"`` when the model call raises.
    """
    if current_image is None:
        return "No image available for analysis."

    encoded_jpeg = encode_image_to_base64(current_image)

    # The request is one human message with a text part and an inline image part.
    prompt_part = {
        "type": "text",
        "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format.",
    }
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:image/jpeg;base64,{encoded_jpeg}"},
        "description": "Detected product",
    }
    message = HumanMessage(content=[prompt_part, image_part])

    # Failures of the model call are reported as text rather than raised, so
    # a single bad request does not abort the caller.
    try:
        return gemini_model.invoke([message]).content
    except Exception as exc:
        return f"Error processing image: {exc}"

def save_crop_image(crop, track_id):
    """Write a cropped detection into the crop folder and return its path.

    Args:
        crop: Image array to write.
        track_id: Identifier used as the file name (``<track_id>.jpg``).

    Returns:
        The path string that was passed to ``cv2.imwrite``.
    """
    target_path = "/".join((crop_folder, f"{track_id}.jpg"))
    cv2.imwrite(target_path, crop)
    return target_path

def process_crop_image(crop, track_id):
    """Analyze one crop and record the result in the Streamlit session.

    Appends a ``(track_id, response)`` tuple to
    ``st.session_state["responses"]``, which must already exist.

    NOTE(review): if this is used as a thread target, ``st.session_state``
    may not be reachable without a Streamlit script-run context — confirm.

    Args:
        crop: Image passed to ``analyze_image_with_gemini``.
        track_id: Identifier stored alongside the response.
    """
    # The model is queried first; the session list is only looked up afterwards.
    verdict = analyze_image_with_gemini(crop)
    entry = (track_id, verdict)
    st.session_state["responses"].append(entry)

def process_video(uploaded_file):
    """Track objects in an uploaded video and analyze each new track once.

    The upload is written to ``uploaded_video.mp4``, every frame is resized
    and passed through the YOLO tracker, and the resized frames are written
    to ``output_video.mp4``. The first usable crop of each track ID is saved
    to disk and sent to Gemini on a worker thread. All analyses are awaited
    before returning, and their results are appended to
    ``st.session_state["responses"]`` from the calling thread, so the caller
    can render complete results immediately.

    Args:
        uploaded_file: File-like object exposing ``read()`` (for example the
            value returned by ``st.file_uploader``). Falsy values are ignored.

    Returns:
        The output video path, or ``None`` when no file was given.
    """
    if not uploaded_file:
        return None

    # Function-scope import: only this function needs the pool.
    from concurrent.futures import ThreadPoolExecutor

    frame_size = (1020, 500)
    video_path = "uploaded_video.mp4"
    output_path = "output_video.mp4"

    with open(video_path, "wb") as f:
        f.write(uploaded_file.read())

    cap = cv2.VideoCapture(video_path)
    # Write at the source frame rate so playback speed is preserved; fall back
    # to 20 fps when the container does not report a usable rate (0 or NaN).
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 20.0
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

    pending = []  # (track_id, future) pairs in submission order

    # Leaving the pool context waits for every submitted analysis to finish.
    with ThreadPoolExecutor() as pool:
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, frame_size)
                results = yolo_model.track(frame, persist=True)
                detections = results[0].boxes
                if detections is not None:
                    boxes = detections.xyxy.int().cpu().tolist()
                    if detections.id is not None:
                        track_ids = detections.id.int().cpu().tolist()
                    else:
                        # No tracker IDs for this frame: all boxes share the
                        # placeholder ID -1, so at most one is ever analyzed.
                        track_ids = [-1] * len(boxes)
                    for (x1, y1, x2, y2), track_id in zip(boxes, track_ids):
                        if track_id in processed_track_ids:
                            continue
                        crop = frame[y1:y2, x1:x2]
                        if crop.size == 0:
                            # Zero-area box: cv2.imwrite would raise on it.
                            # Leave the ID unprocessed so a later frame can
                            # supply a usable crop.
                            continue
                        # Copy so the worker does not keep the whole frame alive.
                        crop = crop.copy()
                        processed_track_ids.add(track_id)
                        save_crop_image(crop, track_id)
                        future = pool.submit(analyze_image_with_gemini, crop)
                        pending.append((track_id, future))
                out.write(frame)
        finally:
            # Release the capture and writer even if tracking fails mid-video.
            cap.release()
            out.release()

    # Session state is only touched here, on the calling (script) thread.
    responses = st.session_state["responses"]
    for track_id, future in pending:
        try:
            response = future.result()
        except Exception as exc:
            # Record the failure for this track instead of losing all results.
            response = f"Error processing image: {exc}"
        responses.append((track_id, response))

    return output_path

# --- Streamlit page: upload a video, process it, show the analysis results ---
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a video")
uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])
if "responses" not in st.session_state:
    st.session_state["responses"] = []
if uploaded_file:
    # Session state survives script reruns, so start each processing run with
    # an empty list; otherwise results from an earlier upload would be shown
    # alongside (and with the same track IDs as) the current one.
    st.session_state["responses"] = []
    st.sidebar.write("Processing...")
    output_video_path = process_video(uploaded_file)
    st.sidebar.success("Processing completed!")
    st.video(output_video_path)
    st.subheader("AI Analysis Results")
    for track_id, response in st.session_state["responses"]:
        st.write(f"**Track ID {track_id}:** {response}")