File size: 18,993 Bytes
d4dc59a
17f05fd
d4dc59a
 
 
 
 
 
 
 
 
 
 
 
0ea5e08
17f05fd
d4dc59a
 
 
 
17f05fd
0d7bd2a
d4dc59a
 
 
 
 
 
 
 
17f05fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d4dc59a
71bdca5
 
17f05fd
 
71bdca5
 
17f05fd
 
 
 
 
 
3f5a983
 
 
 
 
71bdca5
 
 
17f05fd
71bdca5
3f5a983
 
 
 
 
 
71bdca5
 
3f5a983
71bdca5
 
3f5a983
17f05fd
3f5a983
71bdca5
3f5a983
17f05fd
 
 
 
 
 
3f5a983
17f05fd
 
 
3f5a983
17f05fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71bdca5
 
3f5a983
 
17f05fd
3f5a983
71bdca5
17f05fd
3f5a983
71bdca5
 
17f05fd
 
71bdca5
3f5a983
71bdca5
7a2942e
71bdca5
 
 
17f05fd
71bdca5
17f05fd
71bdca5
 
 
7a2942e
17f05fd
3f5a983
71bdca5
cefc939
17f05fd
3f5a983
 
71bdca5
3f5a983
 
71bdca5
17f05fd
 
 
 
 
 
 
71bdca5
 
17f05fd
71bdca5
 
 
17f05fd
 
0ea5e08
17f05fd
0ea5e08
17f05fd
0ea5e08
71bdca5
3f5a983
71bdca5
 
 
 
3f5a983
0ea5e08
17f05fd
 
0ea5e08
3f5a983
71bdca5
17f05fd
71bdca5
7191e6a
71bdca5
17f05fd
d4dc59a
 
17f05fd
 
d4dc59a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbb8164
17f05fd
d4dc59a
 
 
 
a2295c2
17f05fd
 
 
 
 
 
 
 
 
 
 
 
d4dc59a
 
 
17f05fd
d4dc59a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17f05fd
d4dc59a
 
 
17f05fd
 
 
 
 
 
 
d4dc59a
 
 
674b968
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import gradio as gr
from PIL import Image, ImageDraw, ImageFont, ImageOps
import base64
import io
import json
import logging
import os
import requests
import struct
import numpy as np
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from gradio_client import Client
from huggingface_hub import InferenceClient

# Configure logging once at import time: INFO level, each record carrying
# timestamp, logger name, level and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)

# --- Constants ---
# Remote JSON file listing the available servers; fetched by get_server_list()
# and linked from the info tab.
CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"
# Space identifiers and the URLs derived from them; used for the links in the info tab.
BASE_HF_URL = "https://huggingface.co/spaces/"
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
CREATOR_URL = f"{BASE_HF_URL}{CREATOR_SPACE_ID}"
SERVER_URL = f"{BASE_HF_URL}{SERVER_SPACE_ID}"
CREATOR_APP_PY_URL = f"{CREATOR_URL}/raw/main/app.py"
SERVER_APP_PY_URL = f"{SERVER_URL}/raw/main/app.py"

# NEW: Constants for improved image generation
# Background downloaded by prepare_base_image() when the user uploads nothing.
DEFAULT_IMAGE_URL = "https://images.unsplash.com/photo-1506318137071-a8e063b4bec0?q=80&w=1200&auto=format&fit=crop"
# Default edge length, in pixels, of the square produced by resize_and_crop().
IMAGE_SIZE = 600
# Text-to-image model and prompt used by prepare_base_image() as the last-resort image source.
T2I_MODEL = "sd-community/sdxl-lightning"
T2I_PROMPT = "A stunning view of a distant galaxy, nebulae, and constellations, digital art, vibrant colors, cinematic lighting, 8k, masterpiece."

# --- Helper Functions for Image Handling ---

