|
""" |
|
--- |
|
title: Video Anomaly Detector |
|
emoji: 🎥 |
|
colorFrom: blue |
|
colorTo: green |
|
sdk: streamlit |
|
sdk_version: 1.31.0 |
|
app_file: app.py |
|
pinned: false |
|
license: mit |
|
--- |
|
""" |
|
import streamlit as st |
|
import os |
|
import tempfile |
|
import time |
|
from detector import VideoAnomalyDetector |
|
import cv2 |
|
from PIL import Image |
|
import numpy as np |
|
from dotenv import load_dotenv |
|
import streamlit.components.v1 as components |
|
import json |
|
import base64 |
|
from io import BytesIO |
|
import smtplib |
|
from email.mime.text import MIMEText |
|
from email.mime.multipart import MIMEMultipart |
|
from email.mime.image import MIMEImage |
|
import requests |
|
import re |
|
|
|
|
|
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that serializes numpy arrays as base64-encoded PNGs.

    An ndarray is rendered to a PNG held in an in-memory buffer and emitted
    as ``{"__ndarray__": "<base64 string>"}``.  Every other type falls
    through to the stock ``JSONEncoder`` behavior.

    NOTE(review): assumes the array is image-like (e.g. uint8 HxWx3) so that
    ``Image.fromarray`` accepts it — confirm for non-frame arrays.
    """

    def default(self, obj):
        if not isinstance(obj, np.ndarray):
            # Not an array: defer to the base encoder (which raises
            # TypeError for unsupported types, as usual).
            return super().default(obj)

        # Encode the array as a PNG entirely in memory.
        buffer = BytesIO()
        Image.fromarray(obj).save(buffer, format="PNG")
        encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return {"__ndarray__": encoded}
|
|
|
|
|
|
|
|
|
def send_email_notification(to_email, subject, body, image=None):
    """Send an email notification with an optional image attachment.

    SMTP settings come from the environment: SMTP_SERVER (default
    "smtp.gmail.com"), SMTP_PORT (default 587, the STARTTLS submission
    port), and the required SMTP_USERNAME / SMTP_PASSWORD.

    Args:
        to_email: Recipient address.
        subject: Message subject line.
        body: Plain-text message body.
        image: Optional attachment — either a numpy image array (converted
            to PNG in memory) or already-encoded image bytes.

    Returns:
        True on success, False otherwise (a Streamlit warning is shown on
        any failure; exceptions are not propagated).
    """
    try:
        smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        # BUG FIX: the default was "1587"; the standard SMTP submission
        # port for STARTTLS (and the one smtp.gmail.com uses) is 587.
        smtp_port = int(os.getenv("SMTP_PORT", "587"))
        smtp_username = os.getenv("SMTP_USERNAME")
        smtp_password = os.getenv("SMTP_PASSWORD")

        if not smtp_username or not smtp_password:
            st.warning("Email notification failed: SMTP credentials not configured. Please set SMTP_USERNAME and SMTP_PASSWORD environment variables.")
            return False

        # Assemble the multipart message.
        msg = MIMEMultipart()
        msg['From'] = smtp_username
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        if image is not None:
            if isinstance(image, np.ndarray):
                # Convert the frame array to PNG bytes in memory.
                pil_img = Image.fromarray(image)
                img_byte_arr = BytesIO()
                pil_img.save(img_byte_arr, format='PNG')
                img_data = img_byte_arr.getvalue()
            else:
                # Assume the caller already passed encoded image bytes.
                img_data = image

            img_attachment = MIMEImage(img_data)
            img_attachment.add_header('Content-Disposition', 'attachment', filename='anomaly.png')
            msg.attach(img_attachment)

        # Context manager guarantees the connection is closed even when
        # starttls/login/send raises (the original leaked the socket on
        # the exception path).
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(smtp_username, smtp_password)
            server.send_message(msg)

        return True
    except Exception as e:
        st.warning(f"Email notification failed: {str(e)}")
        return False
|
|
|
def send_whatsapp_notification(to_number, message):
    """Send a WhatsApp notification via the WhatsApp Business (Graph) API.

    NOTE(review): the HTTP request is only *constructed* here — the actual
    send is simulated with a print; confirm before relying on delivery.

    Returns True when the (simulated) send completes, False on any error
    or when API credentials are missing.
    """
    try:
        token = os.getenv("WHATSAPP_API_KEY")
        phone_id = os.getenv("WHATSAPP_PHONE_ID")

        if not (token and phone_id):
            st.warning("WhatsApp notification failed: API credentials not configured. Please set WHATSAPP_API_KEY and WHATSAPP_PHONE_ID environment variables.")
            return False

        # Prepared Graph API request (currently unused — see note above).
        endpoint = f"https://graph.facebook.com/v17.0/{phone_id}/messages"
        request_headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        payload = {
            "messaging_product": "whatsapp",
            "to": to_number,
            "type": "text",
            "text": {"body": message},
        }

        # Simulated send: log what would have been delivered.
        print(f"Would send WhatsApp message to {to_number}: {message}")
        return True
    except Exception as exc:
        st.warning(f"WhatsApp notification failed: {str(exc)}")
        return False
|
|
|
|
|
def validate_email(email): |
|
"""Validate email format""" |
|
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' |
|
return re.match(pattern, email) is not None |
|
|
|
def validate_phone(phone):
    """Check that *phone* is in international format.

    Expected shape: a leading '+', a 1-3 digit country code, then 6-14
    subscriber digits (no spaces or separators).
    """
    return bool(re.match(r'^\+\d{1,3}\d{6,14}$', phone))
|
|
|
def send_notification(notification_type, contact, message, image=None):
    """Dispatch a notification through the requested channel.

    notification_type is "email" or "whatsapp"; *contact* is validated
    first and a Streamlit warning is shown when it is malformed.  Returns
    True only when the underlying send reports success.
    """
    if notification_type == "email":
        if not validate_email(contact):
            st.warning("Invalid email format. Notification not sent.")
            return False
        return send_email_notification(
            contact,
            "Anomaly Detected - Video Anomaly Detector",
            message,
            image
        )

    if notification_type == "whatsapp":
        if not validate_phone(contact):
            st.warning("Invalid phone number format. Please include country code (e.g., +1234567890). Notification not sent.")
            return False
        return send_whatsapp_notification(contact, message)

    # Unknown channel: nothing to send.
    return False
|
|
|
|
|
def _build_result_details_html(result):
    """Build the shared 'result-details' HTML fragment for one result dict.

    Includes the confidence percentage (if present), the first non-empty
    analysis text found under the keys "analysis"/"text"/"description",
    and the anomaly type (if present).
    """
    html = "<div class='result-details'>"

    if "confidence" in result:
        html += f"<p><strong>Confidence:</strong> {result['confidence']}%</p>"

    analysis_text = None
    for key in ("analysis", "text", "description"):
        if result.get(key):
            analysis_text = result[key]
            break
    if analysis_text:
        html += f"<p><strong>Analysis:</strong> {analysis_text}</p>"

    if result.get("anomaly_type"):
        html += f"<p><strong>Anomaly Type:</strong> {result['anomaly_type']}</p>"

    return html + "</div>"


def display_single_result(result):
    """Display a single analysis result.

    Dict results with an "anomaly_detected" key get the rich layout (frame
    image plus details panel when a "frame" is present, details-only
    otherwise); other dicts are shown as raw JSON; non-dicts are written
    directly.

    Refactor note: the details-HTML construction was duplicated verbatim in
    both branches; it now lives in _build_result_details_html.
    """
    if not isinstance(result, dict):
        # Plain value (string, list, ...): just write it.
        st.write(result)
        return

    if "anomaly_detected" not in result:
        # Unrecognized dict shape: show the raw JSON.
        st.json(result)
        return

    if "frame" in result:
        # Frame available: image on the left, details on the right.
        col1, col2 = st.columns([1, 2])
        with col1:
            st.image(result["frame"], caption="Captured Frame", use_column_width=True)
        with col2:
            st.markdown(_build_result_details_html(result), unsafe_allow_html=True)
    else:
        # No frame captured: details panel only.
        st.markdown(_build_result_details_html(result), unsafe_allow_html=True)
|
|
|
def display_results(results, analysis_depth):
    """Display analysis results based on analysis depth.

    Args:
        results: For "granular" depth, a list of per-frame result dicts;
            otherwise a single cumulative result dict (which may carry a
            "frames" list of key-frame images).
        analysis_depth: "granular" or anything else (treated as cumulative).
    """
    if not results:
        st.warning("No results to display")
        return

    st.markdown("<h2 class='section-header'>📊 Analysis Results</h2>", unsafe_allow_html=True)

    # --- Summary banner -------------------------------------------------
    if analysis_depth == "granular":
        # Count flagged frames across the per-frame result list.
        anomaly_frames = sum(1 for r in results if r.get("anomaly_detected", False))
        total_frames = len(results)

        if anomaly_frames > 0:
            # Collect the distinct anomaly labels for the banner.
            anomaly_types = set(r.get("anomaly_type", "Unknown") for r in results if r.get("anomaly_detected", False))
            anomaly_types_str = ", ".join(anomaly_types)

            st.markdown(
                f"""
                <div class='result-box anomaly'>
                    <h3>⚠️ ANOMALY DETECTED</h3>
                    <p><strong>Frames with anomalies:</strong> {anomaly_frames} out of {total_frames}</p>
                    <p><strong>Anomaly types:</strong> {anomaly_types_str}</p>
                </div>
                """,
                unsafe_allow_html=True
            )
        else:
            st.markdown(
                """
                <div class='result-box normal'>
                    <h3>✅ No Anomalies Detected</h3>
                    <p>No anomalies were detected in any of the analyzed frames.</p>
                </div>
                """,
                unsafe_allow_html=True
            )
    else:
        # Cumulative mode: results is a single dict.
        if results.get("anomaly_detected", False):
            anomaly_type = results.get("anomaly_type", "Unknown")
            st.markdown(
                f"""
                <div class='result-box anomaly'>
                    <h3>⚠️ ANOMALY DETECTED</h3>
                    <p><strong>Anomaly type:</strong> {anomaly_type}</p>
                </div>
                """,
                unsafe_allow_html=True
            )
        else:
            st.markdown(
                """
                <div class='result-box normal'>
                    <h3>✅ No Anomalies Detected</h3>
                    <p>No anomalies were detected in the video.</p>
                </div>
                """,
                unsafe_allow_html=True
            )

    # --- Detailed view --------------------------------------------------
    if analysis_depth == "granular":
        st.markdown("<h3 class='sub-header'>🔍 Frame-by-Frame Analysis</h3>", unsafe_allow_html=True)

        # One collapsible expander per analyzed frame.
        for i, result in enumerate(results):
            with st.expander(f"Frame {i+1} - {'⚠️ ANOMALY' if result.get('anomaly_detected', False) else '✅ Normal'}"):
                display_single_result(result)
    else:
        st.markdown("<h3 class='sub-header'>🔍 Overall Video Analysis</h3>", unsafe_allow_html=True)
        display_single_result(results)

        # Show up to three key frames the detector returned, side by side.
        if "frames" in results and results["frames"]:
            st.markdown("<h3 class='sub-header'>🖼️ Key Frames</h3>", unsafe_allow_html=True)

            num_frames = len(results["frames"])
            cols = st.columns(min(3, num_frames))

            # zip truncates to the shorter sequence, so at most 3 frames show.
            for i, (col, frame) in enumerate(zip(cols, results["frames"])):
                with col:
                    st.image(frame, caption=f"Key Frame {i+1}", use_column_width=True)
|
|
|
|
|
# One-shot flag used by the continuous live-stream loop; survives Streamlit
# reruns because it lives in session state.
if 'stop_requested' not in st.session_state:
    st.session_state.stop_requested = False


def request_stop():
    """Callback for the Stop button: ask the capture loop to exit."""
    st.session_state.stop_requested = True
|
|
|
|
|
# Phi-4 support is optional: the local detector module may not be installed.
try:
    from phi4_detector import Phi4AnomalyDetector
    PHI4_AVAILABLE = True
except ImportError:
    PHI4_AVAILABLE = False


# Load environment variables (API keys, SMTP/WhatsApp settings) from .env.
load_dotenv()


# Streamlit page setup — must run before any other st.* rendering call.
st.set_page_config(
    page_title="Video Anomaly Detector",
    page_icon="🔍",
    layout="wide"
)
|
|
|
|
|
# Inject the app-wide CSS theme (fonts, headers, result boxes, widgets).
# The class names defined here (result-box, result-details, icon-text, ...)
# are referenced by the HTML fragments rendered throughout the app.
st.markdown("""
<style>
    @import url('https://fonts.googleapis.com/css2?family=Poppins:wght@300;400;500;600;700&display=swap');

    html, body, [class*="css"] {
        font-family: 'Poppins', sans-serif;
    }

    .main-header {
        font-size: 2.8rem;
        font-weight: 700;
        color: #5046E5;
        text-align: center;
        margin-bottom: 1rem;
        padding-top: 1.5rem;
    }

    .sub-header {
        font-size: 1.8rem;
        font-weight: 600;
        color: #36B37E;
        margin-bottom: 1.2rem;
    }

    .section-header {
        font-size: 2rem;
        font-weight: 600;
        color: #5046E5;
        margin-top: 2rem;
        margin-bottom: 1rem;
    }

    .result-box {
        padding: 15px;
        border-radius: 10px;
        margin-bottom: 15px;
    }

    .result-box.anomaly {
        background-color: rgba(255, 76, 76, 0.1);
        border: 1px solid rgba(255, 76, 76, 0.3);
    }

    .result-box.normal {
        background-color: rgba(54, 179, 126, 0.1);
        border: 1px solid rgba(54, 179, 126, 0.3);
    }

    .result-box h3 {
        margin-top: 0;
        margin-bottom: 10px;
    }

    .result-box.anomaly h3 {
        color: #FF4C4C;
    }

    .result-box.normal h3 {
        color: #36B37E;
    }

    .result-container {
        background-color: #f8f9fa;
        padding: 1.8rem;
        border-radius: 12px;
        margin-bottom: 1.5rem;
        border: 1px solid #e9ecef;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.05);
    }

    .stProgress > div > div > div {
        background-color: #5046E5;
    }

    .stButton>button {
        background-color: #5046E5;
        color: white;
        font-weight: 600;
        border-radius: 8px;
        padding: 0.5rem 1rem;
        border: none;
    }

    .stButton>button:hover {
        background-color: #4038C7;
    }

    .stSelectbox>div>div {
        background-color: #f8f9fa;
        border-radius: 8px;
    }

    .stRadio>div {
        padding: 10px;
        background-color: #f8f9fa;
        border-radius: 8px;
    }

    .stExpander>div {
        border-radius: 8px;
        border: 1px solid #e9ecef;
    }

    .model-info {
        font-size: 0.9rem;
        color: #6c757d;
        font-style: italic;
        margin-top: 0.5rem;
    }

    .icon-text {
        display: flex;
        align-items: center;
        gap: 0.5rem;
    }

    .footer {
        text-align: center;
        color: #6c757d;
        font-size: 0.9rem;
        margin-top: 2rem;
    }

    .anomaly-true {
        color: #dc3545;
        font-weight: bold;
    }

    .anomaly-false {
        color: #28a745;
        font-weight: bold;
    }

    .anomaly-type {
        font-weight: bold;
        margin-top: 0.5rem;
    }

    .anomaly-box {
        padding: 1rem;
        border-radius: 8px;
        margin-bottom: 1rem;
    }

    .anomaly-box-true {
        background-color: rgba(220, 53, 69, 0.1);
        border: 1px solid rgba(220, 53, 69, 0.3);
    }

    .anomaly-box-false {
        background-color: rgba(40, 167, 69, 0.1);
        border: 1px solid rgba(40, 167, 69, 0.3);
    }

    .instructions-container {
        font-size: 1.1rem;
        line-height: 1.8;
    }

    .instructions-container ol {
        padding-left: 1.5rem;
    }

    .instructions-container ul {
        padding-left: 1.5rem;
    }

    .instructions-container li {
        margin-bottom: 0.5rem;
    }

    .live-stream-container {
        border: 2px solid #5046E5;
        border-radius: 12px;
        padding: 1rem;
        margin-top: 1rem;
    }

    .result-details {
        padding: 15px;
        border-radius: 10px;
        margin-bottom: 15px;
        background-color: rgba(80, 70, 229, 0.05);
        border: 1px solid rgba(80, 70, 229, 0.2);
    }

    .result-details p {
        margin-bottom: 10px;
    }

    .result-details strong {
        color: #5046E5;
    }

    .video-preview-container {
        border: 1px solid #e9ecef;
        border-radius: 10px;
        padding: 15px;
        margin-bottom: 20px;
        background-color: rgba(80, 70, 229, 0.03);
    }

    .video-preview-container video {
        width: 100%;
        border-radius: 8px;
        margin-bottom: 10px;
    }

    .video-info {
        display: flex;
        justify-content: space-between;
        margin-top: 10px;
    }

    .video-info-item {
        text-align: center;
        padding: 8px;
        background-color: #f8f9fa;
        border-radius: 5px;
        flex: 1;
        margin: 0 5px;
    }
