File size: 52,041 Bytes
b0ac1ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 |
"""
---
title: Video Anomaly Detector
emoji: 🎥
colorFrom: blue
colorTo: green
sdk: streamlit
sdk_version: 1.31.0
app_file: app.py
pinned: false
license: mit
---
"""
import streamlit as st
import os
import tempfile
import time
from detector import VideoAnomalyDetector
import cv2
from PIL import Image
import numpy as np
from dotenv import load_dotenv
import streamlit.components.v1 as components
import json
import base64
from io import BytesIO
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
import requests
import re
# Custom JSON encoder to handle numpy arrays and other non-serializable types
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that can serialise NumPy values.

    * NumPy scalars (``np.integer``, ``np.floating``, ``np.bool_``) become the
      matching builtin Python value.
    * Arrays that Pillow can turn into a PNG are emitted as
      ``{"__ndarray__": <base64 PNG>}``.
    * Arrays that Pillow cannot encode fall back to a plain nested list
      instead of aborting the whole dump.

    Any other unsupported object is delegated to ``json.JSONEncoder.default``,
    which raises ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``."""
        # NumPy scalars are not subclasses of the builtin numeric types
        # (np.float64 excepted), so the stock encoder rejects them.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            try:
                # Convert numpy arrays to base64 encoded PNG strings
                pil_img = Image.fromarray(obj)
                buffered = BytesIO()
                pil_img.save(buffered, format="PNG")
            except (TypeError, ValueError, OSError):
                # Not an image-like array (unsupported dtype/shape/mode).
                return obj.tolist()
            img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
            return {"__ndarray__": img_str}
        return super().default(obj)
def send_email_notification(to_email, subject, body, image=None):
    """Send an email notification with an optional image attachment.

    SMTP settings are read from the environment: ``SMTP_SERVER`` (default
    ``smtp.gmail.com``), ``SMTP_PORT`` (default ``587``, the STARTTLS
    submission port), ``SMTP_USERNAME`` and ``SMTP_PASSWORD``.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        image: Optional attachment, either a numpy array (encoded to PNG) or
            already-encoded image bytes.

    Returns:
        True when the message was handed to the SMTP server, False otherwise.
        Failures are reported with ``st.warning`` rather than raised.
    """
    try:
        # Get email credentials from environment variables
        smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        smtp_port = int(os.getenv("SMTP_PORT", "587"))
        smtp_username = os.getenv("SMTP_USERNAME")
        smtp_password = os.getenv("SMTP_PASSWORD")
        if not smtp_username or not smtp_password:
            st.warning("Email notification failed: SMTP credentials not configured. Please set SMTP_USERNAME and SMTP_PASSWORD environment variables.")
            return False

        # Create message
        msg = MIMEMultipart()
        msg['From'] = smtp_username
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # Attach image if provided
        if image is not None:
            if isinstance(image, np.ndarray):
                # Convert numpy array to PNG bytes
                pil_img = Image.fromarray(image)
                img_byte_arr = BytesIO()
                pil_img.save(img_byte_arr, format='PNG')
                img_data = img_byte_arr.getvalue()
            else:
                # Assume it's already encoded image bytes
                img_data = image
            img_attachment = MIMEImage(img_data)
            img_attachment.add_header('Content-Disposition', 'attachment', filename='anomaly.png')
            msg.attach(img_attachment)

        # The context manager closes the connection even when starttls/login/
        # send fails; the timeout keeps an unreachable server from hanging the app.
        with smtplib.SMTP(smtp_server, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(smtp_username, smtp_password)
            server.send_message(msg)
        return True
    except Exception as e:
        # Notifications are best-effort: report the failure, do not crash.
        st.warning(f"Email notification failed: {str(e)}")
        return False
def send_whatsapp_notification(to_number, message):
    """Prepare a WhatsApp Business API text message (demonstration stub).

    Credentials come from ``WHATSAPP_API_KEY`` and ``WHATSAPP_PHONE_ID``.
    The request is assembled but not transmitted: the message is only printed.

    Returns:
        True when credentials are present and the message was logged,
        False when credentials are missing or an error occurred.
    """
    try:
        token = os.getenv("WHATSAPP_API_KEY")
        phone_id = os.getenv("WHATSAPP_PHONE_ID")
        if not (token and phone_id):
            st.warning("WhatsApp notification failed: API credentials not configured. Please set WHATSAPP_API_KEY and WHATSAPP_PHONE_ID environment variables.")
            return False

        # Request that a real implementation would POST to the Graph API.
        endpoint = f"https://graph.facebook.com/v17.0/{phone_id}/messages"
        request_headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        payload = {
            "messaging_product": "whatsapp",
            "to": to_number,
            "type": "text",
            "text": {"body": message},
        }

        # Demonstration only: log instead of sending. A real implementation
        # would post `payload` to `endpoint` with `request_headers` and
        # return whether the response status is 200.
        print(f"Would send WhatsApp message to {to_number}: {message}")
        return True
    except Exception as exc:
        st.warning(f"WhatsApp notification failed: {str(exc)}")
        return False
# Helper functions for notifications
def validate_email(email):
    """Return True if ``email`` looks like ``local@domain.tld``.

    This is a lightweight syntactic check, not full RFC 5322 validation.
    The whole string must match, so a trailing newline is rejected.
    Plus-addressing (``user+tag@example.com``) is accepted. Non-string
    input returns False instead of raising.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[\w.+-]+@[\w.-]+\.\w+'
    return re.fullmatch(pattern, email) is not None
