rajsecrets0 commited on
Commit
4809812
·
verified ·
1 Parent(s): e26f0dd

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +127 -0
app.py ADDED
@@ -0,0 +1,127 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import streamlit as st
2
+ import cv2
3
+ import requests
4
+ import base64
5
+ import tempfile
6
+ import os
7
+ import time
8
+ from typing import Generator, Tuple
9
+
10
+ # --------------------------
11
+ # Configuration
12
+ # --------------------------
13
+ API_KEY = os.getenv("GEMINI_API_KEY") # Fetch API key from Hugging Face secrets
14
+ API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={API_KEY}"
15
+ SYSTEM_PROMPT = '''
16
+ You are a next-generation AI-driven military surveillance officer...
17
+ (Same as before)
18
+ '''
19
+
20
+ # --------------------------
21
+ # Gemini API Client
22
+ # --------------------------
23
+ class GeminiClient:
24
+ def __init__(self, api_key: str, api_url: str):
25
+ self.api_key = api_key
26
+ self.api_url = api_url
27
+ self.session = requests.Session()
28
+ self.timeout = 30
29
+
30
+ def analyze_frame(self, frame_b64: str, timestamp: int) -> str:
31
+ """Send frame to Gemini API for analysis."""
32
+ payload = {
33
+ "contents": [{
34
+ "parts": [
35
+ {"text": f"Analyze this battlefield image from {timestamp} seconds:"},
36
+ {"inline_data": {"mime_type": "image/jpeg", "data": frame_b64}}
37
+ ]
38
+ }],
39
+ "systemInstruction": {
40
+ "parts": [{"text": SYSTEM_PROMPT}]
41
+ }
42
+ }
43
+
44
+ try:
45
+ response = self.session.post(
46
+ self.api_url,
47
+ json=payload,
48
+ timeout=self.timeout,
49
+ headers={"Content-Type": "application/json"}
50
+ )
51
+ response.raise_for_status()
52
+ return self._parse_response(response.json())
53
+ except requests.exceptions.RequestException as e:
54
+ return f"Analysis error: {str(e)}"
55
+
56
+ @staticmethod
57
+ def _parse_response(response: dict) -> str:
58
+ """Extract response text from Gemini API response."""
59
+ try:
60
+ return response["candidates"][0]["content"]["parts"][0]["text"]
61
+ except (KeyError, IndexError):
62
+ return "No analysis available"
63
+
64
+ # --------------------------
65
+ # Helper Functions
66
+ # --------------------------
67
+ def frame_to_base64(frame):
68
+ """Convert an image frame to base64 format."""
69
+ _, buffer = cv2.imencode(".jpg", frame)
70
+ return base64.b64encode(buffer).decode("utf-8")
71
+
72
+ def extract_video_frame(video_path, timestamp):
73
+ """Extract a frame at a specific timestamp from the video."""
74
+ cap = cv2.VideoCapture(video_path)
75
+ cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
76
+ success, frame = cap.read()
77
+ cap.release()
78
+ return frame if success else None
79
+
80
+ def download_video(video_url):
81
+ """Download video from URL to a temporary file."""
82
+ try:
83
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
84
+ response = requests.get(video_url, stream=True, timeout=30)
85
+ response.raise_for_status()
86
+ for chunk in response.iter_content(chunk_size=8192):
87
+ temp_file.write(chunk)
88
+ return temp_file.name
89
+ except Exception as e:
90
+ st.error(f"Video download failed: {str(e)}")
91
+ return None
92
+
93
+ # --------------------------
94
+ # Streamlit App
95
+ # --------------------------
96
+ st.title("🎥 Military Surveillance AI")
97
+ st.write("Upload a battlefield surveillance video to analyze.")
98
+
99
+ video_url = st.text_input("Enter Video URL:")
100
+ if st.button("Analyze Video") and video_url:
101
+ st.info("Downloading video...")
102
+ video_path = download_video(video_url)
103
+
104
+ if video_path:
105
+ client = GeminiClient(API_KEY, API_URL)
106
+ st.success("Video downloaded successfully!")
107
+
108
+ st.info("Processing video and analyzing frames...")
109
+ log = []
110
+
111
+ # Extract frames and analyze
112
+ for timestamp in range(10, 40, 10): # Analyze at 10s, 20s, 30s
113
+ frame = extract_video_frame(video_path, timestamp)
114
+ if frame is None:
115
+ log.append(f"[{timestamp}s] Frame extraction failed ❌")
116
+ continue
117
+
118
+ analysis = client.analyze_frame(frame_to_base64(frame), timestamp)
119
+ log.append(f"[{timestamp}s] {analysis}")
120
+ st.write(f"### Timestamp: {timestamp}s")
121
+ st.image(frame, caption=f"Frame at {timestamp}s", use_column_width=True)
122
+ st.write(analysis)
123
+ time.sleep(2) # Simulate processing delay
124
+
125
+ st.success("Analysis complete! ✅")
126
+ st.text_area("Summary of Analysis:", "\n".join(log), height=200)
127
+