def resize_and_crop(img: Image.Image, size: int = IMAGE_SIZE) -> Image.Image:
    """
    Return a square `size` x `size` version of `img`.

    The image is scaled and cropped with `ImageOps.fit`. If that call raises,
    the failure is logged and a plain `resize` to the same dimensions is
    returned instead.
    """
    target = (size, size)
    try:
        squared = ImageOps.fit(img, target, Image.Resampling.LANCZOS)
    except Exception as e:
        logger.error(f"Failed to resize and crop image: {e}")
        # A plain resize does not preserve aspect ratio, but it still yields the expected dimensions.
        squared = img.resize(target, Image.Resampling.LANCZOS)
    return squared

def prepare_base_image(uploaded_image: Image.Image | None, progress) -> Image.Image:
    """
    Provide a base image, trying each source in order and reporting via `progress`.

    1. User-uploaded image.
    2. Default URL image (`DEFAULT_IMAGE_URL`).
    3. AI-generated image (`T2I_MODEL` with `T2I_PROMPT`).

    Every returned image has been passed through `resize_and_crop`.

    Args:
        uploaded_image: Image supplied by the user, or None when nothing was uploaded.
        progress: Callable invoked as `progress(fraction, desc=...)` for status updates.

    Returns:
        The prepared square base image.

    Raises:
        gr.Error: If no upload was given and both the download and the AI generation fail.
    """
    # 1. User-uploaded image (explicit None check: an image object must never be judged by truthiness)
    if uploaded_image is not None:
        progress(0, desc="✅ Using uploaded image...")
        logger.info("Using user-uploaded image.")
        return resize_and_crop(uploaded_image)

    # 2. Default URL image
    try:
        progress(0, desc="⏳ No image uploaded. Fetching default background...")
        logger.info(f"Fetching default image from URL: {DEFAULT_IMAGE_URL}")
        response = requests.get(DEFAULT_IMAGE_URL, timeout=15)
        response.raise_for_status()
        img = Image.open(io.BytesIO(response.content)).convert("RGB")
        progress(0, desc="✅ Using default background image.")
        return resize_and_crop(img)
    except Exception as e:
        # Best-effort source: any failure here simply moves on to AI generation.
        logger.warning(f"Could not fetch default image: {e}. Falling back to AI generation.")

    # 3. AI-generated image (last resort)
    try:
        progress(0, desc=f"⏳ Generating new image with {T2I_MODEL}...")
        logger.info(f"Generating a new image using model: {T2I_MODEL}")
        client = InferenceClient()
        generated = client.text_to_image(T2I_PROMPT, model=T2I_MODEL)
        # text_to_image hands back a PIL image, so it must not be wrapped in BytesIO.
        # Encoded bytes are still accepted in case a client returns the raw payload.
        if not isinstance(generated, Image.Image):
            generated = Image.open(io.BytesIO(generated))
        img = generated.convert("RGB")
        progress(0, desc="✅ New image generated successfully.")
        return resize_and_crop(img)
    except Exception as e:
        logger.error(f"Fatal: All image sources failed. Text-to-image failed with: {e}")
        raise gr.Error(f"Failed to obtain a base image. AI generation error: {e}") from e

# --- Core Cryptography and Image Creation ---