def validate_phone(phone):
    """Return True if ``phone`` is ``+`` followed by 7 to 17 ASCII digits.

    The leading ``+`` stands for the required country code. The whole string
    must match, so a trailing newline is rejected. Only ASCII digits are
    accepted. Non-string input returns False instead of raising.
    """
    if not isinstance(phone, str):
        return False
    pattern = r'\+\d{1,3}\d{6,14}'
    return re.fullmatch(pattern, phone, flags=re.ASCII) is not None
def send_notification(notification_type, contact, message, image=None):
    """Validate ``contact`` and dispatch a notification over the given channel.

    ``notification_type`` is ``"email"`` or ``"whatsapp"``; any other value
    sends nothing. ``image`` is only forwarded for email.

    Returns:
        The channel sender's result, or False when the contact is invalid
        or the channel is unknown.
    """
    if notification_type == "email":
        if not validate_email(contact):
            st.warning("Invalid email format. Notification not sent.")
            return False
        email_subject = "Anomaly Detected - Video Anomaly Detector"
        return send_email_notification(contact, email_subject, message, image)

    if notification_type == "whatsapp":
        if not validate_phone(contact):
            st.warning("Invalid phone number format. Please include country code (e.g., +1234567890). Notification not sent.")
            return False
        return send_whatsapp_notification(contact, message)

    # Unknown or unset channel
    return False
# Helper functions for displaying results
def _build_result_details_html(result):
    """Build the ``result-details`` HTML snippet for one analysis result dict."""
    from html import escape

    parts = ["<div class='result-details'>"]
    # Add confidence if available
    if "confidence" in result:
        parts.append(f"<p><strong>Confidence:</strong> {escape(str(result['confidence']))}%</p>")
    # Add analysis/text if available (first non-empty of several possible keys)
    analysis_text = next(
        (result[key] for key in ("analysis", "text", "description") if result.get(key)),
        None,
    )
    if analysis_text:
        parts.append(f"<p><strong>Analysis:</strong> {escape(str(analysis_text))}</p>")
    # Add anomaly type if available
    anomaly_type = result.get("anomaly_type")
    if anomaly_type:
        parts.append(f"<p><strong>Anomaly Type:</strong> {escape(str(anomaly_type))}</p>")
    parts.append("</div>")
    return "".join(parts)


def display_single_result(result):
    """Display a single analysis result.

    * A dict containing ``"anomaly_detected"`` is rendered as a details box
      (confidence, analysis text, anomaly type). When it also has a
      ``"frame"``, the frame is shown in a column beside the details.
    * Any other dict is shown with ``st.json``.
    * Anything else is passed to ``st.write``.

    Values are HTML-escaped before insertion because the markup is rendered
    with ``unsafe_allow_html=True`` and the text comes from model output.
    """
    if not isinstance(result, dict):
        # Unknown result type
        st.write(result)
        return
    if "anomaly_detected" not in result:
        # Display other types of results
        st.json(result)
        return

    details_html = _build_result_details_html(result)
    if "frame" in result:
        # Frame on the left, details on the right
        col1, col2 = st.columns([1, 2])
        with col1:
            st.image(result["frame"], caption="Captured Frame", use_column_width=True)
        with col2:
            st.markdown(details_html, unsafe_allow_html=True)
    else:
        # No frame available, just show the text
        st.markdown(details_html, unsafe_allow_html=True)
