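# NOTE: the following lines are file-viewer page residue captured together
# with the source (author, commit message, commit hash, file size); they are
# kept as comments because they are not Python code.
# Prathamesh1420's picture
# Update app.py
# 6975a6c verified
# raw
# history blame
# 3.77 kB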
import streamlit as st
import cv2
import numpy as np
import os
import time
import threading
import base64
from ultralytics import YOLO
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
# Set up Google API Key.
# A key that is already present in the environment (for example a deployment
# secret) must win: only fall back to the placeholder when nothing is set, so
# a configured key is never overwritten with an empty string.
os.environ.setdefault("GOOGLE_API_KEY", "")  # Replace "" with your API key for local runs
gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

# Load YOLO model from the weights file expected next to this script.
yolo_model = YOLO("best.pt")
# Keep the loaded model's `names` attribute available at module level.
names = yolo_model.names
# Constants for ROI detection (not referenced by the code visible in this file).
cx1 = 491
offset = 8

# Crops are grouped in one folder per calendar day, e.g. "crop_2024-01-31".
current_date = time.strftime("%Y-%m-%d")
crop_folder = f"crop_{current_date}"
# exist_ok=True replaces the exists()-then-makedirs() pair, which could fail
# with FileExistsError if the folder appeared between the check and the call.
os.makedirs(crop_folder, exist_ok=True)

# Track IDs whose crop has already been saved and queued for analysis.
processed_track_ids = set()
def encode_image_to_base64(image):
    """Encode an image as a base64 JPEG string.

    Args:
        image: Image in a form accepted by ``cv2.imencode``.

    Returns:
        str: The JPEG-encoded bytes of ``image`` as base64 text.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    success, img_buffer = cv2.imencode('.jpg', image)
    # imencode signals failure through its first return value; without this
    # check a failed encode would be base64-encoded and sent on as if valid.
    if not success:
        raise ValueError("Failed to encode image as JPEG")
    return base64.b64encode(img_buffer).decode('utf-8')
def analyze_image_with_gemini(current_image):
    """Ask the Gemini model whether the bottle in an image carries a label.

    Args:
        current_image: Image to analyse (handed to ``encode_image_to_base64``),
            or ``None``.

    Returns:
        The content of the model response on success.
        ``"No image available for analysis."`` when ``current_image`` is
        ``None`` or reports a ``size`` of 0 (an empty crop).
        ``"Error processing image: <reason>"`` when encoding or the model call
        raises.
    """
    # An empty crop cannot be JPEG-encoded, so treat it like a missing image.
    if current_image is None or getattr(current_image, "size", None) == 0:
        return "No image available for analysis."
    try:
        # Encoding sits inside the try so that an encode failure is reported
        # through the same error string as a failed model call.
        current_image_data = encode_image_to_base64(current_image)
        message = HumanMessage(
            content=[
                {"type": "text", "text": "Analyze this image and check if the label is present on the bottle. Return results in a structured format."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{current_image_data}"}, "description": "Detected product"}
            ]
        )
        response = gemini_model.invoke([message])
        return response.content
    except Exception as e:
        # Boundary with the external service: callers expect a string, not a raise.
        return f"Error processing image: {e}"
def save_crop_image(crop, track_id):
    """Write a crop into the dated crop folder as ``<track_id>.jpg``.

    Args:
        crop: Image passed to ``cv2.imwrite``.
        track_id: Identifier used as the file name stem.

    Returns:
        The path the image was written to, as a string.
    """
    image_name = f"{track_id}.jpg"
    target_path = f"{crop_folder}/{image_name}"
    cv2.imwrite(target_path, crop)
    return target_path
def process_crop_image(crop, track_id):
    """Analyse one crop with Gemini and record the outcome.

    Appends ``(track_id, response)`` to ``st.session_state["responses"]``;
    that key must already exist.

    Args:
        crop: Image handed to ``analyze_image_with_gemini``.
        track_id: Identifier stored alongside the response.
    """
    analysis = analyze_image_with_gemini(crop)
    result_entry = (track_id, analysis)
    # NOTE(review): this touches st.session_state; confirm it behaves as
    # expected when the function is run from a worker thread.
    st.session_state["responses"].append(result_entry)
def process_video(uploaded_file):
    """Track objects in an uploaded video and have Gemini inspect each new track.

    The upload is written to ``uploaded_video.mp4``. Every frame is resized to
    1020x500, run through ``yolo_model.track`` and written to
    ``output_video.mp4``. The first non-empty crop seen for each track ID is
    saved with ``save_crop_image`` and analysed with
    ``analyze_image_with_gemini`` on a small worker pool. All analyses are
    awaited before returning, and their ``(track_id, response)`` pairs are
    appended to ``st.session_state["responses"]`` from the calling thread, in
    the order the tracks were first seen.

    Args:
        uploaded_file: File-like object whose ``read()`` returns the video
            bytes. A falsy value means nothing was uploaded.

    Returns:
        The path of the written output video, or ``None`` when
        ``uploaded_file`` is falsy.
    """
    # Function-scope import: only this function needs the executor.
    from concurrent.futures import ThreadPoolExecutor

    if not uploaded_file:
        return None

    video_path = "uploaded_video.mp4"
    with open(video_path, "wb") as f:
        f.write(uploaded_file.read())

    output_path = "output_video.mp4"
    frame_size = (1020, 500)

    cap = cv2.VideoCapture(video_path)
    # Write the output at the source frame rate so playback speed is kept;
    # fall back to 20 fps when the container does not report a usable value.
    source_fps = cap.get(cv2.CAP_PROP_FPS)
    fps = source_fps if source_fps and source_fps > 0 else 20.0
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

    # (track_id, future) pairs, in the order the tracks were first seen.
    pending = []
    try:
        # Leaving the ``with`` block waits for every submitted analysis, so no
        # result can arrive after the caller has already rendered the list.
        with ThreadPoolExecutor(max_workers=4) as pool:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, frame_size)
                results = yolo_model.track(frame, persist=True)
                detections = results[0].boxes
                if detections is not None:
                    boxes = detections.xyxy.int().cpu().tolist()
                    if detections.id is not None:
                        track_ids = detections.id.int().cpu().tolist()
                    else:
                        # No tracker IDs for this frame: every box shares -1.
                        track_ids = [-1] * len(boxes)
                    for box, track_id in zip(boxes, track_ids):
                        if track_id in processed_track_ids:
                            continue
                        x1, y1, x2, y2 = box
                        # Copy so the queued analysis does not keep the whole
                        # frame buffer alive through a slice view.
                        crop = frame[y1:y2, x1:x2].copy()
                        if crop.size == 0:
                            # Degenerate box: nothing to save or analyse. Leave
                            # the track unmarked so a later frame can retry.
                            continue
                        save_crop_image(crop, track_id)
                        future = pool.submit(analyze_image_with_gemini, crop)
                        pending.append((track_id, future))
                        processed_track_ids.add(track_id)
                out.write(frame)
    finally:
        # Release the capture and the writer even if tracking raised.
        cap.release()
        out.release()

    # Record results from the calling thread rather than from the workers.
    for track_id, future in pending:
        try:
            response = future.result()
        except Exception as e:
            response = f"Error processing image: {e}"
        st.session_state["responses"].append((track_id, response))
    return output_path
# ---- Streamlit page -------------------------------------------------------
st.title("Bottle Label Checking using YOLO & Gemini AI")

# Sidebar: video upload widget.
st.sidebar.header("Upload a video")
uploaded_file = st.sidebar.file_uploader(
    "Choose a video file",
    type=["mp4", "avi", "mov"],
)

# Make sure the list of (track_id, response) pairs exists in the session.
if "responses" not in st.session_state:
    st.session_state["responses"] = []

if uploaded_file:
    # Process the upload, then show the resulting video.
    st.sidebar.write("Processing...")
    output_video_path = process_video(uploaded_file)
    st.sidebar.success("Processing completed!")
    st.video(output_video_path)

    # List whatever analysis results are stored for this session.
    st.subheader("AI Analysis Results")
    for track_id, response in st.session_state["responses"]:
        st.write(f"**Track ID {track_id}:** {response}")