def generate_rsa_keys():
    """
    Generate a new 2048-bit RSA key pair (public exponent 65537).

    Returns:
        A `(private_pem, public_pem)` tuple of PEM text. The private key is
        unencrypted PKCS8; the public key is SubjectPublicKeyInfo.
    """
    key_pair = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem = serialization.Encoding.PEM

    private_bytes = key_pair.private_bytes(
        encoding=pem,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = key_pair.public_key().public_bytes(
        encoding=pem,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_bytes.decode('utf-8'), public_bytes.decode('utf-8')

def _parse_secret_pairs(secret_data_str: str) -> dict[str, str]:
    """Parse `KEY: value` / `KEY=value` lines into a dict, skipping blanks and `#` comments."""
    pairs = {}
    for raw_line in secret_data_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on whichever separator appears FIRST, so a value that itself
        # contains the other separator (e.g. "URL=https://host") stays intact.
        positions = [pos for pos in (line.find(':'), line.find('=')) if pos != -1]
        if not positions:
            continue
        cut = min(positions)
        pairs[line[:cut].strip()] = line[cut + 1:].strip().strip("'\"")
    return pairs


def create_encrypted_image(
    secret_data_str: str,
    public_key_pem: str,
    base_image: Image.Image,
    show_key_overlay: bool
) -> Image.Image:
    """
    Encrypt key/value secrets and hide them in the least-significant bits of an image.

    The secrets are serialized to JSON and encrypted with a random 32-byte
    AES-GCM key; that key is encrypted with the given RSA public key
    (OAEP, SHA-256). The payload layout is:

        4-byte big-endian length of the RSA-encrypted key | RSA-encrypted key
        | 12-byte nonce | AES-GCM ciphertext

    The payload is prefixed with its own 4-byte big-endian length and written,
    one bit per channel value (most-significant bit first), into a copy of
    `base_image` after the visual overlays have been drawn.

    Args:
        secret_data_str: Lines of `KEY: value` or `KEY=value`; blank lines and
            lines starting with `#` are ignored.
        public_key_pem: PEM-encoded RSA public key of the target server.
        base_image: Background image; it is copied, not modified.
        show_key_overlay: When true, the key/value pairs are ALSO drawn in
            clear text on the image. NOTE(review): this makes the secret values
            readable by anyone who sees the picture.

    Returns:
        A new RGB image carrying the overlays and the embedded payload.

    Raises:
        ValueError: If the secret data or public key is empty, no key/value
            pair can be parsed, or the payload does not fit in the image.
    """
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    if not public_key_pem.strip():
        raise ValueError("Public Key cannot be empty.")

    data_dict = _parse_secret_pairs(secret_data_str)
    if not data_dict:
        raise ValueError("No valid key-value pairs found in secret data.")

    json_bytes = json.dumps(data_dict).encode('utf-8')
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))

    # Hybrid encryption: the data goes under a one-time AES-GCM key, and only
    # that small key is encrypted with RSA.
    aes_key, nonce = os.urandom(32), os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)

    rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))

    encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext

    # Use the prepared base_image
    img = base_image.copy().convert("RGB")
    width, height = img.size

    # --- Add Stylish Overlays ---
    draw = ImageDraw.Draw(img, "RGBA")  # Use RGBA to draw transparent shapes
    try:
        font_bold = ImageFont.truetype("DejaVuSans-Bold.ttf", 30)
        font_regular = ImageFont.truetype("DejaVuSans.ttf", 15)
        font_small = ImageFont.truetype("DejaVuSans.ttf", 12)
    except IOError:
        # DejaVu fonts are not installed: fall back to Pillow's built-in font.
        font_bold = ImageFont.load_default(size=28)
        font_regular = ImageFont.load_default(size=14)
        font_small = ImageFont.load_default(size=11)

    # Colors matching the Gradio theme (slate, sky)
    overlay_color = (15, 23, 42, 190)  # very dark slate, semi-transparent
    title_color = (226, 232, 240)      # light slate/sky
    key_color = (148, 163, 184)        # slate-400 for keys
    value_color = (241, 245, 249)      # slate-100 for values

    # Main title overlay (top)
    draw.rectangle([0, 20, width, 80], fill=overlay_color)
    draw.text((width / 2, 50), "KeyLock Secure Data", fill=title_color, font=font_bold, anchor="ms")

    # Key/Value data overlay (bottom-left)
    if show_key_overlay:
        box_padding = 15
        line_spacing = 6
        text_start_x = 35

        # Calculate box height dynamically
        lines = [f"{key}: {value}" for key, value in data_dict.items()]
        line_heights = [draw.textbbox((0, 0), line, font=font_regular)[3] for line in lines]
        total_text_height = sum(line_heights) + (len(lines) - 1) * line_spacing
        box_height = total_text_height + (box_padding * 2)
        box_y0 = height - box_height - 20

        draw.rectangle([20, box_y0, width - 20, height - 20], fill=overlay_color)

        current_y = box_y0 + box_padding
        for (key, value), line_height in zip(data_dict.items(), line_heights):
            # Draw key and value separately for different colors
            key_text = f"{key}:"
            draw.text((text_start_x, current_y), key_text, fill=key_color, font=font_regular)
            key_bbox = draw.textbbox((text_start_x, current_y), key_text, font=font_regular)
            draw.text((key_bbox[2] + 8, current_y), str(value), fill=value_color, font=font_regular)
            current_y += line_height + line_spacing

    # --- Steganography: Embed encrypted data into pixels ---
    pixel_data = np.array(img.convert("RGB")).ravel()
    framed_payload = struct.pack('>I', len(encrypted_payload)) + encrypted_payload
    # unpackbits emits the most-significant bit of each byte first.
    payload_bits = np.unpackbits(np.frombuffer(framed_payload, dtype=np.uint8))
    bit_count = payload_bits.size

    if bit_count > pixel_data.size:
        raise ValueError(f"Data is too large for the image. Max size: {pixel_data.size // 8} bytes. Your data: ~{bit_count // 8} bytes.")

    # Clear each carrier value's lowest bit, then set it to the payload bit.
    pixel_data[:bit_count] = (pixel_data[:bit_count] & 0xFE) | payload_bits
    stego_pixels = pixel_data.reshape((height, width, 3))

    return Image.fromarray(stego_pixels, 'RGB')