def display_results(results, analysis_depth):
    """Display analysis results: a summary banner followed by the details.

    Args:
        results: For ``"granular"`` depth, a list of per-frame result dicts.
            Otherwise (cumulative) a single result dict, optionally carrying
            a ``"frames"`` list of key frames.
        analysis_depth: ``"granular"`` or any other value for cumulative.

    Anomaly type names are HTML-escaped because the banner is rendered with
    ``unsafe_allow_html=True``. All key frames are shown, wrapped across at
    most three columns.
    """
    from html import escape

    if not results:
        st.warning("No results to display")
        return

    is_granular = analysis_depth == "granular"

    # Add a main results header
    st.markdown("<h2 class='section-header'>π Analysis Results</h2>", unsafe_allow_html=True)

    # High-level summary at the top
    if is_granular:
        # For granular analysis, check if any frame has an anomaly
        flagged = [r for r in results if r.get("anomaly_detected", False)]
        if flagged:
            # A missing/None type is reported as "Unknown"; sorting keeps the
            # banner stable between reruns.
            type_names = sorted({str(r.get("anomaly_type") or "Unknown") for r in flagged})
            anomaly_types_str = escape(", ".join(type_names))
            summary_html = (
                "<div class='result-box anomaly'>"
                "<h3>⚠️ ANOMALY DETECTED</h3>"
                f"<p><strong>Frames with anomalies:</strong> {len(flagged)} out of {len(results)}</p>"
                f"<p><strong>Anomaly types:</strong> {anomaly_types_str}</p>"
                "</div>"
            )
        else:
            summary_html = (
                "<div class='result-box normal'>"
                "<h3>✅ No Anomalies Detected</h3>"
                "<p>No anomalies were detected in any of the analyzed frames.</p>"
                "</div>"
            )
    else:  # cumulative
        # For cumulative analysis, check the overall result
        if results.get("anomaly_detected", False):
            anomaly_type = escape(str(results.get("anomaly_type") or "Unknown"))
            summary_html = (
                "<div class='result-box anomaly'>"
                "<h3>⚠️ ANOMALY DETECTED</h3>"
                f"<p><strong>Anomaly type:</strong> {anomaly_type}</p>"
                "</div>"
            )
        else:
            summary_html = (
                "<div class='result-box normal'>"
                "<h3>✅ No Anomalies Detected</h3>"
                "<p>No anomalies were detected in the video.</p>"
                "</div>"
            )
    st.markdown(summary_html, unsafe_allow_html=True)

    # Detailed results
    if is_granular:
        # results is a list of frame analyses, one expander per frame
        st.markdown("<h3 class='sub-header'>π Frame-by-Frame Analysis</h3>", unsafe_allow_html=True)
        for i, result in enumerate(results):
            status = '⚠️ ANOMALY' if result.get('anomaly_detected', False) else '✅ Normal'
            with st.expander(f"Frame {i+1} - {status}"):
                display_single_result(result)
    else:  # cumulative
        st.markdown("<h3 class='sub-header'>π Overall Video Analysis</h3>", unsafe_allow_html=True)
        display_single_result(results)

        # Display key frames if available
        frames = results.get("frames")
        if frames:
            st.markdown("<h3 class='sub-header'>🖼️ Key Frames</h3>", unsafe_allow_html=True)
            cols = st.columns(min(3, len(frames)))
            # Wrap around the columns so frames past the third are shown too.
            for i, frame in enumerate(frames):
                with cols[i % len(cols)]:
                    st.image(frame, caption=f"Key Frame {i+1}", use_column_width=True)
# Make sure the stop flag exists before any widget callback can touch it
if "stop_requested" not in st.session_state:
    st.session_state["stop_requested"] = False


def request_stop():
    """Button callback: flag that the user asked to stop the capture."""
    st.session_state["stop_requested"] = True
# The Phi-4 detector is optional: record whether its module can be imported
try:
    from phi4_detector import Phi4AnomalyDetector
except ImportError:
    PHI4_AVAILABLE = False
else:
    PHI4_AVAILABLE = True
# Pull settings (API keys, SMTP/WhatsApp credentials) from a .env file
load_dotenv()

