Spaces:
Sleeping
Sleeping
File size: 4,499 Bytes
4809812 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import streamlit as st
import cv2
import requests
import base64
import tempfile
import os
import time
from typing import Generator, Tuple
# --------------------------
# Configuration
# --------------------------
# The Gemini key is read from the GEMINI_API_KEY environment variable
# (expected to be provided as a secret by the hosting environment).
# When the variable is unset this is None.
API_KEY = os.environ.get("GEMINI_API_KEY")

# generateContent endpoint for the gemini-2.0-flash model; the key travels
# as the ``key`` query parameter.
API_URL = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    "gemini-2.0-flash:generateContent"
    f"?key={API_KEY}"
)

# System instruction sent with every analysis request.
SYSTEM_PROMPT = (
    "\n"
    "You are a next-generation AI-driven military surveillance officer...\n"
    "(Same as before)\n"
)
# --------------------------
# Gemini API Client
# --------------------------
class GeminiClient:
    """Small HTTP client that submits single video frames to the Gemini API.

    One ``requests.Session`` is reused for every call. Failures are never
    raised to the caller; they are returned as human-readable strings so the
    UI can display them directly.
    """

    def __init__(self, api_key: str, api_url: str):
        """Store the credentials and open a reusable HTTP session.

        Args:
            api_key: Gemini API key. Used to scrub the key out of error text
                before it is returned to the caller.
            api_url: Full endpoint URL that frames are POSTed to.
        """
        self.api_key = api_key
        self.api_url = api_url
        self.session = requests.Session()
        self.timeout = 30  # seconds, applied to every request

    def analyze_frame(self, frame_b64: str, timestamp: int) -> str:
        """Send one frame to the Gemini API and return the analysis text.

        Args:
            frame_b64: Base64-encoded JPEG image data.
            timestamp: Position of the frame in the video, in seconds; only
                used in the prompt text.

        Returns:
            The text of the first candidate, ``"No analysis available"`` when
            the response has no such text, or ``"Analysis error: ..."`` when
            the request or JSON decoding fails.
        """
        payload = {
            "contents": [{
                "parts": [
                    {"text": f"Analyze this battlefield image from {timestamp} seconds:"},
                    {"inline_data": {"mime_type": "image/jpeg", "data": frame_b64}},
                ]
            }],
            "systemInstruction": {
                "parts": [{"text": SYSTEM_PROMPT}]
            },
        }
        try:
            response = self.session.post(
                self.api_url,
                json=payload,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            return self._parse_response(response.json())
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions where the
            # decode error is not a RequestException. The message is redacted
            # because HTTP error text embeds the request URL, which may carry
            # the API key as a query parameter.
            return f"Analysis error: {self._redact(str(e))}"

    def _redact(self, message: str) -> str:
        """Return ``message`` with every occurrence of the API key masked."""
        if not self.api_key:
            # Nothing to hide; also avoids str.replace("") padding every char.
            return message
        return message.replace(self.api_key, "[REDACTED]")

    @staticmethod
    def _parse_response(response: dict) -> str:
        """Extract the first candidate's text from a decoded API response.

        Returns ``"No analysis available"`` when the expected
        ``candidates[0].content.parts[0].text`` path is missing, empty, or
        holds a null / non-container value.
        """
        try:
            return response["candidates"][0]["content"]["parts"][0]["text"]
        except (KeyError, IndexError, TypeError):
            return "No analysis available"
# --------------------------
# Helper Functions
# --------------------------
def frame_to_base64(frame):
    """Encode an image frame as JPEG and return it as a base64 text string."""
    # imencode returns (status, encoded buffer); only the buffer is used here.
    jpeg_buffer = cv2.imencode(".jpg", frame)[1]
    b64_bytes = base64.b64encode(jpeg_buffer)
    return b64_bytes.decode("utf-8")
def extract_video_frame(video_path, timestamp):
    """Extract the frame at a given timestamp from a video file.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        timestamp: Position in seconds; converted to milliseconds for seeking.

    Returns:
        The frame produced by ``cap.read()``, or ``None`` when the read fails.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
        success, frame = cap.read()
    finally:
        # Release the capture handle even if seeking or decoding raises.
        cap.release()
    return frame if success else None
def download_video(video_url):
    """Download a video from a URL into a temporary ``.mp4`` file.

    Args:
        video_url: URL to fetch with an HTTP GET (streamed in 8 KiB chunks).

    Returns:
        The path of the temporary file on success. On any failure the error
        is shown with ``st.error``, the partial file is removed, and ``None``
        is returned. The caller owns the returned file and must delete it.
    """
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            temp_path = temp_file.name
            # Using the response as a context manager returns the streamed
            # connection to the pool even when the download is interrupted.
            with requests.get(video_url, stream=True, timeout=30) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
        return temp_path
    except Exception as e:
        # delete=False means nothing else cleans up a partial download.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
        st.error(f"Video download failed: {str(e)}")
        return None
# --------------------------
# Streamlit App
# --------------------------
# Page header and input widgets.
st.title("🎥 Military Surveillance AI")
st.write("Upload a battlefield surveillance video to analyze.")
video_url = st.text_input("Enter Video URL:")

if st.button("Analyze Video") and video_url:
    if not API_KEY:
        # Without a key every API call would fail; report it up front
        # instead of downloading the video first.
        st.error("GEMINI_API_KEY is not set; cannot analyze video.")
        st.stop()

    st.info("Downloading video...")
    video_path = download_video(video_url)
    if video_path:
        try:
            client = GeminiClient(API_KEY, API_URL)
            st.success("Video downloaded successfully!")
            st.info("Processing video and analyzing frames...")
            log = []
            # Extract frames and analyze
            for timestamp in range(10, 40, 10):  # Analyze at 10s, 20s, 30s
                frame = extract_video_frame(video_path, timestamp)
                if frame is None:
                    log.append(f"[{timestamp}s] Frame extraction failed ❌")
                    continue
                analysis = client.analyze_frame(frame_to_base64(frame), timestamp)
                log.append(f"[{timestamp}s] {analysis}")
                st.write(f"### Timestamp: {timestamp}s")
                # OpenCV decodes frames in BGR order; tell Streamlit so the
                # colours are not swapped when the frame is displayed.
                st.image(
                    frame,
                    caption=f"Frame at {timestamp}s",
                    use_column_width=True,
                    channels="BGR",
                )
                st.write(analysis)
                time.sleep(2)  # Simulate processing delay
            st.success("Analysis complete! ✅")
            st.text_area("Summary of Analysis:", "\n".join(log), height=200)
        finally:
            # The download is written with delete=False, so remove it here to
            # avoid accumulating temp files across runs.
            try:
                os.remove(video_path)
            except OSError:
                pass
|