import streamlit as st |
import cv2 |
import torch |
from PIL import Image |
import numpy as np |
from transformers import BlipProcessor, BlipForConditionalGeneration |
from transformers import ViltProcessor, ViltForQuestionAnswering |
import time |
from io import BytesIO |
import threading |
import queue |
import os |
import tempfile |
from datetime import datetime |
st.set_page_config(layout="wide", page_title="Securade.ai Sentinel") |
def initialize_state(): |
if 'initialized' not in st.session_state: |
st.session_state.frame = None |
st.session_state.captions = [] |
st.session_state.stop_event = threading.Event() |
st.session_state.frame_queue = queue.Queue(maxsize=1) |
st.session_state.caption_queue = queue.Queue(maxsize=10) |
st.session_state.processor = None |
st.session_state.thread = None |
st.session_state.is_streaming = False |
st.session_state.initialized = True |
@st.cache_resource |
def load_processor(): |
class VideoProcessor: |
def __init__(self): |
self.caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large") |
self.caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large") |
self.vqa_processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-finetuned-vqa") |
self.vqa_model = ViltForQuestionAnswering.from_pretrained("dandelin/vilt-b32-finetuned-vqa") |
if torch.cuda.is_available(): |
self.device = "cuda" |
elif torch.backends.mps.is_available(): |
self.device = "mps" |
else: |
self.device = "cpu" |
self.caption_model.to(self.device) |
self.vqa_model.to(self.device) |
def generate_caption(self, image): |
inputs = self.caption_processor(images=image, return_tensors="pt").to(self.device) |
output = self.caption_model.generate(**inputs, max_new_tokens=50) |
return self.caption_processor.decode(output[0], skip_special_tokens=True) |
def answer_question(self, image, question): |
inputs = self.vqa_processor(image, question, return_tensors="pt").to(self.device) |
outputs = self.vqa_model(**inputs) |
logits = outputs.logits |
idx = logits.argmax(-1).item() |
return self.vqa_model.config.id2label[idx] |
return VideoProcessor() |
def get_video_source(source_type, source_path=None): |
if source_type == "Webcam": |
return cv2.VideoCapture(0) |
elif source_type == "Video File" and source_path: |
temp_dir = tempfile.gettempdir() |
temp_path = os.path.join(temp_dir, 'temp_video.mp4') |
with open(temp_path, 'wb') as f: |
f.write(source_path.getvalue()) |
cap = cv2.VideoCapture(temp_path) |
if not cap.isOpened(): |
st.error("Error: Could not open video file. Please ensure it's a supported format (MP4 with H.264 encoding recommended)") |
return None |
return cap |
elif source_type == "RTSP Stream" and source_path: |
return cv2.VideoCapture(source_path) |
return None |
def process_video(stop_event, frame_queue, caption_queue, processor, source_type, source_path=None): |
cap = get_video_source(source_type, source_path) |
last_caption_time = time.time() |
while not stop_event.is_set(): |
ret, frame = cap.read() |
if not ret: |
break |
frame = cv2.resize(frame, (800, 600)) |
current_time = time.time() |
if current_time - last_caption_time >= 8.0: |
img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) |
caption = processor.generate_caption(img) |
timestamp = datetime.now().strftime("%H:%M:%S") |
try: |
if caption_queue.full(): |
caption_queue.get_nowait() |
caption_queue.put_nowait({'timestamp': timestamp, 'caption': caption}) |
last_caption_time = current_time |
except queue.Full: |
pass |
try: |
if frame_queue.full(): |
frame_queue.get_nowait() |
frame_queue.put_nowait(frame) |
except queue.Full: |
pass |
cap.release() |
def main(): |
initialize_state() |
st.title("Securade.ai Sentinel") |
video_col, caption_col, qa_col = st.columns([0.4, 0.3, 0.3]) |
with video_col: |
st.subheader("Video Feed") |
source_type = "Video File" |
source_path = None |
uploaded_file = None |
if source_type == "Video File": |
uploaded_file = st.file_uploader("Choose a video file", type=['mp4', 'avi', 'mov']) |
if uploaded_file: |
source_path = BytesIO(uploaded_file.getvalue()) |
elif source_type == "RTSP Stream": |
source_path = st.text_input("Enter RTSP URL", placeholder="rtsp://your-camera-url") |
start_stop = st.button( |
"Start Surveillance" if not st.session_state.is_streaming else "Stop Surveillance" |
) |
video_placeholder = st.empty() |
if start_stop: |
if not st.session_state.is_streaming: |
if st.session_state.processor is None: |
st.session_state.processor = load_processor() |
st.session_state.stop_event.clear() |
st.session_state.frame_queue = queue.Queue(maxsize=1) |
st.session_state.caption_queue = queue.Queue(maxsize=10) |
st.session_state.thread = threading.Thread( |
target=process_video, |
args=( |
st.session_state.stop_event, |
st.session_state.frame_queue, |
st.session_state.caption_queue, |
st.session_state.processor, |
source_type, |
source_path |
), |
daemon=True |
) |
st.session_state.thread.start() |
st.session_state.is_streaming = True |
else: |
st.session_state.stop_event.set() |
if st.session_state.thread: |
st.session_state.thread.join(timeout=1.0) |
st.session_state.frame = None |
st.session_state.is_streaming = False |
video_placeholder.empty() |
with caption_col: |
st.subheader("Scene Analysis") |
caption_placeholder = st.empty() |
with qa_col: |
st.subheader("Visual Q&A") |
question = st.text_input("Ask a question about the scene:") |
ask_button = st.button("Ask") |
answer_placeholder = st.empty() |
if ask_button and question and st.session_state.frame is not None: |
img = Image.fromarray(cv2.cvtColor(st.session_state.frame, cv2.COLOR_BGR2RGB)) |
answer = st.session_state.processor.answer_question(img, question) |
answer_placeholder.markdown(f"**Answer:** {answer}") |
if st.session_state.is_streaming: |
placeholder = st.empty() |
while True: |
try: |
frame = st.session_state.frame_queue.get_nowait() |
st.session_state.frame = frame |
video_placeholder.image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) |
while not st.session_state.caption_queue.empty(): |
new_caption = st.session_state.caption_queue.get_nowait() |
st.session_state.captions.append(new_caption) |
st.session_state.captions = st.session_state.captions[-5:] |
if st.session_state.captions: |
caption_text = "\n\n".join([ |
f"**[{cap['timestamp']}]** {cap['caption']}" |
for cap in reversed(st.session_state.captions) |
]) |
caption_placeholder.markdown(caption_text) |
except queue.Empty: |
continue |
if __name__ == "__main__": |
main() |