File size: 4,499 Bytes
4809812
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import streamlit as st
import cv2
import requests
import base64
import tempfile
import os
import time
from typing import Generator, Tuple

# --------------------------
# Configuration
# --------------------------
API_KEY = os.getenv("GEMINI_API_KEY")  # Fetch API key from Hugging Face secrets
API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={API_KEY}"
SYSTEM_PROMPT = '''
You are a next-generation AI-driven military surveillance officer...
(Same as before)
'''

# --------------------------
# Gemini API Client
# --------------------------
class GeminiClient:
    def __init__(self, api_key: str, api_url: str):
        self.api_key = api_key
        self.api_url = api_url
        self.session = requests.Session()
        self.timeout = 30

    def analyze_frame(self, frame_b64: str, timestamp: int) -> str:
        """Send frame to Gemini API for analysis."""
        payload = {
            "contents": [{
                "parts": [
                    {"text": f"Analyze this battlefield image from {timestamp} seconds:"},
                    {"inline_data": {"mime_type": "image/jpeg", "data": frame_b64}}
                ]
            }],
            "systemInstruction": {
                "parts": [{"text": SYSTEM_PROMPT}]
            }
        }

        try:
            response = self.session.post(
                self.api_url,
                json=payload,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"}
            )
            response.raise_for_status()
            return self._parse_response(response.json())
        except requests.exceptions.RequestException as e:
            return f"Analysis error: {str(e)}"

    @staticmethod
    def _parse_response(response: dict) -> str:
        """Extract response text from Gemini API response."""
        try:
            return response["candidates"][0]["content"]["parts"][0]["text"]
        except (KeyError, IndexError):
            return "No analysis available"

# --------------------------
# Helper Functions
# --------------------------
def frame_to_base64(frame):
    """Convert an image frame to base64 format."""
    _, buffer = cv2.imencode(".jpg", frame)
    return base64.b64encode(buffer).decode("utf-8")

def extract_video_frame(video_path, timestamp):
    """Extract a frame at a specific timestamp from the video."""
    cap = cv2.VideoCapture(video_path)
    cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
    success, frame = cap.read()
    cap.release()
    return frame if success else None

def download_video(video_url):
    """Download video from URL to a temporary file."""
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            response = requests.get(video_url, stream=True, timeout=30)
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                temp_file.write(chunk)
            return temp_file.name
    except Exception as e:
        st.error(f"Video download failed: {str(e)}")
        return None

# --------------------------
# Streamlit App
# --------------------------
st.title("🎥 Military Surveillance AI")
st.write("Upload a battlefield surveillance video to analyze.")

video_url = st.text_input("Enter Video URL:")
if st.button("Analyze Video") and video_url:
    st.info("Downloading video...")
    video_path = download_video(video_url)
    
    if video_path:
        client = GeminiClient(API_KEY, API_URL)
        st.success("Video downloaded successfully!")
        
        st.info("Processing video and analyzing frames...")
        log = []
        
        # Extract frames and analyze
        for timestamp in range(10, 40, 10):  # Analyze at 10s, 20s, 30s
            frame = extract_video_frame(video_path, timestamp)
            if frame is None:
                log.append(f"[{timestamp}s] Frame extraction failed ❌")
                continue
            
            analysis = client.analyze_frame(frame_to_base64(frame), timestamp)
            log.append(f"[{timestamp}s] {analysis}")
            st.write(f"### Timestamp: {timestamp}s")
            st.image(frame, caption=f"Frame at {timestamp}s", use_column_width=True)
            st.write(analysis)
            time.sleep(2)  # Simulate processing delay
            
        st.success("Analysis complete! ✅")
        st.text_area("Summary of Analysis:", "\n".join(log), height=200)