import streamlit as st
import cv2
import torch
from PIL import Image
import numpy as np
from transformers import BlipProcessor, BlipForConditionalGeneration
from transformers import ViltProcessor, ViltForQuestionAnswering
import time
from io import BytesIO
import threading
import queue
import os
import tempfile
from datetime import datetime

st.set_page_config(layout="wide", page_title="Securade.ai Sentinel")
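

# Everything that must survive a Streamlit rerun lives in st.session_state:
# the most recent decoded frame, the rolling caption log, the background
# worker thread, and the queues and stop flag used to communicate with it.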
def initialize_state():
    if 'initialized' not in st.session_state:
        st.session_state.frame = None
        st.session_state.captions = []
        st.session_state.stop_event = threading.Event()
        st.session_state.frame_queue = queue.Queue(maxsize=1)
        st.session_state.caption_queue = queue.Queue(maxsize=10)
        st.session_state.processor = None
        st.session_state.thread = None
        st.session_state.is_streaming = False
        st.session_state.initialized = True
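

# Model loading is expensive, so @st.cache_resource keeps a single
# VideoProcessor instance alive across reruns and sessions. Both checkpoints
# are downloaded from the Hugging Face Hub on first use.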
@st.cache_resource
def load_processor():
    class VideoProcessor:
        def __init__(self):
            self.caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
            self.caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
            self.vqa_processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-finetuned-vqa")
            self.vqa_model = ViltForQuestionAnswering.from_pretrained("dandelin/vilt-b32-finetuned-vqa")

            # Prefer CUDA, then Apple Silicon (MPS), then fall back to CPU.
            if torch.cuda.is_available():
                self.device = "cuda"
            elif torch.backends.mps.is_available():
                self.device = "mps"
            else:
                self.device = "cpu"

            self.caption_model.to(self.device)
            self.vqa_model.to(self.device)

        def generate_caption(self, image):
            inputs = self.caption_processor(images=image, return_tensors="pt").to(self.device)
            output = self.caption_model.generate(**inputs, max_new_tokens=50)
            return self.caption_processor.decode(output[0], skip_special_tokens=True)

        def answer_question(self, image, question):
            inputs = self.vqa_processor(image, question, return_tensors="pt").to(self.device)
            # Inference only, so skip autograd bookkeeping.
            with torch.no_grad():
                outputs = self.vqa_model(**inputs)
            # ViLT casts VQA as classification over a fixed answer vocabulary.
            idx = outputs.logits.argmax(-1).item()
            return self.vqa_model.config.id2label[idx]

    return VideoProcessor()
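

# Resolve the chosen source into a cv2.VideoCapture. Uploaded files are
# written to a temporary file first, since OpenCV cannot decode directly
# from an in-memory buffer.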
def get_video_source(source_type, source_path=None):
    if source_type == "Webcam":
        return cv2.VideoCapture(0)
    elif source_type == "Video File" and source_path:
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, 'temp_video.mp4')
        with open(temp_path, 'wb') as f:
            f.write(source_path.getvalue())

        cap = cv2.VideoCapture(temp_path)
        if not cap.isOpened():
            st.error("Error: Could not open video file. Please ensure it's a supported format (MP4 with H.264 encoding is recommended).")
            return None
        return cap
    elif source_type == "RTSP Stream" and source_path:
        return cv2.VideoCapture(source_path)
    return None
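

# Worker thread: grabs frames, resizes them for display, and every few
# seconds runs BLIP captioning on the current frame. Results are handed to
# the UI through bounded queues, so the reader never blocks and the UI
# always sees the freshest data.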
def process_video(stop_event, frame_queue, caption_queue, processor, source_type, source_path=None):
    cap = get_video_source(source_type, source_path)
    if cap is None:
        return
    last_caption_time = time.time()

    while not stop_event.is_set():
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.resize(frame, (800, 600))
        current_time = time.time()

        # Caption at most once every 8 seconds; BLIP inference is too slow
        # to run on every frame.
        if current_time - last_caption_time >= 8.0:
            img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            caption = processor.generate_caption(img)
            timestamp = datetime.now().strftime("%H:%M:%S")

            try:
                # Drop the oldest caption instead of blocking the video loop.
                if caption_queue.full():
                    caption_queue.get_nowait()
                caption_queue.put_nowait({'timestamp': timestamp, 'caption': caption})
                last_caption_time = current_time
            except queue.Full:
                pass

        try:
            # frame_queue has maxsize=1, so it only ever holds the latest frame.
            if frame_queue.full():
                frame_queue.get_nowait()
            frame_queue.put_nowait(frame)
        except queue.Full:
            pass

    cap.release()
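

# The Streamlit script runs top to bottom on every interaction; main() lays
# out the three panels and, while streaming, polls the worker's queues to
# refresh the video frame and caption log in place.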
def main():
    initialize_state()

    st.title("Securade.ai Sentinel")

    video_col, caption_col, qa_col = st.columns([0.4, 0.3, 0.3])
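
    # Left panel: source selection, the start/stop control, and the live feed.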
    with video_col:
        st.subheader("Video Feed")

        # Source selection is hard-coded to file upload here; the webcam and
        # RTSP branches below are currently unreachable.
        source_type = "Video File"

        source_path = None
        uploaded_file = None
        if source_type == "Video File":
            uploaded_file = st.file_uploader("Choose a video file", type=['mp4', 'avi', 'mov'])
            if uploaded_file:
                source_path = BytesIO(uploaded_file.getvalue())
        elif source_type == "RTSP Stream":
            source_path = st.text_input("Enter RTSP URL", placeholder="rtsp://your-camera-url")

        start_stop = st.button(
            "Start Surveillance" if not st.session_state.is_streaming else "Stop Surveillance"
        )
        video_placeholder = st.empty()
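
        # The button toggles streaming: on start it (re)creates the queues and
        # launches the daemon worker; on stop it signals the worker and waits
        # briefly for it to exit.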
        if start_stop:
            if not st.session_state.is_streaming:
                if st.session_state.processor is None:
                    st.session_state.processor = load_processor()
                st.session_state.stop_event.clear()
                st.session_state.frame_queue = queue.Queue(maxsize=1)
                st.session_state.caption_queue = queue.Queue(maxsize=10)
                st.session_state.thread = threading.Thread(
                    target=process_video,
                    args=(
                        st.session_state.stop_event,
                        st.session_state.frame_queue,
                        st.session_state.caption_queue,
                        st.session_state.processor,
                        source_type,
                        source_path
                    ),
                    daemon=True
                )
                st.session_state.thread.start()
                st.session_state.is_streaming = True
            else:
                st.session_state.stop_event.set()
                if st.session_state.thread:
                    st.session_state.thread.join(timeout=1.0)
                st.session_state.frame = None
                st.session_state.is_streaming = False
                video_placeholder.empty()

    with caption_col:
        st.subheader("Scene Analysis")
        caption_placeholder = st.empty()
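
    # Right panel: free-form questions answered by ViLT against the most
    # recently displayed frame.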
    with qa_col:
        st.subheader("Visual Q&A")
        question = st.text_input("Ask a question about the scene:")
        ask_button = st.button("Ask")
        answer_placeholder = st.empty()

        # Guard on the processor as well: it stays None until streaming has
        # been started at least once.
        if (ask_button and question and st.session_state.frame is not None
                and st.session_state.processor is not None):
            img = Image.fromarray(cv2.cvtColor(st.session_state.frame, cv2.COLOR_BGR2RGB))
            answer = st.session_state.processor.answer_question(img, question)
            answer_placeholder.markdown(f"**Answer:** {answer}")
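
    # Display loop: while streaming, poll the queues and repaint the frame and
    # captions in place. This blocks the script run, which is a common pattern
    # for simple Streamlit video demos.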
    if st.session_state.is_streaming:
        while True:
            try:
                frame = st.session_state.frame_queue.get_nowait()
                st.session_state.frame = frame
                video_placeholder.image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                # Drain any new captions, keeping only the five most recent.
                while not st.session_state.caption_queue.empty():
                    new_caption = st.session_state.caption_queue.get_nowait()
                    st.session_state.captions.append(new_caption)
                    st.session_state.captions = st.session_state.captions[-5:]

                if st.session_state.captions:
                    caption_text = "\n\n".join([
                        f"**[{cap['timestamp']}]** {cap['caption']}"
                        for cap in reversed(st.session_state.captions)
                    ])
                    caption_placeholder.markdown(caption_text)

            except queue.Empty:
                # No new frame yet. Stop polling once the worker has finished
                # (e.g. end of file); otherwise sleep briefly instead of
                # busy-waiting.
                if st.session_state.thread is not None and not st.session_state.thread.is_alive():
                    break
                time.sleep(0.03)
                continue


if __name__ == "__main__":
    main()
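
# To try this locally (the file name is illustrative): streamlit run sentinel.py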