</style>
""", unsafe_allow_html=True)


# Page title and tagline.
st.markdown("<h1 class='main-header'>🔍 Video Anomaly Detector</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align: center; font-size: 1.2rem; margin-bottom: 2rem;'>Analyze video frames for anomalies using advanced AI models</p>", unsafe_allow_html=True)
|
|
|
|
|
# Sidebar: all user-configurable settings.  Defines the module-level names
# consumed by the main panel below: input_source, uploaded_file,
# stream_source, capture_mode/max_frames/time_interval (live stream only),
# model_value, use_phi4, skip_frames, analysis_depth_value, prompt,
# api_key, and submit_button.
with st.sidebar:
    st.markdown("<h2 class='sub-header'>⚙️ Settings</h2>", unsafe_allow_html=True)

    # --- Input source ---------------------------------------------------
    st.markdown("<div class='icon-text'><span>📹</span><span>Input Source</span></div>", unsafe_allow_html=True)
    input_source = st.radio(
        "",
        ["Video File", "Live Stream"],
        index=0,
        help="Select the input source for analysis"
    )

    if input_source == "Video File":
        st.markdown("<div class='icon-text'><span>📁</span><span>Upload Video</span></div>", unsafe_allow_html=True)

        # Scan the working directory for bundled sample videos (.mp4 only).
        sample_files = []
        for file in os.listdir():
            if file.endswith('.mp4'):
                sample_files.append(file)

        if sample_files:
            st.info(f"Sample videos available: {', '.join(sample_files)}")
            use_sample = st.checkbox("Use a sample video instead of uploading")

            if use_sample:
                selected_sample = st.selectbox("Select a sample video", sample_files)
                # uploaded_file holds a *path string* in sample mode, an
                # UploadedFile object in upload mode — downstream code
                # distinguishes the two with isinstance(..., str).
                uploaded_file = selected_sample

                st.markdown("<h3 class='sub-header'>🎬 Video Preview</h3>", unsafe_allow_html=True)
                st.markdown("<div class='video-preview-container'>", unsafe_allow_html=True)

                video_path = os.path.join(os.getcwd(), selected_sample)
                st.video(video_path)

                # Probe the sample's properties; the values computed here
                # (duration_str etc.) are not rendered anywhere below.
                # NOTE(review): cap is only released when isOpened() is
                # True — a failed open leaves the handle to the GC.
                try:
                    cap = cv2.VideoCapture(video_path)
                    if cap.isOpened():
                        fps = cap.get(cv2.CAP_PROP_FPS)
                        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                        duration = frame_count / fps if fps > 0 else 0

                        minutes = int(duration // 60)
                        seconds = int(duration % 60)
                        duration_str = f"{minutes}:{seconds:02d}"

                        cap.release()
                except Exception as e:
                    st.warning(f"Could not read video properties: {str(e)}")

                st.markdown("</div>", unsafe_allow_html=True)
            else:
                uploaded_file = st.file_uploader("", type=["mp4", "avi", "mov", "mkv"])
        else:
            uploaded_file = st.file_uploader("", type=["mp4", "avi", "mov", "mkv"])

        # No live stream in file mode.
        stream_source = None
    else:
        # --- Live stream configuration ----------------------------------
        st.markdown("<div class='icon-text'><span>🔗</span><span>Stream Source</span></div>", unsafe_allow_html=True)
        stream_options = ["Webcam", "IP Camera / RTSP Stream"]
        stream_type = st.selectbox("", stream_options, index=0)

        if stream_type == "Webcam":
            # 0 selects the default capture device for OpenCV.
            stream_source = 0
        else:
            stream_source = st.text_input("Stream URL", placeholder="rtsp://username:password@ip_address:port/path")

        st.markdown("<div class='icon-text'><span>🔢</span><span>Frame Capture Settings</span></div>", unsafe_allow_html=True)

        capture_mode = st.radio(
            "Capture Mode",
            ["Frame Count Limit", "Time Interval (Continuous)"],
            index=0,
            help="Choose how to capture frames from the live stream"
        )

        # Exactly one of max_frames / time_interval is set; the other is None.
        if capture_mode == "Frame Count Limit":
            max_frames = st.number_input(
                "Maximum Frames",
                min_value=1,
                max_value=100,
                value=30,
                help="Maximum number of frames to process from the live stream"
            )
            time_interval = None
        else:
            max_frames = None
            time_interval = st.number_input(
                "Seconds Between Captures",
                min_value=1,
                max_value=60,
                value=5,
                help="Capture one frame every X seconds indefinitely"
            )
            st.info("⚠️ In time interval mode, processing will continue indefinitely. Use the Stop button to end capture.")

        uploaded_file = None

    # --- Model selection ------------------------------------------------
    st.markdown("<div class='icon-text'><span>🧠</span><span>AI Model</span></div>", unsafe_allow_html=True)

    model_options = ["GPT-4o", "GPT-4o-mini"]
    if PHI4_AVAILABLE:
        model_options.append("Phi-4")
    model_options.append("Phi-3 (Coming Soon)")

    model = st.selectbox(
        "",
        model_options,
        index=0,
        help="Select the AI model to use for analysis"
    )

    # Map the display name to the API model id and the local-model flag.
    if model == "GPT-4o":
        st.markdown("<div class='model-info'>Most powerful model with highest accuracy</div>", unsafe_allow_html=True)
        model_value = "gpt-4o"
        use_phi4 = False
    elif model == "GPT-4o-mini":
        st.markdown("<div class='model-info'>Faster and more cost-effective</div>", unsafe_allow_html=True)
        model_value = "gpt-4o-mini"
        use_phi4 = False
    elif model == "Phi-4":
        st.markdown("<div class='model-info'>Microsoft's multimodal model, runs locally</div>", unsafe_allow_html=True)
        model_value = "phi-4"
        use_phi4 = True
    else:
        # Phi-3 placeholder: silently fall back to GPT-4o.
        st.markdown("<div class='model-info'>Not yet implemented</div>", unsafe_allow_html=True)
        model_value = "gpt-4o"
        use_phi4 = False
        st.warning("Phi-3 support is coming soon. Using GPT-4o instead.")

    # --- Sampling / depth -----------------------------------------------
    st.markdown("<div class='icon-text'><span>⏭️</span><span>Frame Skip Rate</span></div>", unsafe_allow_html=True)
    skip_frames = st.number_input(
        "",
        min_value=0,
        max_value=100,
        value=5,
        help="Higher values process fewer frames, making analysis faster but potentially less accurate"
    )

    st.markdown("<div class='icon-text'><span>🔬</span><span>Analysis Depth</span></div>", unsafe_allow_html=True)
    analysis_depth = st.radio(
        "",
        ["Granular (Frame by Frame)", "Cumulative (Overall)"],
        index=0,
        help="Granular provides analysis for each frame, Cumulative gives an overall assessment"
    )

    # Internal value used by the detector API.
    analysis_depth_value = "granular" if analysis_depth == "Granular (Frame by Frame)" else "cumulative"

    # --- Notifications ---------------------------------------------------
    st.markdown("<div class='icon-text'><span>🔔</span><span>Notifications</span></div>", unsafe_allow_html=True)
    enable_notifications = st.checkbox("Enable notifications for anomaly detection", value=False)

    if enable_notifications:
        notification_type = st.radio(
            "Notification Method",
            ["Email", "WhatsApp"],
            index=0,
            help="Select how you want to be notified when anomalies are detected"
        )

        # Contact + channel live in session state so the processing loop
        # (which runs on a later rerun) can reach them.
        if notification_type == "Email":
            notification_email = st.text_input(
                "Email Address",
                placeholder="[email protected]",
                help="Enter the email address to receive notifications"
            )
            st.session_state.notification_contact = notification_email if notification_email else None
            st.session_state.notification_type = "email" if notification_email else None
        else:
            notification_phone = st.text_input(
                "WhatsApp Number",
                placeholder="+1234567890 (include country code)",
                help="Enter your WhatsApp number with country code"
            )
            st.session_state.notification_contact = notification_phone if notification_phone else None
            st.session_state.notification_type = "whatsapp" if notification_phone else None
    else:
        st.session_state.notification_type = None
        st.session_state.notification_contact = None

    # --- Prompt ----------------------------------------------------------
    st.markdown("<div class='icon-text'><span>💬</span><span>Anomaly Description</span></div>", unsafe_allow_html=True)
    prompt = st.text_area(
        "",
        value="Analyze this frame and describe if there are any unusual or anomalous activities or objects. If you detect anything unusual, explain what it is and why it might be considered an anomaly.",
        height=150,
        help="Describe what kind of anomaly to look for"
    )

    # --- API key (OpenAI models only) ------------------------------------
    if not use_phi4:
        st.markdown("<div class='icon-text'><span>🔑</span><span>OpenAI API Key</span></div>", unsafe_allow_html=True)
        default_api_key = os.getenv("OPENAI_API_KEY", "")
        api_key = st.text_input(
            "",
            value=default_api_key,
            type="password",
            help="Your OpenAI API key with access to the selected model"
        )
    else:
        # Phi-4 runs locally; a placeholder keeps downstream checks simple.
        api_key = "not-needed-for-phi4"

    submit_button = st.button("🚀 Analyze Video")
|
|
|
|
|
# Main panel, pre-submit: show information about the chosen input.
if input_source == "Video File" and uploaded_file is not None:
    st.markdown("<h2 class='sub-header'>📊 Video Information</h2>", unsafe_allow_html=True)

    # Sample mode passes a path string; upload mode passes an UploadedFile
    # whose bytes are persisted to a temp file for OpenCV.
    if isinstance(uploaded_file, str) and os.path.exists(uploaded_file):
        video_path = uploaded_file
        st.success(f"Using sample video: {os.path.basename(video_path)}")
    else:
        # delete=False: the path must outlive this block; it is unlinked
        # after processing (or on error) further below.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
            video_path = tmp_file.name

    # Probe basic video properties for the metrics row.
    cap = cv2.VideoCapture(video_path)

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Some containers report 0/negative FPS; fall back to 30 so the
    # duration computation below cannot divide by zero.
    if fps <= 0:
        if isinstance(video_path, str) and os.path.exists(video_path):
            fps = 30.0
            st.warning(f"Could not determine frame rate for video file: {os.path.basename(video_path)}. Using default value of 30 FPS.")
        else:
            fps = 30.0
            st.info("Using default frame rate of 30 FPS for live stream.")

    duration = frame_count / fps
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()

    # Three-column metrics row: duration / frame count / resolution.
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("<div style='text-align: center;'>⏱️</div>", unsafe_allow_html=True)
        st.metric("Duration", f"{duration:.2f} seconds")
    with col2:
        st.markdown("<div style='text-align: center;'>🎞️</div>", unsafe_allow_html=True)
        st.metric("Total Frames", frame_count)
    with col3:
        st.markdown("<div style='text-align: center;'>📐</div>", unsafe_allow_html=True)
        st.metric("Resolution", f"{width}x{height}")

    # Every (skip_frames + 1)-th frame is analyzed.
    estimated_frames = frame_count // (skip_frames + 1) + 1
    st.info(f"With current settings, approximately {estimated_frames} frames will be processed.")

elif input_source == "Live Stream" and stream_source is not None:
    st.markdown("<h2 class='sub-header'>📊 Live Stream Information</h2>", unsafe_allow_html=True)

    if stream_source == 0:
        st.info("Using default webcam as the stream source.")
    else:
        st.info(f"Using stream URL: {stream_source}")

    # NOTE(review): in time-interval mode max_frames is None, so this
    # renders "up to None frames" — confirm intended wording.
    st.info(f"Will process up to {max_frames} frames with a skip rate of {skip_frames}.")

    st.markdown("<div class='live-stream-container'><p style='text-align: center;'>Live stream preview will appear here during processing</p></div>", unsafe_allow_html=True)
|
|
|
|
|
# Main processing pipeline — runs when the user clicks "Analyze Video".
if submit_button:
    # Input validation before any heavy work.
    if not api_key and not use_phi4:
        st.error("⚠️ Please enter your OpenAI API key")
    elif input_source == "Video File" and uploaded_file is None:
        st.error("⚠️ Please upload a video file")
    elif input_source == "Live Stream" and stream_source is None:
        st.error("⚠️ Please provide a valid stream source")
    else:
        try:
            # Instantiate the chosen detector backend.
            if use_phi4:
                with st.spinner("Loading Phi-4 model... This may take a while if downloading for the first time."):
                    detector = Phi4AnomalyDetector()
                    st.success("Phi-4 model loaded successfully!")
            else:
                detector = VideoAnomalyDetector(api_key, model_value)

            st.markdown("<h2 class='sub-header'>⏳ Processing Video</h2>", unsafe_allow_html=True)
            progress_bar = st.progress(0)
            status_text = st.empty()

            def update_progress(current, total):
                """Progress callback passed to the detector.

                total == -1 signals continuous (unbounded) capture mode,
                where only a running frame count can be shown.
                """
                if total == -1:
                    status_text.text(f"Processed {current} frames (continuous mode)...")
                else:
                    if total > 0:
                        progress = current / total
                        progress_bar.progress(progress)
                    else:
                        progress_bar.progress(0)
                    status_text.text(f"Processing frame {current+1} of {total if total > 0 else '?'}...")

            start_time = time.time()

            if input_source == "Video File":
                # Whole-file processing; result shape depends on
                # analysis_depth_value (list for granular, dict otherwise).
                results = detector.process_video(video_path, skip_frames, prompt, analysis_depth_value, update_progress)
                print(f"Results: {results}")
            else:
                if capture_mode == "Frame Count Limit":
                    # Bounded live capture: process at most max_frames frames.
                    results = detector.process_live_stream(stream_source, skip_frames, prompt, analysis_depth_value, max_frames, update_progress)
                else:
                    # Continuous capture: consume a generator of incremental
                    # results until the stream ends or the user stops it.
                    results_container = st.empty()

                    st.session_state.stop_requested = False

                    st.button("Stop Capture", key="stop_continuous_main", on_click=request_stop)

                    results_generator = detector.process_live_stream(
                        stream_source, skip_frames, prompt, analysis_depth_value,
                        None, update_progress, time_interval
                    )

                    all_results = []
                    frame_counter = 0

                    # NOTE(review): a for-loop already absorbs StopIteration,
                    # so this except only fires if the generator raises it
                    # explicitly from inside its body.
                    try:
                        for result in results_generator:
                            if st.session_state.stop_requested:
                                st.success("Capture stopped by user")
                                break

                            frame_counter += 1
                            all_results.append(result)

                            # Live-update the results area in place.
                            with results_container.container():
                                if analysis_depth_value == "granular":
                                    st.markdown(f"### Frame {frame_counter}")
                                    display_single_result(result)

                                    # Immediate per-frame notification.
                                    if result.get("anomaly_detected", False) and st.session_state.notification_type and st.session_state.notification_contact:
                                        anomaly_type = result.get("anomaly_type", "Unknown")
                                        anomaly_message = f"Anomaly detected in live stream (Frame {frame_counter}).\n"
                                        anomaly_message += f"Anomaly type: {anomaly_type}\n\n"

                                        # First non-empty analysis field wins.
                                        analysis_text = None
                                        for key in ["analysis", "text", "description"]:
                                            if key in result and result[key]:
                                                analysis_text = result[key]
                                                break

                                        if analysis_text:
                                            anomaly_message += f"Analysis: {analysis_text[:500]}..."

                                        with st.spinner("Sending notification about detected anomaly..."):
                                            notification_sent = send_notification(
                                                st.session_state.notification_type,
                                                st.session_state.notification_contact,
                                                anomaly_message,
                                                result.get("frame")
                                            )

                                        if notification_sent:
                                            st.success(f"Notification sent to {st.session_state.notification_contact} via {st.session_state.notification_type.capitalize()}")
                                        else:
                                            st.error(f"Failed to send notification. Please check your {st.session_state.notification_type} settings.")
                                else:
                                    # Cumulative mode: each yielded result
                                    # supersedes the previous summary.
                                    st.markdown(f"### Cumulative Analysis (Updated)")
                                    display_single_result(result)

                                    if result.get("anomaly_detected", False) and st.session_state.notification_type and st.session_state.notification_contact:
                                        anomaly_type = result.get("anomaly_type", "Unknown")
                                        anomaly_message = f"Anomaly detected in live stream (Cumulative Analysis).\n"
                                        anomaly_message += f"Anomaly type: {anomaly_type}\n\n"

                                        analysis_text = None
                                        for key in ["analysis", "text", "description"]:
                                            if key in result and result[key]:
                                                analysis_text = result[key]
                                                break

                                        if analysis_text:
                                            anomaly_message += f"Analysis: {analysis_text[:500]}..."

                                        # Attach the first key frame, if any.
                                        anomaly_image = None
                                        if "frames" in result and result["frames"]:
                                            anomaly_image = result["frames"][0]

                                        with st.spinner("Sending notification about detected anomaly..."):
                                            notification_sent = send_notification(
                                                st.session_state.notification_type,
                                                st.session_state.notification_contact,
                                                anomaly_message,
                                                anomaly_image
                                            )

                                        if notification_sent:
                                            st.success(f"Notification sent to {st.session_state.notification_contact} via {st.session_state.notification_type.capitalize()}")
                                        else:
                                            st.error(f"Failed to send notification. Please check your {st.session_state.notification_type} settings.")

                            # Small pause so the UI can repaint between frames.
                            time.sleep(0.1)
                    except StopIteration:
                        if not st.session_state.stop_requested:
                            st.info("Stream ended")

                    # Collapse accumulated results to the expected shape.
                    if analysis_depth_value == "granular":
                        results = all_results
                    else:
                        results = all_results[-1] if all_results else None

            end_time = time.time()

            processing_time = end_time - start_time
            st.success(f"Processing completed in {processing_time:.2f} seconds")

            # Post-run notification summary (covers the non-continuous paths).
            if st.session_state.notification_type and st.session_state.notification_contact:
                anomalies_detected = False
                anomaly_image = None
                anomaly_message = ""

                if analysis_depth_value == "granular":
                    # Summarize across all flagged frames.
                    anomaly_frames = [r for r in results if r.get("anomaly_detected", False)]
                    if anomaly_frames:
                        anomalies_detected = True

                        first_anomaly = anomaly_frames[0]
                        anomaly_image = first_anomaly.get("frame")

                        anomaly_types = set(r.get("anomaly_type", "Unknown") for r in anomaly_frames)
                        anomaly_message = f"Anomaly detected in {len(anomaly_frames)} out of {len(results)} frames.\n"
                        anomaly_message += f"Anomaly types: {', '.join(anomaly_types)}\n\n"

                        analysis_text = None
                        for key in ["analysis", "text", "description"]:
                            if key in first_anomaly and first_anomaly[key]:
                                analysis_text = first_anomaly[key]
                                break

                        if analysis_text:
                            anomaly_message += f"Analysis of first anomaly: {analysis_text[:500]}..."
                else:
                    # Cumulative: results is a single dict summary.
                    if results.get("anomaly_detected", False):
                        anomalies_detected = True

                        if "frames" in results and results["frames"]:
                            anomaly_image = results["frames"][0]

                        anomaly_type = results.get("anomaly_type", "Unknown")
                        anomaly_message = f"Anomaly detected in video analysis.\n"
                        anomaly_message += f"Anomaly type: {anomaly_type}\n\n"

                        analysis_text = None
                        for key in ["analysis", "text", "description"]:
                            if key in results and results[key]:
                                analysis_text = results[key]
                                break

                        if analysis_text:
                            anomaly_message += f"Analysis: {analysis_text[:500]}..."

                if anomalies_detected:
                    with st.spinner("Sending notification about detected anomalies..."):
                        notification_sent = send_notification(
                            st.session_state.notification_type,
                            st.session_state.notification_contact,
                            anomaly_message,
                            anomaly_image
                        )

                    if notification_sent:
                        st.success(f"Notification sent to {st.session_state.notification_contact} via {st.session_state.notification_type.capitalize()}")
                    else:
                        st.error(f"Failed to send notification. Please check your {st.session_state.notification_type} settings.")

            # Continuous mode already rendered results incrementally;
            # everything else gets the full summary view here.
            # (Short-circuit keeps capture_mode unevaluated in file mode,
            # where it is undefined.)
            if not (input_source == "Live Stream" and capture_mode == "Time Interval (Continuous)"):
                display_results(results, analysis_depth_value)

                if results:
                    try:
                        # NumpyEncoder turns frame arrays into base64 PNGs.
                        results_json = json.dumps(results, indent=2, cls=NumpyEncoder)

                        st.download_button(
                            label="Download Results as JSON",
                            data=results_json,
                            file_name="anomaly_detection_results.json",
                            mime="application/json"
                        )
                    except Exception as e:
                        st.warning(f"Could not create downloadable results: {str(e)}")
                        st.info("This is usually due to large image data in the results. The analysis is still valid.")

            # Remove the temp file created for uploads (sample paths are
            # str and must not be deleted).
            if input_source == "Video File" and 'video_path' in locals():
                if not isinstance(uploaded_file, str):
                    os.unlink(video_path)

        except Exception as e:
            st.error(f"⚠️ An error occurred: {str(e)}")
            # Best-effort temp-file cleanup on the failure path too.
            if input_source == "Video File" and 'video_path' in locals():
                if not isinstance(uploaded_file, str):
                    os.unlink(video_path)
|
|
|
|
|
# Show the usage instructions whenever there is nothing to analyze yet
# (no input selected, or the Analyze button has not been pressed).
if (input_source == "Video File" and uploaded_file is None) or (input_source == "Live Stream" and stream_source is None) or not submit_button:
    # Phi-4 is only listed when its detector module imported successfully.
    model_options_html = ""
    if PHI4_AVAILABLE:
        model_options_html += "<li><strong>Phi-4</strong> - Microsoft's multimodal model, runs locally</li>"

    instructions_html = f"""
    <div class="result-container instructions-container">
        <h2 style="color: #5046E5;">📝 How to use this application</h2>

        <ol>
            <li><strong>Select an input source</strong>:
                <ul>
                    <li><strong>Video File</strong> - Upload a video file for analysis</li>
                    <li><strong>Live Stream</strong> - Connect to a webcam or IP camera stream</li>
                </ul>
            </li>
            <li><strong>Select an AI model</strong> for analysis:
                <ul>
                    <li><strong>GPT-4o-mini</strong> - Faster and more cost-effective</li>
                    <li><strong>GPT-4o</strong> - Most powerful model with highest accuracy</li>
                    {model_options_html}
                </ul>
            </li>
            <li><strong>Set the number of frames to skip</strong> - higher values process fewer frames</li>
            <li><strong>Choose an analysis depth</strong>:
                <ul>
                    <li><strong>Granular</strong> - Analyzes each frame individually</li>
                    <li><strong>Cumulative</strong> - Provides an overall summary with key frames</li>
                </ul>
            </li>
            <li><strong>Enter a prompt</strong> describing what anomaly to look for</li>
            <li><strong>Enter your OpenAI API key</strong> with access to the selected model (not needed for Phi-4)</li>
            <li><strong>Click "Analyze Video"</strong> to start processing</li>
        </ol>

        <p>The application will extract frames from your video or stream, analyze them using the selected AI model, and display the results with clear indicators for detected anomalies.</p>
    </div>
    """
    # components.html renders in an isolated iframe, so the inline styles
    # above (not the injected page CSS) govern its appearance.
    components.html(instructions_html, height=500)


# Page footer.
st.markdown("---")
st.markdown("<div class='footer'>Powered by OpenAI's GPT-4o, GPT-4o-mini, and Microsoft's Phi-4 models | © 2023 Video Anomaly Detector</div>", unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
|
|
|