"""Streamlit app that checks bottle labels in an uploaded video.

Each frame is run through a YOLO tracker; the first time a track id is seen,
the detection is cropped, saved to disk and sent to Gemini, which is asked
whether a label is present on the bottle.
"""

import base64
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np
import streamlit as st
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from ultralytics import YOLO

# SECURITY: the Google API key must be supplied through the environment
# (GOOGLE_API_KEY); never commit it to source. The key that used to be
# hard-coded here has been exposed and should be revoked.
if not os.environ.get("GOOGLE_API_KEY"):
    st.error(
        "GOOGLE_API_KEY is not set. Export it in the environment before "
        "starting the app."
    )
    st.stop()

gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model
yolo_model = YOLO("best.pt")
names = yolo_model.names

# Constants for ROI detection
cx1 = 491
offset = 8

# Frame rate used for the output video when the input reports none.
DEFAULT_FPS = 30.0
# Upper bound on concurrent Gemini requests.
MAX_GEMINI_WORKERS = 4

current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
os.makedirs(crop_folder, exist_ok=True)

# Track ids that have already been cropped and submitted for analysis.
processed_track_ids = set()


def encode_image_to_base64(image):
    """Return *image* JPEG-encoded and base64-encoded as a UTF-8 string.

    Raises:
        ValueError: if OpenCV reports that the JPEG encoding failed.
    """
    ok, img_buffer = cv2.imencode('.jpg', image)
    if not ok:
        raise ValueError("Could not encode image as JPEG.")
    return base64.b64encode(img_buffer).decode('utf-8')


def analyze_image_with_gemini(current_image):
    """Send image to Gemini API for analysis.

    Returns the content of the model response, or a human-readable message
    when there is no image or when encoding / the API call fails.
    """
    if current_image is None:
        return "No image available for analysis."

    try:
        # Encoding happens inside the try so that a bad crop is reported
        # like any other processing error rather than raised to the caller.
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format.",
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"},
                    "description": "Detected product",
                },
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:  # boundary: surface any failure as a result string
        return f"Error processing image: {e}"


def save_crop_image(crop, track_id):
    """Save cropped image of detected bottle and return the file path."""
    filename = f"{crop_folder}/{track_id}.jpg"
    cv2.imwrite(filename, crop)
    return filename


def process_crop_image(crop, track_id):
    """Analyse one crop with Gemini and return ``(track_id, response)``.

    This runs on a worker thread, so it deliberately does not touch
    ``st.session_state``; the caller stores the result from the main
    script thread.
    """
    return track_id, analyze_image_with_gemini(crop)


def process_video(uploaded_file):
    """Process uploaded video, detect objects, and create an output video.

    The upload is written to ``uploaded_video.mp4``, every frame is run
    through the YOLO tracker, and each newly seen track id is cropped, saved
    and queued for Gemini analysis. All analyses are awaited before returning
    and their ``(track_id, response)`` pairs are appended to
    ``st.session_state["responses"]``.

    Returns:
        Path of the written output video, or ``None`` when no file was given
        or the video could not be opened.
    """
    if not uploaded_file:
        return None

    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(uploaded_file.read())

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error: Could not open video file.")
        return None

    # Keep the fractional frame rate (e.g. 29.97) and fall back to a default
    # when the container reports 0.
    fps = cap.get(cv2.CAP_PROP_FPS) or DEFAULT_FPS
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    output_path = "output_video.mp4"
    # NOTE(review): "mp4v" output may not play in every browser via st.video.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    pending = []
    try:
        # Leaving the executor context waits for every queued analysis.
        with ThreadPoolExecutor(max_workers=MAX_GEMINI_WORKERS) as pool:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = yolo_model.track(frame, persist=True)
                detections = results[0].boxes
                if detections is not None:
                    boxes = detections.xyxy.int().cpu().tolist()
                    if detections.id is not None:
                        track_ids = detections.id.int().cpu().tolist()
                    else:
                        # Untracked detections all share the placeholder id
                        # -1, so only the first one is ever analysed.
                        track_ids = [-1] * len(boxes)

                    for box, track_id in zip(boxes, track_ids):
                        if track_id in processed_track_ids:
                            continue

                        x1, y1, x2, y2 = box
                        # Negative indices would wrap around in NumPy slicing.
                        x1, y1 = max(x1, 0), max(y1, 0)
                        crop = frame[y1:y2, x1:x2]
                        if crop.size == 0:
                            # Degenerate box; retry this track on a later frame.
                            continue

                        # Copy so the worker does not keep the full frame alive.
                        crop = crop.copy()
                        save_crop_image(crop, track_id)
                        pending.append(
                            pool.submit(process_crop_image, crop, track_id)
                        )
                        processed_track_ids.add(track_id)

                out.write(frame)
    finally:
        cap.release()
        out.release()

    # Session state is only touched from the main script thread.
    st.session_state["responses"].extend(job.result() for job in pending)
    return output_path


# Streamlit UI
st.title("Bottle Label Checking using YOLO & Gemini AI")
st.sidebar.header("Upload a Video")

uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov"])

if "responses" not in st.session_state:
    st.session_state["responses"] = []

if uploaded_file:
    # Every rerun reprocesses the video, so start from a clean result list
    # instead of appending duplicates to the previous run's results.
    st.session_state["responses"] = []
    st.sidebar.write("Processing video, please wait...")
    output_video_path = process_video(uploaded_file)

    if output_video_path:
        st.sidebar.success("Processing completed!")
        st.video(output_video_path)

st.subheader("AI Analysis Results")
for track_id, response in st.session_state["responses"]:
    st.write(f"**Track ID {track_id}:** {response}")