File size: 4,410 Bytes
ba059c9
54e76cb
 
6975a6c
 
 
 
54e76cb
6975a6c
 
 
 
ba059c9
6975a6c
 
 
 
 
 
 
 
 
 
 
 
 
915ba3e
 
6975a6c
915ba3e
 
6975a6c
 
 
 
 
 
 
915ba3e
6975a6c
 
 
 
 
 
 
 
 
 
 
 
54e76cb
6975a6c
 
 
 
 
ba059c9
6975a6c
ba059c9
 
 
54e76cb
6975a6c
 
915ba3e
 
ba059c9
 
54e76cb
 
 
 
915ba3e
22ddfde
6975a6c
 
 
 
915ba3e
6975a6c
915ba3e
 
 
 
 
ba059c9
915ba3e
22ddfde
54e76cb
915ba3e
54e76cb
 
ba059c9
 
 
 
 
54e76cb
ba059c9
 
 
 
 
 
 
 
 
046137b
ba059c9
 
 
22ddfde
ba059c9
 
 
046137b
ba059c9
 
 
22ddfde
ba059c9
 
 
 
 
 
 
 
 
 
 
 
 
046137b
ba059c9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import gradio as gr
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

# Set up Google API Key.
# Only fall back to the placeholder when no key is already present, so a real
# key exported in the environment (e.g. a deployment secret) is not clobbered.
if not os.environ.get("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = "YOUR_GOOGLE_API_KEY"  # Replace with your API Key
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names

# Constants for ROI detection
# NOTE(review): cx1 and offset are not referenced by any code visible in this
# file; kept in case other code reads them.
cx1 = 491
offset = 8

# Crops are written to a per-day folder named after the date at import time.
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
os.makedirs(crop_folder, exist_ok=True)

# Track processed IDs to avoid duplicate processing
processed_track_ids = set()
lock = threading.Lock()  # Ensure thread-safe operations

def encode_image_to_base64(image):
    """Return *image* JPEG-encoded and then base64-encoded as a UTF-8 string.

    The success flag returned by ``cv2.imencode`` is not inspected.
    """
    encoded = cv2.imencode('.jpg', image)
    jpeg_buffer = encoded[1]
    b64_bytes = base64.b64encode(jpeg_buffer)
    return b64_bytes.decode('utf-8')

def analyze_image_with_gemini(current_image):
    """Ask Gemini whether the bottle shown in *current_image* has a label.

    Args:
        current_image: Image array to analyze (passed on to
            ``encode_image_to_base64``), or ``None``.

    Returns:
        The content of the model response on success,
        ``"No image available for analysis."`` when the image is ``None`` or
        has zero size, or an ``"Error processing image: ..."`` string when
        encoding or the model call fails. Failures are reported as text
        rather than raised.
    """
    # An image with no pixels cannot be encoded; treat it like a missing one.
    if current_image is None or getattr(current_image, "size", None) == 0:
        return "No image available for analysis."

    try:
        # Encoding is inside the try so a failure becomes a readable result.
        # This function is used as part of a thread target, where an uncaught
        # exception would simply drop the entry for this track.
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"

def save_crop_image(crop, track_id):
    """Write *crop* to ``<crop_folder>/<track_id>.jpg`` and return that path."""
    target_path = "/".join((crop_folder, f"{track_id}.jpg"))
    cv2.imwrite(target_path, crop)
    return target_path

def process_crop_image(crop, track_id, responses):
    """Analyze *crop* with Gemini and record the outcome.

    Appends a ``(track_id, analysis)`` tuple to the caller-supplied
    *responses* list; nothing is returned.
    """
    result_entry = (track_id, analyze_image_with_gemini(crop))
    responses.append(result_entry)

def process_video(video_path):
    """Track objects in a video and send each new track's crop to Gemini.

    Every frame is resized to 1020x500, run through ``yolo_model.track`` and
    written as-is to ``output_video.mp4``. The first time a track ID is seen,
    its bounding-box crop is saved with ``save_crop_image`` and analyzed on a
    background thread via ``process_crop_image``.

    Args:
        video_path: Path of the video file to read.

    Returns:
        A ``(output_path, responses)`` tuple. ``responses`` is a list of
        ``(track_id, analysis_text)`` tuples in the order the analyses
        finished; all started analyses have completed by the time this
        function returns.
    """
    cap = cv2.VideoCapture(video_path)
    output_path = "output_video.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, 20.0, (1020, 500))

    responses = []
    workers = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (1020, 500))

            results = yolo_model.track(frame, persist=True)
            detections = results[0].boxes
            if detections is not None:
                boxes = detections.xyxy.int().cpu().tolist()
                if detections.id is not None:
                    track_ids = detections.id.int().cpu().tolist()
                else:
                    # No tracker IDs for this frame: every box shares the
                    # sentinel -1, so only the first such box is ever analyzed.
                    track_ids = [-1] * len(boxes)

                for box, track_id in zip(boxes, track_ids):
                    with lock:  # Prevent race condition
                        if track_id in processed_track_ids:
                            continue
                        x1, y1, x2, y2 = box
                        crop = frame[y1:y2, x1:x2]
                        if crop.size == 0:
                            # Degenerate box: nothing to save or analyze. The
                            # ID is left unprocessed so a later frame can
                            # supply a usable crop.
                            continue
                        save_crop_image(crop, track_id)
                        worker = threading.Thread(
                            target=process_crop_image,
                            args=(crop, track_id, responses),
                        )
                        worker.start()
                        workers.append(worker)
                        processed_track_ids.add(track_id)

            out.write(frame)
    finally:
        # Release the capture and writer even if tracking or saving fails.
        cap.release()
        out.release()

    # Wait for every analysis; otherwise `responses` is returned while the
    # worker threads are still filling it.
    for worker in workers:
        worker.join()

    return output_path, responses

def process_and_return(video_file):
    """Gradio callback: store the uploaded video, process it, format results.

    Args:
        video_file: Content of the uploaded video as written to disk in
            binary mode; a falsy value means nothing was uploaded.

    Returns:
        ``(None, "No video uploaded.")`` when there is no upload, otherwise
        ``(output_video_path, results_text)`` where ``results_text`` has one
        Markdown line per analyzed track.
    """
    if not video_file:
        return None, "No video uploaded."

    # The upload is persisted under a fixed name so process_video can open it.
    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as upload_handle:
        upload_handle.write(video_file)

    output_video_path, analysis_results = process_video(video_path)

    formatted_lines = []
    for track_id, response in analysis_results:
        formatted_lines.append(f"**Track ID {track_id}:** {response}")
    results_text = "\n".join(formatted_lines)

    return output_video_path, results_text

# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# Bottle Label Checking using YOLO & Gemini AI")

    # Input row: the upload is delivered to the callback as raw bytes.
    with gr.Row():
        video_input = gr.File(label="Upload a video", type="binary")
        process_button = gr.Button("Process Video")

    # Output row: a player for the processed video plus a download slot.
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        download_button = gr.File(label="Download Processed Video")

    analysis_results = gr.Markdown(label="AI Analysis Results")

    process_button.click(
        fn=process_and_return,
        inputs=video_input,
        outputs=[video_output, analysis_results]
    )

    # Mirror the processed video into the download slot. The listener has to
    # be attached to video_output (the component that changes when processing
    # finishes); attaching it to download_button itself never fires for a
    # processed video, leaving the download slot empty.
    video_output.change(
        fn=lambda x: x if x else None,
        inputs=video_output,
        outputs=download_button
    )

demo.launch()