"""Streamlit app: download a video from a URL, sample frames, and send each to the Gemini API."""

import base64
import os
import tempfile
import time
from typing import Generator, Optional, Tuple

import cv2
import requests
import streamlit as st

# --------------------------
# Configuration
# --------------------------
# Read from the environment (original note: fetched from Hugging Face secrets).
API_KEY = os.getenv("GEMINI_API_KEY")
API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={API_KEY}"

SYSTEM_PROMPT = '''
You are a next-generation AI-driven military surveillance officer...
(Same as before)
'''

# Timestamps (in seconds) at which frames are sampled: 10s, 20s, 30s.
SAMPLE_TIMESTAMPS = range(10, 40, 10)


# --------------------------
# Gemini API Client
# --------------------------
class GeminiClient:
    """Thin HTTP client that posts single frames to the Gemini generateContent endpoint."""

    def __init__(self, api_key: str, api_url: str):
        """Store credentials/endpoint and open a reusable HTTP session.

        Args:
            api_key: API key; used here only to redact it from error messages.
            api_url: Full endpoint URL that requests are posted to.
        """
        self.api_key = api_key
        self.api_url = api_url
        self.session = requests.Session()
        self.timeout = 30  # seconds, applied to each POST

    def analyze_frame(self, frame_b64: str, timestamp: int) -> str:
        """Send frame to Gemini API for analysis.

        Args:
            frame_b64: Base64-encoded JPEG image data.
            timestamp: Position of the frame in the video, in seconds
                (interpolated into the prompt text).

        Returns:
            The text of the first candidate, "No analysis available" if the
            response has no such text, or an "Analysis error: ..." string if
            the request or JSON decoding fails. Never raises for HTTP errors.
        """
        payload = {
            "contents": [{
                "parts": [
                    {"text": f"Analyze this battlefield image from {timestamp} seconds:"},
                    {"inline_data": {"mime_type": "image/jpeg", "data": frame_b64}},
                ]
            }],
            "systemInstruction": {
                "parts": [{"text": SYSTEM_PROMPT}]
            },
        }

        try:
            response = self.session.post(
                self.api_url,
                json=payload,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            return self._parse_response(response.json())
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions where the
            # JSON decode error is not a RequestException subclass.
            return f"Analysis error: {self._redact(str(e))}"

    def _redact(self, message: str) -> str:
        """Mask the API key in *message*.

        requests error messages include the request URL, and the URL built in
        this file carries the key as a query parameter; the message is shown
        to the user, so the key must not appear in it.
        """
        if self.api_key:
            return message.replace(self.api_key, "***")
        return message

    @staticmethod
    def _parse_response(response: dict) -> str:
        """Extract response text from Gemini API response.

        Returns "No analysis available" when the expected
        candidates[0].content.parts[0].text path is missing or malformed.
        """
        try:
            return response["candidates"][0]["content"]["parts"][0]["text"]
        except (KeyError, IndexError, TypeError):
            # TypeError: a node on the path is None or not subscriptable.
            return "No analysis available"


# --------------------------
# Helper Functions
# --------------------------
def frame_to_base64(frame) -> str:
    """Convert an image frame to base64 format.

    The frame is JPEG-encoded with OpenCV first.

    Raises:
        ValueError: If OpenCV reports that JPEG encoding failed.
    """
    success, buffer = cv2.imencode(".jpg", frame)
    if not success:
        raise ValueError("JPEG encoding of frame failed")
    return base64.b64encode(buffer).decode("utf-8")


def extract_video_frame(video_path, timestamp):
    """Extract a frame at a specific timestamp from the video.

    Args:
        video_path: Path of the video file to open.
        timestamp: Position to seek to, in seconds.

    Returns:
        The frame returned by OpenCV, or None if the video cannot be opened
        or no frame can be read at that position.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)  # seconds -> ms
        success, frame = cap.read()
    finally:
        # Always release the capture, even if seeking/reading raises.
        cap.release()
    return frame if success else None


def _remove_file(path: Optional[str]) -> None:
    """Best-effort removal of a temporary file; a missing file is not an error."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        # Deliberate best-effort: cleanup failure must not mask the real result.
        pass


def download_video(video_url) -> Optional[str]:
    """Download video from URL to a temporary file.

    Returns:
        Path of the temporary ".mp4" file (the caller is responsible for
        deleting it), or None if the download failed. On failure the error is
        reported via st.error and any partially written file is removed.
    """
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            temp_path = temp_file.name
            # Context manager ensures the streamed connection is released.
            with requests.get(video_url, stream=True, timeout=30) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        temp_file.write(chunk)
        return temp_path
    except Exception as e:
        # delete=False means a failed download would otherwise leave the file behind.
        _remove_file(temp_path)
        st.error(f"Video download failed: {str(e)}")
        return None


# --------------------------
# Streamlit App
# --------------------------
st.title("🎥 Military Surveillance AI")
st.write("Upload a battlefield surveillance video to analyze.")

video_url = st.text_input("Enter Video URL:")

if st.button("Analyze Video") and video_url:
    if not API_KEY:
        # Without a key every request would be sent with "key=None".
        st.error("GEMINI_API_KEY is not set; cannot analyze video.")
    else:
        st.info("Downloading video...")
        video_path = download_video(video_url)

        if video_path:
            try:
                client = GeminiClient(API_KEY, API_URL)
                st.success("Video downloaded successfully!")
                st.info("Processing video and analyzing frames...")

                log = []

                # Extract frames and analyze
                for timestamp in SAMPLE_TIMESTAMPS:
                    frame = extract_video_frame(video_path, timestamp)
                    if frame is None:
                        log.append(f"[{timestamp}s] Frame extraction failed ❌")
                        continue

                    try:
                        frame_b64 = frame_to_base64(frame)
                    except (ValueError, cv2.error) as e:
                        log.append(f"[{timestamp}s] Frame encoding failed: {e}")
                        continue

                    analysis = client.analyze_frame(frame_b64, timestamp)
                    log.append(f"[{timestamp}s] {analysis}")

                    st.write(f"### Timestamp: {timestamp}s")
                    # OpenCV frames are BGR; tell Streamlit so colours are not swapped.
                    st.image(
                        frame,
                        caption=f"Frame at {timestamp}s",
                        use_column_width=True,
                        channels="BGR",
                    )
                    st.write(analysis)
                    time.sleep(2)  # Simulate processing delay

                st.success("Analysis complete! ✅")
                st.text_area("Summary of Analysis:", "\n".join(log), height=200)
            finally:
                # The download is a delete=False temp file; remove it once done.
                _remove_file(video_path)