|
import streamlit as st |
|
import cv2 |
|
import numpy as np |
|
import os |
|
import time |
|
import threading |
|
import base64 |
|
from ultralytics import YOLO |
|
from langchain_core.messages import HumanMessage |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
|
|
|
# --- Model setup -------------------------------------------------------------
# Keep a GOOGLE_API_KEY that is already exported in the environment; only fall
# back to the empty placeholder when none is set, instead of overwriting it.
os.environ.setdefault("GOOGLE_API_KEY", "")
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Detection/tracking model loaded from the local "best.pt" weights file, and
# the `names` attribute it exposes (not referenced in the visible code).
yolo_model = YOLO("best.pt")
names = yolo_model.names

# --- Pipeline state ----------------------------------------------------------
# NOTE(review): cx1 and offset are not referenced in the visible code --
# presumably a reference-line x position and tolerance; confirm before removing.
cx1 = 491
offset = 8

# Crops are written to a per-day folder named "crop_<YYYY-MM-DD>".
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(crop_folder, exist_ok=True)

# Track IDs whose crop has already been saved and handed off for analysis.
processed_track_ids = set()
|
|
|
def encode_image_to_base64(image):
    """Return *image* JPEG-encoded and then base64-encoded as a UTF-8 string.

    Args:
        image: Image passed straight to ``cv2.imencode`` (presumably a BGR
            ndarray such as a video-frame crop -- confirm against callers).

    Returns:
        The base64 text of the encoded JPEG bytes.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    ok, img_buffer = cv2.imencode('.jpg', image)
    if not ok:
        # imencode signals failure through its first return value; the buffer
        # cannot be trusted in that case, so fail loudly instead of encoding it.
        raise ValueError("Failed to encode image as JPEG")
    return base64.b64encode(img_buffer).decode('utf-8')
|
|
|
def analyze_image_with_gemini(current_image):
    """Ask the Gemini model whether the bottle in *current_image* has a label.

    Args:
        current_image: Image to analyze, or ``None`` when nothing is available.

    Returns:
        The ``content`` of the model response on success; the fixed string
        ``"No image available for analysis."`` when *current_image* is
        ``None``; otherwise an ``"Error processing image: ..."`` string
        describing whatever step failed. This function does not raise for
        ordinary failures.
    """
    if current_image is None:
        return "No image available for analysis."

    try:
        # Encoding and message construction live inside the try so that an
        # unencodable image is reported as an error string, exactly like a
        # failed model call, instead of raising into the caller.
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        return f"Error processing image: {e}"
|
|
|
def save_crop_image(crop, track_id):
    """Write *crop* to ``<crop_folder>/<track_id>.jpg`` and return that path.

    Args:
        crop: Image handed to ``cv2.imwrite``.
        track_id: Identifier used as the file's base name.

    Returns:
        The path string the image was written to.
    """
    destination = "{}/{}.jpg".format(crop_folder, track_id)
    cv2.imwrite(destination, crop)
    return destination
|
|
|
def process_crop_image(crop, track_id):
    """Analyze *crop* with Gemini and record the outcome for *track_id*.

    The ``(track_id, response)`` pair is appended to the list stored under
    ``st.session_state["responses"]``.
    """
    result_entry = (track_id, analyze_image_with_gemini(crop))
    st.session_state["responses"].append(result_entry)
|
|
|
def process_video(uploaded_file):
    """Track objects in an uploaded video and send each new track for analysis.

    The upload is written to ``uploaded_video.mp4``, every frame is resized to
    1020x500 and passed to ``yolo_model.track``. The first time a track ID is
    seen, its crop is saved with ``save_crop_image`` and analyzed on a
    background thread via ``process_crop_image``. Resized frames are written
    to ``output_video.mp4``.

    Args:
        uploaded_file: File-like object with a ``read()`` method returning the
            video bytes; a falsy value means "nothing uploaded".

    Returns:
        The output video path, or ``None`` when *uploaded_file* is falsy.
        All analysis threads have finished by the time this returns.
    """
    if not uploaded_file:
        return None

    # Worker threads need the script-run context attached, otherwise
    # st.session_state inside them is not this session's state. Imported
    # locally and guarded in case the installed Streamlit lacks the helper.
    try:
        from streamlit.runtime.scriptrunner import add_script_run_ctx
    except ImportError:
        add_script_run_ctx = None

    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(uploaded_file.read())

    output_path = "output_video.mp4"
    frame_size = (1020, 500)
    workers = []

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        # Write at the source frame rate so playback speed is preserved;
        # fall back to 20 fps when the container does not report a usable one.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 20.0
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, frame_size)
            results = yolo_model.track(frame, persist=True)
            detections = results[0].boxes
            if detections is not None:
                boxes = detections.xyxy.int().cpu().tolist()
                if detections.id is not None:
                    track_ids = detections.id.int().cpu().tolist()
                else:
                    # No tracker IDs for this frame: every box shares ID -1.
                    track_ids = [-1] * len(boxes)

                for box, track_id in zip(boxes, track_ids):
                    if track_id in processed_track_ids:
                        continue
                    x1, y1, x2, y2 = box
                    crop = frame[y1:y2, x1:x2]
                    if crop.size == 0:
                        # Degenerate box: nothing to save or analyze. Leave
                        # the ID unprocessed so a later frame can supply it.
                        continue
                    save_crop_image(crop, track_id)
                    worker = threading.Thread(
                        target=process_crop_image, args=(crop, track_id)
                    )
                    if add_script_run_ctx is not None:
                        add_script_run_ctx(worker)
                    worker.start()
                    workers.append(worker)
                    processed_track_ids.add(track_id)
            out.write(frame)
    finally:
        # Release the capture and writer even if a frame fails mid-way.
        cap.release()
        if out is not None:
            out.release()

    # Wait for every analysis so the caller sees complete results.
    for worker in workers:
        worker.join()

    return output_path
|
|
|
# --- Streamlit page ----------------------------------------------------------
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a video")
uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])

# The analysis workers append to this list, so it must exist before processing.
if "responses" not in st.session_state:
    st.session_state["responses"] = []

if uploaded_file:
    # Session state survives script reruns while the video is reprocessed on
    # each one; start from an empty list so earlier results are not repeated
    # or mixed with those of a different upload.
    st.session_state["responses"] = []

    st.sidebar.write("Processing...")
    output_video_path = process_video(uploaded_file)
    st.sidebar.success("Processing completed!")
    st.video(output_video_path)

    st.subheader("AI Analysis Results")
    for track_id, response in st.session_state["responses"]:
        st.write(f"**Track ID {track_id}:** {response}")
|
|