File size: 4,336 Bytes
54e76cb
 
 
6975a6c
 
 
 
54e76cb
6975a6c
 
 
 
22ddfde
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22ddfde
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
 
54e76cb
6975a6c
22ddfde
6975a6c
 
 
 
 
22ddfde
6975a6c
 
 
 
22ddfde
6975a6c
 
 
 
 
 
 
 
54e76cb
22ddfde
 
 
 
 
 
 
6975a6c
 
22ddfde
 
54e76cb
 
 
 
22ddfde
6975a6c
 
 
 
 
 
 
 
 
 
22ddfde
 
54e76cb
22ddfde
54e76cb
 
22ddfde
6975a6c
54e76cb
22ddfde
6975a6c
22ddfde
6975a6c
22ddfde
6975a6c
 
22ddfde
6975a6c
22ddfde
6975a6c
22ddfde
 
 
 
 
6975a6c
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import streamlit as st
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

# Set up Google API Key
os.environ["GOOGLE_API_KEY"] = "AIzaSyDOBd0_yNLckwsZJrpb9-CqTHFUx0Ah3R8"  # Replace with your actual API key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names

# Constants for ROI detection
cx1 = 491
offset = 8
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
if not os.path.exists(crop_folder):
    os.makedirs(crop_folder)
processed_track_ids = set()

def encode_image_to_base64(image):
    _, img_buffer = cv2.imencode('.jpg', image)
    return base64.b64encode(img_buffer).decode('utf-8')

def analyze_image_with_gemini(current_image):
    """Send image to Gemini API for analysis."""
    if current_image is None:
        return "No image available for analysis."
    current_image_data = encode_image_to_base64(current_image)
    message = HumanMessage(
        content=[
            {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
        ]
    )
    try:
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"

def save_crop_image(crop, track_id):
    """Save cropped image of detected bottle."""
    filename = f"{crop_folder}/{track_id}.jpg"
    cv2.imwrite(filename, crop)
    return filename

def process_crop_image(crop, track_id):
    """Process image asynchronously using Gemini AI."""
    response = analyze_image_with_gemini(crop)
    st.session_state["responses"].append((track_id, response))

def process_video(uploaded_file):
    """Process uploaded video, detect objects, and create an output video."""
    if not uploaded_file:
        return None
    
    video_bytes = uploaded_file.read()
    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(video_bytes)
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        st.error("Error: Could not open video file.")
        return None

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        results = yolo_model.track(frame, persist=True)
        if results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
            for box, track_id in zip(boxes, track_ids):
                if track_id not in processed_track_ids:
                    x1, y1, x2, y2 = box
                    crop = frame[y1:y2, x1:x2]
                    save_crop_image(crop, track_id)
                    threading.Thread(target=process_crop_image, args=(crop, track_id)).start()
                    processed_track_ids.add(track_id)

        out.write(frame)

    cap.release()
    out.release()

    return output_path

# Streamlit UI
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a Video")
uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])

if "responses" not in st.session_state:
    st.session_state["responses"] = []

if uploaded_file:
    st.sidebar.write("Processing video, please wait...")
    output_video_path = process_video(uploaded_file)

    if output_video_path:
        st.sidebar.success("Processing completed!")
        st.video(output_video_path)
    
    st.subheader("AI Analysis Results")
    for track_id, response in st.session_state["responses"]:
        st.write(f"**Track ID {track_id}:** {response}")