# --- Gradio Wrapper Functions ---

def get_server_list():
    """
    Fetch the server list from `CREATOR_ENDPOINTS_JSON_URL`, yielding UI updates.

    An entry is kept when it is a dict with `name`, `public_key`, and at least
    one of `api_endpoint` / `link`.

    Yields:
        `(dropdown, status, endpoints)` tuples: first a "fetching" placeholder
        with an empty list, then either the populated dropdown with the valid
        entries, or an error dropdown with an empty list. Failures are reported
        through the yielded status rather than raised.
    """
    status = "Fetching server list from remote config..."
    yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []
    try:
        response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
        response.raise_for_status()
        all_entries = response.json()
        valid_endpoints = [e for e in all_entries if isinstance(e, dict) and "name" in e and "public_key" in e and ("api_endpoint" in e or "link" in e)]
        if not valid_endpoints:
            raise ValueError("No valid server configurations found.")
        endpoint_names = [e['name'] for e in valid_endpoints]
        status = f"✅ Success! Found {len(endpoint_names)} valid servers."
        yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
    except Exception as e:
        # UI boundary: network, HTTP, JSON and validation failures all become a status message.
        status = f"❌ Error fetching configuration: {e}"
        logger.error(status)
        yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []

def create_keylock_wrapper(
    service_name: str,
    secret_data: str,
    available_endpoints: list,
    uploaded_image: Image.Image | None,
    show_keys: bool,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Gradio handler: build the encrypted image for the selected server.

    Args:
        service_name: Name of the target server chosen in the dropdown.
        secret_data: Raw key/value text to encrypt.
        available_endpoints: Endpoint dicts (each with `name` and `public_key`).
        uploaded_image: Optional user-supplied background image.
        show_keys: Whether the key/value pairs are drawn on the final image.
        progress: Gradio progress tracker used for status updates.

    Returns:
        `(image, status)`; `image` is None when creation failed, in which case
        `status` carries the error text.

    Raises:
        gr.Error: If no server is selected or its public key cannot be found.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")
    # `or []` keeps the lookup safe if the state has not been populated yet.
    public_key = next((e['public_key'] for e in (available_endpoints or []) if e['name'] == service_name), None)
    if not public_key:
        raise gr.Error(f"Could not find public key for '{service_name}'. Please refresh the server list.")
    try:
        # Step 1: Prepare the base image with status updates via the progress object
        base_image = prepare_base_image(uploaded_image, progress)
        progress(0.5, desc="Encrypting and embedding data...")

        # Step 2: Create the final encrypted image with overlays
        created_image = create_encrypted_image(secret_data, public_key, base_image, show_keys)

        return created_image, f"✅ Success! Image created for '{service_name}'."
    except Exception as e:
        # UI boundary: report the failure in the status box instead of crashing the handler.
        logger.error(f"Error creating image: {e}", exc_info=True)
        return None, f"❌ Error: {e}"

def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
    """
    Gradio handler: send an encrypted image to the selected server for decryption.

    The image is PNG-encoded, base64-encoded and passed to the server's
    `/keylock-auth-decoder` API through `gradio_client`.

    Args:
        service_name: Name of the target server chosen in the dropdown.
        image: The encrypted image to send.
        available_endpoints: Endpoint dicts; the chosen one must provide `link`.

    Yields:
        `(data, status)` tuples: first `(None, "Connecting...")`, then either the
        decrypted data with a success status or `(None, error status)`.

    Raises:
        gr.Error: If no server is selected, no image is given, or the selected
            endpoint is missing or has no `link`.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")
    if image is None:
        raise gr.Error("Please create or upload an image to send.")
    # `or []` keeps the lookup safe if the state has not been populated yet.
    endpoint_details = next((e for e in (available_endpoints or []) if e['name'] == service_name), None)
    if not endpoint_details or not endpoint_details.get('link'):
        raise gr.Error(f"Config Error for '{service_name}'.")

    server_url = endpoint_details['link']
    status = f"Connecting to remote server: {server_url}"
    yield None, status

    try:
        with io.BytesIO() as buffer:
            # PNG is lossless, so the data embedded in the pixel bits survives the transfer.
            image.save(buffer, format="PNG")
            b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")

        client = Client(server_url)
        result = client.predict(image_base64_string=b64_string, api_name="/keylock-auth-decoder")
        # The server may answer with a JSON string or an already-decoded object.
        decrypted_data = json.loads(result) if isinstance(result, str) else result
        yield decrypted_data, "✅ Success! Data decrypted by remote server."

    except Exception as e:
        # UI boundary: surface connection, API and parse failures in the status box.
        logger.error(f"Error calling server with gradio_client: {e}", exc_info=True)
        yield None, f"❌ Error calling server API: {e}"

def refresh_and_update_all():
    """
    Run `get_server_list` to completion and return only its final update.

    Returns:
        `(dropdown_update, dropdown_update, status_update, state_update)`; the
        same dropdown update appears twice so two dropdowns can receive it.
    """
    # Drain the generator and keep only the last tuple it produced.
    *_, final_update = get_server_list()
    dropdown_update, status_update, state_update = final_update
    return dropdown_update, dropdown_update, status_update, state_update

# --- Gradio UI ---

# Dashboard theme: blue/sky/slate hues on the Base theme, with white panels
# and blocks over a light page background.
theme = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.sky,
    neutral_hue=gr.themes.colors.slate,
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
).set(
    body_background_fill="#F1F5F9",
    panel_background_fill="white",
    block_background_fill="white",
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_hover="*primary_700",
)

# Three-tab dashboard: create an encrypted image, send it to a remote decoder,
# and an info tab with an RSA key-pair generator. Event wiring is at the bottom.
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    # Endpoint dicts loaded from the remote config; shared by both dropdowns' handlers.
    endpoints_state = gr.State([])

    gr.Markdown("# 🔑 KeyLock Operations Dashboard")
    gr.Markdown("A centralized dashboard to manage and demonstrate the entire KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")

    with gr.Tabs() as tabs:
        with gr.TabItem("① Create KeyLock", id=0):
            gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
            gr.Markdown("## ❗ this is a DEMO, don't send personal information encrypted to the DEMO server")
            # The encryption runs in this app's Python process, so the text must not claim it happens in the browser.
            gr.Markdown("Encrypt your data into a PNG. You can upload your own background image, or we'll provide a stylish default or generate one for you. The encryption process happens entirely inside this app (not in your browser); nothing is sent to the target server during this step.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=2):
                    with gr.Row():
                        creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
                        refresh_button = gr.Button("🔄", scale=0, size="sm")

                    creator_secret_input = gr.Textbox(lines=5, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")

                    gr.Markdown("### Image Options")
                    creator_base_image_input = gr.Image(label="Optional Base Image (600x600 recommended)", type="pil", sources=["upload"], show_download_button=False)
                    creator_show_keys_checkbox = gr.Checkbox(label="Show key/value data on final image", value=True)

                    creator_button = gr.Button("✨ Create Auth Image", variant="primary", scale=2)

                with gr.Column(scale=3):
                    creator_status = gr.Textbox(label="Status", interactive=False, lines=1)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png", height=600)

        with gr.TabItem("② Send KeyLock", id=1):
            gr.Markdown("## Step 2: Decrypt via Live API Call")
            gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select. The server uses its securely stored private key to decrypt the data and sends the result back.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Configuration")
                    send_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to send the image to.")
                    gr.Markdown("### Image to Send")
                    client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
                    client_button = gr.Button("🔓 Decrypt via Remote Server", variant="primary")
                with gr.Column(scale=1):
                    gr.Markdown("### Response from Server")
                    client_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    client_json_output = gr.JSON(label="Decrypted Data")

        with gr.TabItem("ℹ️ Info & Key Generation", id=2):
            gr.Markdown("## Ecosystem Architecture")
            gr.Markdown(f"This dashboard uses a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
            with gr.Row():
                with gr.Column():
                    gr.Markdown(f"### 🏭 Auth Creator Service\n- **Space:** [{CREATOR_SPACE_ID}]({CREATOR_URL})\n- **Role:** Provides an API to encrypt data for various targets defined in its `endpoints.json` file.\n- **Source Code:** [app.py]({CREATOR_APP_PY_URL})")
                with gr.Column():
                    gr.Markdown(f"### 📡 Decoder Server\n- **Space:** [{SERVER_SPACE_ID}]({SERVER_URL})\n- **Role:** The trusted authority. It holds a secret private key and provides a secure API to decrypt images.\n- **Source Code:** [app.py]({SERVER_APP_PY_URL})")

            with gr.Accordion("🔑 RSA Key Pair Generator", open=False):
                gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
                with gr.Row():
                    with gr.Column():
                        output_public_key = gr.Textbox(lines=10, label="Generated Public Key", interactive=False, show_copy_button=True)
                    with gr.Column():
                        output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
                gen_keys_button = gr.Button("⚙️ Generate New 2048-bit Key Pair", variant="secondary")

    # --- Event Handlers ---
    # generate_rsa_keys returns (private, public), hence the output order here.
    gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
    # The server list is loaded on page load and again on demand; both dropdowns get the same update.
    refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
    demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])

    creator_button.click(
        fn=create_keylock_wrapper,
        inputs=[creator_service_dropdown, creator_secret_input, endpoints_state, creator_base_image_input, creator_show_keys_checkbox],
        outputs=[creator_image_output, creator_status]
    )

    client_button.click(fn=send_keylock_wrapper, inputs=[send_service_dropdown, client_image_input, endpoints_state], outputs=[client_json_output, client_status])

# Start the Gradio app only when this file is executed as a script, not when it is imported.
# NOTE(review): mcp_server=True presumably also exposes the app through Gradio's MCP
# server support — confirm against the installed Gradio version.
if __name__ == "__main__":
    demo.launch(mcp_server=True)