# Browser tab title/icon and wide layout
st.set_page_config(
    layout="wide",
    page_icon="π",
    page_title="Video Anomaly Detector",
)
# Custom CSS for better UI.
# Injected once as a raw <style> element. The rules cover this app's own
# classes (headers, result boxes, video preview, ...) and restyle some
# built-in Streamlit widgets through their .st* class names.
st.markdown("""
<style>
@import url('https://fonts.googleapis.com/css2?family=Poppins:wght@300;400;500;600;700&display=swap');
html, body, [class*="css"] {
font-family: 'Poppins', sans-serif;
}
.main-header {
font-size: 2.8rem;
font-weight: 700;
color: #5046E5;
text-align: center;
margin-bottom: 1rem;
padding-top: 1.5rem;
}
.sub-header {
font-size: 1.8rem;
font-weight: 600;
color: #36B37E;
margin-bottom: 1.2rem;
}
.section-header {
font-size: 2rem;
font-weight: 600;
color: #5046E5;
margin-top: 2rem;
margin-bottom: 1rem;
}
.result-box {
padding: 15px;
border-radius: 10px;
margin-bottom: 15px;
}
.result-box.anomaly {
background-color: rgba(255, 76, 76, 0.1);
border: 1px solid rgba(255, 76, 76, 0.3);
}
.result-box.normal {
background-color: rgba(54, 179, 126, 0.1);
border: 1px solid rgba(54, 179, 126, 0.3);
}
.result-box h3 {
margin-top: 0;
margin-bottom: 10px;
}
.result-box.anomaly h3 {
color: #FF4C4C;
}
.result-box.normal h3 {
color: #36B37E;
}
.result-container {
background-color: #f8f9fa;
padding: 1.8rem;
border-radius: 12px;
margin-bottom: 1.5rem;
border: 1px solid #e9ecef;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.05);
}
.stProgress > div > div > div {
background-color: #5046E5;
}
.stButton>button {
background-color: #5046E5;
color: white;
font-weight: 600;
border-radius: 8px;
padding: 0.5rem 1rem;
border: none;
}
.stButton>button:hover {
background-color: #4038C7;
}
.stSelectbox>div>div {
background-color: #f8f9fa;
border-radius: 8px;
}
.stRadio>div {
padding: 10px;
background-color: #f8f9fa;
border-radius: 8px;
}
.stExpander>div {
border-radius: 8px;
border: 1px solid #e9ecef;
}
.model-info {
font-size: 0.9rem;
color: #6c757d;
font-style: italic;
margin-top: 0.5rem;
}
.icon-text {
display: flex;
align-items: center;
gap: 0.5rem;
}
.footer {
text-align: center;
color: #6c757d;
font-size: 0.9rem;
margin-top: 2rem;
}
.anomaly-true {
color: #dc3545;
font-weight: bold;
}
.anomaly-false {
color: #28a745;
font-weight: bold;
}
.anomaly-type {
font-weight: bold;
margin-top: 0.5rem;
}
.anomaly-box {
padding: 1rem;
border-radius: 8px;
margin-bottom: 1rem;
}
.anomaly-box-true {
background-color: rgba(220, 53, 69, 0.1);
border: 1px solid rgba(220, 53, 69, 0.3);
}
.anomaly-box-false {
background-color: rgba(40, 167, 69, 0.1);
border: 1px solid rgba(40, 167, 69, 0.3);
}
.instructions-container {
font-size: 1.1rem;
line-height: 1.8;
}
.instructions-container ol {
padding-left: 1.5rem;
}
.instructions-container ul {
padding-left: 1.5rem;
}
.instructions-container li {
margin-bottom: 0.5rem;
}
.live-stream-container {
border: 2px solid #5046E5;
border-radius: 12px;
padding: 1rem;
margin-top: 1rem;
}
.result-details {
padding: 15px;
border-radius: 10px;
margin-bottom: 15px;
background-color: rgba(80, 70, 229, 0.05);
border: 1px solid rgba(80, 70, 229, 0.2);
}
.result-details p {
margin-bottom: 10px;
}
.result-details strong {
color: #5046E5;
}
.video-preview-container {
border: 1px solid #e9ecef;
border-radius: 10px;
padding: 15px;
margin-bottom: 20px;
background-color: rgba(80, 70, 229, 0.03);
}
.video-preview-container video {
width: 100%;
border-radius: 8px;
margin-bottom: 10px;
}
.video-info {
display: flex;
justify-content: space-between;
margin-top: 10px;
}
.video-info-item {
text-align: center;
padding: 8px;
background-color: #f8f9fa;
border-radius: 5px;
flex: 1;
margin: 0 5px;
}
</style>
""", unsafe_allow_html=True)
# Header with icon: page title (styled by .main-header) and a centered tagline
st.markdown("<h1 class='main-header'>π Video Anomaly Detector</h1>", unsafe_allow_html=True)
st.markdown("<p style='text-align: center; font-size: 1.2rem; margin-bottom: 2rem;'>Analyze video frames for anomalies using advanced AI models</p>", unsafe_allow_html=True)
# Sidebar for inputs.
# Collects every setting used further down the script: input_source,
# uploaded_file / stream_source, capture settings (live stream only), model,
# skip_frames, analysis depth, notification target, prompt, api_key and the
# submit button. Streamlit reruns this block on every widget interaction.
with st.sidebar:
    st.markdown("<h2 class='sub-header'>βοΈ Settings</h2>", unsafe_allow_html=True)

    # Input source selection
    st.markdown("<div class='icon-text'><span>πΉ</span><span>Input Source</span></div>", unsafe_allow_html=True)
    input_source = st.radio(
        "",
        ["Video File", "Live Stream"],
        index=0,
        help="Select the input source for analysis"
    )

    # File uploader or stream URL based on input source
    if input_source == "Video File":
        st.markdown("<div class='icon-text'><span>π</span><span>Upload Video</span></div>", unsafe_allow_html=True)

        # Find sample .mp4 files in the current directory. Sorted because
        # os.listdir() order is arbitrary and the selectbox should be stable.
        sample_files = sorted(name for name in os.listdir() if name.endswith('.mp4'))

        # Show sample files if available
        if sample_files:
            st.info(f"Sample videos available: {', '.join(sample_files)}")
            use_sample = st.checkbox("Use a sample video instead of uploading")
            if use_sample:
                selected_sample = st.selectbox("Select a sample video", sample_files)
                uploaded_file = selected_sample  # A path string; handled specially later

                # Add video preview section
                st.markdown("<h3 class='sub-header'>π¬ Video Preview</h3>", unsafe_allow_html=True)
                # Create a container for the video preview with custom styling
                st.markdown("<div class='video-preview-container'>", unsafe_allow_html=True)
                # Get the full path to the selected sample video
                video_path = os.path.join(os.getcwd(), selected_sample)
                # Display the video player
                st.video(video_path)

                # Display video information
                try:
                    cap = cv2.VideoCapture(video_path)
                    try:
                        if cap.isOpened():
                            # Get video properties
                            fps = cap.get(cv2.CAP_PROP_FPS)
                            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                            # Calculate duration (guard against a zero frame rate)
                            duration = frame_count / fps if fps > 0 else 0
                            # Format duration as minutes:seconds
                            minutes = int(duration // 60)
                            seconds = int(duration % 60)
                            duration_str = f"{minutes}:{seconds:02d}"
                            # Show the values that were computed above
                            st.markdown(
                                "<div class='video-info'>"
                                f"<div class='video-info-item'>Duration: {duration_str}</div>"
                                f"<div class='video-info-item'>Resolution: {width}x{height}</div>"
                                f"<div class='video-info-item'>FPS: {fps:.2f}</div>"
                                "</div>",
                                unsafe_allow_html=True
                            )
                    finally:
                        # Release the capture even if reading a property fails
                        cap.release()
                except Exception as e:
                    st.warning(f"Could not read video properties: {str(e)}")
                st.markdown("</div>", unsafe_allow_html=True)
            else:
                uploaded_file = st.file_uploader("", type=["mp4", "avi", "mov", "mkv"])
        else:
            uploaded_file = st.file_uploader("", type=["mp4", "avi", "mov", "mkv"])
        stream_source = None
    else:  # Live Stream
        st.markdown("<div class='icon-text'><span>π</span><span>Stream Source</span></div>", unsafe_allow_html=True)
        stream_options = ["Webcam", "IP Camera / RTSP Stream"]
        stream_type = st.selectbox("", stream_options, index=0)
        if stream_type == "Webcam":
            stream_source = 0  # Default webcam
        else:
            stream_url = st.text_input("Stream URL", placeholder="rtsp://username:password@ip_address:port/path")
            # text_input yields "" when left empty; normalise to None so the
            # "stream_source is None" guards later in the script can fire.
            stream_source = stream_url.strip() or None

        # Max frames to process for live stream
        st.markdown("<div class='icon-text'><span>π’</span><span>Frame Capture Settings</span></div>", unsafe_allow_html=True)
        capture_mode = st.radio(
            "Capture Mode",
            ["Frame Count Limit", "Time Interval (Continuous)"],
            index=0,
            help="Choose how to capture frames from the live stream"
        )
        if capture_mode == "Frame Count Limit":
            max_frames = st.number_input(
                "Maximum Frames",
                min_value=1,
                max_value=100,
                value=30,
                help="Maximum number of frames to process from the live stream"
            )
            time_interval = None
        else:  # Time Interval mode
            max_frames = None  # No frame limit in time interval mode
            time_interval = st.number_input(
                "Seconds Between Captures",
                min_value=1,
                max_value=60,
                value=5,
                help="Capture one frame every X seconds indefinitely"
            )
            st.info("β οΈ In time interval mode, processing will continue indefinitely. Use the Stop button to end capture.")
        uploaded_file = None

    # Model selection
    st.markdown("<div class='icon-text'><span>π§ </span><span>AI Model</span></div>", unsafe_allow_html=True)
    # Add Phi-4 to the model options if available
    model_options = ["GPT-4o", "GPT-4o-mini"]
    if PHI4_AVAILABLE:
        model_options.append("Phi-4")
    model_options.append("Phi-3 (Coming Soon)")
    model = st.selectbox(
        "",
        model_options,
        index=0,
        help="Select the AI model to use for analysis"
    )

    # Display model info based on selection
    if model == "GPT-4o":
        st.markdown("<div class='model-info'>Most powerful model with highest accuracy</div>", unsafe_allow_html=True)
        model_value = "gpt-4o"
        use_phi4 = False
    elif model == "GPT-4o-mini":
        st.markdown("<div class='model-info'>Faster and more cost-effective</div>", unsafe_allow_html=True)
        model_value = "gpt-4o-mini"
        use_phi4 = False
    elif model == "Phi-4":
        st.markdown("<div class='model-info'>Microsoft's multimodal model, runs locally</div>", unsafe_allow_html=True)
        model_value = "phi-4"
        use_phi4 = True
    else:  # Phi-3
        st.markdown("<div class='model-info'>Not yet implemented</div>", unsafe_allow_html=True)
        model_value = "gpt-4o"  # Default to GPT-4o if Phi-3 is selected
        use_phi4 = False
        st.warning("Phi-3 support is coming soon. Using GPT-4o instead.")

    # Skip frames input with icon
    st.markdown("<div class='icon-text'><span>βοΈ</span><span>Frame Skip Rate</span></div>", unsafe_allow_html=True)
    skip_frames = st.number_input(
        "",
        min_value=0,
        max_value=100,
        value=5,
        help="Higher values process fewer frames, making analysis faster but potentially less accurate"
    )

    # Analysis depth selection
    st.markdown("<div class='icon-text'><span>π¬</span><span>Analysis Depth</span></div>", unsafe_allow_html=True)
    analysis_depth = st.radio(
        "",
        ["Granular (Frame by Frame)", "Cumulative (Overall)"],
        index=0,
        help="Granular provides analysis for each frame, Cumulative gives an overall assessment"
    )
    # Map the radio button value to the actual value
    analysis_depth_value = "granular" if analysis_depth == "Granular (Frame by Frame)" else "cumulative"

    # Notification options: the chosen channel and contact are kept in
    # session state; both are None when notifications are off or no contact
    # has been entered.
    st.markdown("<div class='icon-text'><span>π</span><span>Notifications</span></div>", unsafe_allow_html=True)
    enable_notifications = st.checkbox("Enable notifications for anomaly detection", value=False)
    if enable_notifications:
        notification_type = st.radio(
            "Notification Method",
            ["Email", "WhatsApp"],
            index=0,
            help="Select how you want to be notified when anomalies are detected"
        )
        if notification_type == "Email":
            notification_email = st.text_input(
                "Email Address",
                placeholder="name@example.com",
                help="Enter the email address to receive notifications"
            )
            st.session_state.notification_contact = notification_email if notification_email else None
            st.session_state.notification_type = "email" if notification_email else None
        else:  # WhatsApp
            notification_phone = st.text_input(
                "WhatsApp Number",
                placeholder="+1234567890 (include country code)",
                help="Enter your WhatsApp number with country code"
            )
            st.session_state.notification_contact = notification_phone if notification_phone else None
            st.session_state.notification_type = "whatsapp" if notification_phone else None
    else:
        st.session_state.notification_type = None
        st.session_state.notification_contact = None

    # Prompt input with icon
    st.markdown("<div class='icon-text'><span>π¬</span><span>Anomaly Description</span></div>", unsafe_allow_html=True)
    prompt = st.text_area(
        "",
        value="Analyze this frame and describe if there are any unusual or anomalous activities or objects. If you detect anything unusual, explain what it is and why it might be considered an anomaly.",
        height=150,
        help="Describe what kind of anomaly to look for"
    )

    # API key input with default from environment variable and icon (only show for OpenAI models)
    if not use_phi4:
        st.markdown("<div class='icon-text'><span>π</span><span>OpenAI API Key</span></div>", unsafe_allow_html=True)
        default_api_key = os.getenv("OPENAI_API_KEY", "")
        api_key = st.text_input(
            "",
            value=default_api_key,
            type="password",
            help="Your OpenAI API key with access to the selected model"
        )
    else:
        # For Phi-4, we don't need an API key
        api_key = "not-needed-for-phi4"

    # Submit button with icon
    submit_button = st.button("π Analyze Video")
# Main content area for video file: resolve the video path, read its
# metadata with OpenCV and show duration / frame count / resolution.
if input_source == "Video File" and uploaded_file is not None:
    # Display video info
    st.markdown("<h2 class='sub-header'>π Video Information</h2>", unsafe_allow_html=True)

    # Check if we're using a sample file or an uploaded file
    if isinstance(uploaded_file, str) and os.path.exists(uploaded_file):
        # This is a sample file from the directory
        video_path = uploaded_file
        st.success(f"Using sample video: {os.path.basename(video_path)}")
    else:
        # This is an uploaded file. Streamlit reruns the script on every
        # widget interaction, so reuse the temp copy made for this upload
        # instead of writing a new file each time. Uploads are told apart by
        # (name, size).
        upload_key = (uploaded_file.name, uploaded_file.size)
        cached_upload = st.session_state.get("uploaded_video_cache")
        if cached_upload and cached_upload[0] == upload_key and os.path.exists(cached_upload[1]):
            video_path = cached_upload[1]
        else:
            if cached_upload and os.path.exists(cached_upload[1]):
                try:
                    os.remove(cached_upload[1])
                except OSError:
                    # Best-effort cleanup of the previous copy
                    pass
            # Keep the real extension, falling back to .mp4
            suffix = os.path.splitext(uploaded_file.name)[1] or '.mp4'
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                video_path = tmp_file.name
            st.session_state["uploaded_video_cache"] = (upload_key, video_path)

    # Get video metadata
    # For video files, use the default backend instead of DirectShow.
    # MJPG is deliberately not forced: it can interfere with proper decoding.
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Clamp: a backend may report a negative count
        frame_count = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()

    # Prevent division by zero when the frame rate cannot be determined
    if fps <= 0:
        fps = 30.0  # Use a default value
        st.warning(f"Could not determine frame rate for video file: {os.path.basename(video_path)}. Using default value of 30 FPS.")
    duration = frame_count / fps

    # Display video metadata in a nicer format
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("<div style='text-align: center;'>β±οΈ</div>", unsafe_allow_html=True)
        st.metric("Duration", f"{duration:.2f} seconds")
    with col2:
        st.markdown("<div style='text-align: center;'>ποΈ</div>", unsafe_allow_html=True)
        st.metric("Total Frames", frame_count)
    with col3:
        st.markdown("<div style='text-align: center;'>π</div>", unsafe_allow_html=True)
        st.metric("Resolution", f"{width}x{height}")

    # Display estimated frames to process
    estimated_frames = frame_count // (skip_frames + 1) + 1
    st.info(f"With current settings, approximately {estimated_frames} frames will be processed.")

# Main content area for live stream (an empty URL does not count as a source)
elif input_source == "Live Stream" and stream_source is not None and stream_source != "":
    # Display live stream info
    st.markdown("<h2 class='sub-header'>π Live Stream Information</h2>", unsafe_allow_html=True)

    # Display stream source info
    if stream_source == 0:
        st.info("Using default webcam as the stream source.")
    else:
        # Hide any "user:password@" part so credentials are not shown on screen
        shown_url = re.sub(r'(?<=://)[^/@]+@', '***@', str(stream_source))
        st.info(f"Using stream URL: {shown_url}")

    # Display what will be captured; max_frames is None in time-interval mode
    if max_frames is not None:
        st.info(f"Will process up to {max_frames} frames with a skip rate of {skip_frames}.")
    else:
        st.info(f"Will capture one frame every {time_interval} seconds until stopped.")

    # Show a placeholder for the live stream
    st.markdown("<div class='live-stream-container'><p style='text-align: center;'>Live stream preview will appear here during processing</p></div>", unsafe_allow_html=True)
# Process video or stream when the submit button is clicked.
# The private helpers below hold the notification logic shared by the live
# (per-result) path and the final (summary) path.
def _notifications_enabled():
    """Return True when both a notification type and a contact are set in session state."""
    return bool(st.session_state.notification_type and st.session_state.notification_contact)


def _first_analysis_text(result):
    """Return the first non-empty "analysis", "text" or "description" value of result, else None."""
    for key in ("analysis", "text", "description"):
        if result.get(key):
            return result[key]
    return None


def _with_analysis_excerpt(message, result, label="Analysis"):
    """Return message with up to 500 characters of the result's analysis text appended."""
    analysis_text = _first_analysis_text(result)
    if analysis_text:
        message += f"{label}: {analysis_text[:500]}..."
    return message


def _first_key_frame(result):
    """Return the first entry of result["frames"] when present and non-empty, else None."""
    if "frames" in result and result["frames"]:
        return result["frames"][0]
    return None


def _send_anomaly_notification(message, image, spinner_text):
    """Send message (and optional image) through send_notification and show the outcome."""
    with st.spinner(spinner_text):
        notification_sent = send_notification(
            st.session_state.notification_type,
            st.session_state.notification_contact,
            message,
            image,
        )
    if notification_sent:
        st.success(f"Notification sent to {st.session_state.notification_contact} via {st.session_state.notification_type.capitalize()}")
    else:
        st.error(f"Failed to send notification. Please check your {st.session_state.notification_type} settings.")


def _notify_live_result(result, frame_counter, is_granular):
    """Notify about one live-stream result if it reports an anomaly and notifications are on."""
    if not (result.get("anomaly_detected", False) and _notifications_enabled()):
        return
    if is_granular:
        source_label = f"Frame {frame_counter}"
        image = result.get("frame")
    else:
        source_label = "Cumulative Analysis"
        image = _first_key_frame(result)
    message = f"Anomaly detected in live stream ({source_label}).\n"
    message += f"Anomaly type: {result.get('anomaly_type', 'Unknown')}\n\n"
    message = _with_analysis_excerpt(message, result)
    _send_anomaly_notification(message, image, "Sending notification about detected anomaly...")


def _build_summary_notification(results, is_granular):
    """Return (message, image) for the end-of-run notification, or None when no anomaly was found.

    Granular results are treated as a list of per-frame dicts and cumulative
    results as a single dict; a missing (None/empty) result yields None
    instead of raising.
    """
    if is_granular:
        frame_results = results or []
        anomaly_frames = [r for r in frame_results if r.get("anomaly_detected", False)]
        if not anomaly_frames:
            return None
        first_anomaly = anomaly_frames[0]
        # Sorted so the message text does not depend on set iteration order.
        anomaly_types = sorted({str(r.get("anomaly_type", "Unknown")) for r in anomaly_frames})
        message = f"Anomaly detected in {len(anomaly_frames)} out of {len(frame_results)} frames.\n"
        message += f"Anomaly types: {', '.join(anomaly_types)}\n\n"
        message = _with_analysis_excerpt(message, first_anomaly, "Analysis of first anomaly")
        return message, first_anomaly.get("frame")

    if not results or not results.get("anomaly_detected", False):
        return None
    message = "Anomaly detected in video analysis.\n"
    message += f"Anomaly type: {results.get('anomaly_type', 'Unknown')}\n\n"
    message = _with_analysis_excerpt(message, results)
    return message, _first_key_frame(results)


if submit_button:
    if not api_key and not use_phi4:
        st.error("⚠️ Please enter your OpenAI API key")
    elif input_source == "Video File" and uploaded_file is None:
        st.error("⚠️ Please upload a video file")
    elif input_source == "Live Stream" and stream_source is None:
        st.error("⚠️ Please provide a valid stream source")
    else:
        is_granular = analysis_depth_value == "granular"
        try:
            # Initialize detector based on selected model
            if use_phi4:
                with st.spinner("Loading Phi-4 model... This may take a while if downloading for the first time."):
                    detector = Phi4AnomalyDetector()
                st.success("Phi-4 model loaded successfully!")
            else:
                detector = VideoAnomalyDetector(api_key, model_value)

            # Progress bar and status
            st.markdown("<h2 class='sub-header'>⏳ Processing Video</h2>", unsafe_allow_html=True)
            progress_bar = st.progress(0)
            status_text = st.empty()

            def update_progress(current, total):
                """Progress callback: total == -1 means continuous mode with no known total."""
                if total == -1:
                    status_text.text(f"Processed {current} frames (continuous mode)...")
                    return
                if total > 0:
                    # Clamp: st.progress rejects floats outside [0.0, 1.0].
                    progress_bar.progress(min(max(current / total, 0.0), 1.0))
                else:
                    # Unknown or zero total: keep the bar empty.
                    progress_bar.progress(0)
                status_text.text(f"Processing frame {current+1} of {total if total > 0 else '?'}...")

            # Process the video or stream
            start_time = time.time()
            if input_source == "Video File":
                results = detector.process_video(video_path, skip_frames, prompt, analysis_depth_value, update_progress)
            elif capture_mode == "Frame Count Limit":
                # Process with frame count limit
                results = detector.process_live_stream(stream_source, skip_frames, prompt, analysis_depth_value, max_frames, update_progress)
            else:
                # Time Interval mode: results arrive one at a time and are shown as they come in.
                results_container = st.empty()
                # Reset stop request flag at the beginning of processing
                st.session_state.stop_requested = False
                # Create the stop button once, outside the loop
                st.button("Stop Capture", key="stop_continuous_main", on_click=request_stop)
                results_generator = detector.process_live_stream(
                    stream_source, skip_frames, prompt, analysis_depth_value,
                    None, update_progress, time_interval
                )
                all_results = []
                frame_counter = 0
                try:
                    for result in results_generator:
                        # Check if stop button was pressed
                        if st.session_state.stop_requested:
                            st.success("Capture stopped by user")
                            break
                        frame_counter += 1
                        all_results.append(result)
                        # Display the latest result, replacing the previous one
                        with results_container.container():
                            if is_granular:
                                st.markdown(f"### Frame {frame_counter}")
                            else:
                                st.markdown("### Cumulative Analysis (Updated)")
                            display_single_result(result)
                            _notify_live_result(result, frame_counter, is_granular)
                        # Sleep briefly to allow UI updates
                        time.sleep(0.1)
                    else:
                        # A for loop swallows StopIteration, so natural exhaustion
                        # is detected with for/else rather than an except clause.
                        if not st.session_state.stop_requested:
                            st.info("Stream ended")
                finally:
                    # Close the result stream so it is not left suspended after an
                    # early break or an error.
                    close_stream = getattr(results_generator, "close", None)
                    if callable(close_stream):
                        close_stream()
                # Final results: every frame for granular, only the latest update otherwise
                if is_granular:
                    results = all_results
                else:
                    results = all_results[-1] if all_results else None
            end_time = time.time()

            # Calculate processing time
            processing_time = end_time - start_time
            st.success(f"Processing completed in {processing_time:.2f} seconds")

            # Send a summary notification if enabled and anomalies were detected
            if _notifications_enabled():
                summary = _build_summary_notification(results, is_granular)
                if summary is not None:
                    anomaly_message, anomaly_image = summary
                    _send_anomaly_notification(
                        anomaly_message,
                        anomaly_image,
                        "Sending notification about detected anomalies...",
                    )

            # Only display results here if we're not in time interval mode
            # (time interval mode displays results as they come in)
            if not (input_source == "Live Stream" and capture_mode == "Time Interval (Continuous)"):
                display_results(results, analysis_depth_value)

            # Download results button
            if results:
                try:
                    # Convert results to JSON using the file's custom encoder
                    results_json = json.dumps(results, indent=2, cls=NumpyEncoder)
                    st.download_button(
                        label="Download Results as JSON",
                        data=results_json,
                        file_name="anomaly_detection_results.json",
                        mime="application/json"
                    )
                except Exception as e:
                    st.warning(f"Could not create downloadable results: {str(e)}")
                    st.info("This is usually due to large image data in the results. The analysis is still valid.")
        except Exception as e:
            st.error(f"⚠️ An error occurred: {str(e)}")
        finally:
            # Clean up the temporary file exactly once, on success and on failure.
            if input_source == "Video File" and 'video_path' in locals():
                # Only delete the file if it's a temporary file, not a sample file
                if not isinstance(uploaded_file, str):
                    try:
                        os.unlink(video_path)
                    except OSError as cleanup_error:
                        st.warning(f"Could not remove temporary video file: {cleanup_error}")
# Show the usage guide whenever there is nothing to analyse yet: the chosen
# input is missing, or the submit button has not been pressed.
video_missing = input_source == "Video File" and uploaded_file is None
stream_missing = input_source == "Live Stream" and stream_source is None
if video_missing or stream_missing or not submit_button:
    # The Phi-4 entry is listed only when PHI4_AVAILABLE is truthy.
    model_options_html = (
        "<li><strong>Phi-4</strong> - Microsoft's multimodal model, runs locally</li>"
        if PHI4_AVAILABLE
        else ""
    )
    # Rendered through the HTML component so the markup is displayed as HTML.
    instructions_html = f"""
<div class="result-container instructions-container">
<h2 style="color: #5046E5;">π How to use this application</h2>
<ol>
<li><strong>Select an input source</strong>:
<ul>
<li><strong>Video File</strong> - Upload a video file for analysis</li>
<li><strong>Live Stream</strong> - Connect to a webcam or IP camera stream</li>
</ul>
</li>
<li><strong>Select an AI model</strong> for analysis:
<ul>
<li><strong>GPT-4o-mini</strong> - Faster and more cost-effective</li>
<li><strong>GPT-4o</strong> - Most powerful model with highest accuracy</li>
{model_options_html}
</ul>
</li>
<li><strong>Set the number of frames to skip</strong> - higher values process fewer frames</li>
<li><strong>Choose an analysis depth</strong>:
<ul>
<li><strong>Granular</strong> - Analyzes each frame individually</li>
<li><strong>Cumulative</strong> - Provides an overall summary with key frames</li>
</ul>
</li>
<li><strong>Enter a prompt</strong> describing what anomaly to look for</li>
<li><strong>Enter your OpenAI API key</strong> with access to the selected model (not needed for Phi-4)</li>
<li><strong>Click "Analyze Video"</strong> to start processing</li>
</ol>
<p>The application will extract frames from your video or stream, analyze them using the selected AI model, and display the results with clear indicators for detected anomalies.</p>
</div>
"""
    components.html(instructions_html, height=500)
# Footer: a horizontal rule followed by the attribution line.
st.markdown("---")
st.markdown("<div class='footer'>Powered by OpenAI's GPT-4o, GPT-4o-mini, and Microsoft's Phi-4 models | © 2023 Video Anomaly Detector</div>", unsafe_allow_html=True)