Update app.py
Browse files
app.py
CHANGED
@@ -1,146 +1,3 @@
|
|
1 |
-
import gradio as gr
|
2 |
-
from PIL import Image, ImageDraw, ImageFont
|
3 |
-
import base64
|
4 |
-
import io
|
5 |
-
import json
|
6 |
-
import logging
|
7 |
-
import os
|
8 |
-
import requests
|
9 |
-
import struct
|
10 |
-
import numpy as np
|
11 |
-
from cryptography.hazmat.primitives import serialization
|
12 |
-
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
13 |
-
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
14 |
-
from cryptography.hazmat.primitives import hashes
|
15 |
-
|
16 |
-
# --- Configure Logging ---
|
17 |
-
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
18 |
-
logger = logging.getLogger(__name__)
|
19 |
-
|
20 |
-
# ==============================================================================
|
21 |
-
# CONFIGURATION
|
22 |
-
# ==============================================================================
|
23 |
-
CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"
|
24 |
-
BASE_HF_URL = "https://huggingface.co/spaces/"
|
25 |
-
# Define these here for cleaner linking in the UI
|
26 |
-
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
|
27 |
-
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
|
28 |
-
CREATOR_URL = f"{BASE_HF_URL}{CREATOR_SPACE_ID}"
|
29 |
-
SERVER_URL = f"{BASE_HF_URL}{SERVER_SPACE_ID}"
|
30 |
-
|
31 |
-
# ==============================================================================
|
32 |
-
# LOCAL LOGIC (Key and Image Generation)
|
33 |
-
# ==============================================================================
|
34 |
-
def generate_rsa_keys():
|
35 |
-
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
36 |
-
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
|
37 |
-
public_pem = private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
|
38 |
-
return private_pem, public_pem
|
39 |
-
|
40 |
-
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
|
41 |
-
if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.")
|
42 |
-
if not public_key_pem.strip(): raise ValueError("Public Key cannot be empty.")
|
43 |
-
data_dict = {}
|
44 |
-
for line in secret_data_str.splitlines():
|
45 |
-
line = line.strip()
|
46 |
-
if not line or line.startswith('#'): continue
|
47 |
-
parts = line.split(':', 1) if ':' in line else line.split('=', 1)
|
48 |
-
if len(parts) == 2: data_dict[parts[0].strip()] = parts[1].strip().strip("'\"")
|
49 |
-
if not data_dict: raise ValueError("No valid key-value pairs found.")
|
50 |
-
json_bytes = json.dumps(data_dict).encode('utf-8')
|
51 |
-
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
|
52 |
-
aes_key, nonce = os.urandom(32), os.urandom(12)
|
53 |
-
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
|
54 |
-
rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
|
55 |
-
encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
|
56 |
-
img = Image.new('RGB', (800, 600), color=(45, 52, 54))
|
57 |
-
draw = ImageDraw.Draw(img)
|
58 |
-
try: font = ImageFont.truetype("DejaVuSans.ttf", 40)
|
59 |
-
except IOError: font = ImageFont.load_default(size=30)
|
60 |
-
draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
|
61 |
-
pixel_data = np.array(img.convert("RGB")).ravel()
|
62 |
-
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
|
63 |
-
if len(binary_payload) > pixel_data.size: raise ValueError("Data too large for image.")
|
64 |
-
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
|
65 |
-
stego_pixels = pixel_data.reshape((600, 800, 3))
|
66 |
-
return Image.fromarray(stego_pixels, 'RGB')
|
67 |
-
|
68 |
-
# ==============================================================================
|
69 |
-
# UI HELPER & REMOTE API CALL LOGIC
|
70 |
-
# ==============================================================================
|
71 |
-
|
72 |
-
def get_server_list():
|
73 |
-
"""Fetches and validates the list of servers from the public JSON file."""
|
74 |
-
status = f"Fetching server list from remote config..."
|
75 |
-
yield gr.Dropdown(choices=[], value=None, label="β³ Fetching..."), status, []
|
76 |
-
try:
|
77 |
-
response = requests.get(ENDPOINTS_JSON_URL, timeout=10)
|
78 |
-
response.raise_for_status()
|
79 |
-
|
80 |
-
all_entries = response.json()
|
81 |
-
valid_endpoints = []
|
82 |
-
required_keys = ["name", "api_endpoint", "public_key"]
|
83 |
-
|
84 |
-
# --- ROBUST VALIDATION LOGIC ---
|
85 |
-
for entry in all_entries:
|
86 |
-
if all(key in entry for key in required_keys):
|
87 |
-
valid_endpoints.append(entry)
|
88 |
-
else:
|
89 |
-
logger.warning(f"Skipping invalid entry in configuration file: {entry}")
|
90 |
-
|
91 |
-
if not valid_endpoints:
|
92 |
-
raise ValueError("No valid server configurations found in the remote file.")
|
93 |
-
|
94 |
-
endpoint_names = [e['name'] for e in valid_endpoints]
|
95 |
-
status = f"β
Success! Found {len(endpoint_names)} valid servers."
|
96 |
-
yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
|
97 |
-
except Exception as e:
|
98 |
-
status = f"β Error fetching or parsing configuration: {e}"
|
99 |
-
yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
|
100 |
-
|
101 |
-
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
|
102 |
-
"""UI wrapper for creating an image for a selected service."""
|
103 |
-
if not service_name: raise gr.Error("Please select a target server.")
|
104 |
-
public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
|
105 |
-
if not public_key: raise gr.Error(f"Could not find public key for '{service_name}'.")
|
106 |
-
try:
|
107 |
-
created_image = create_encrypted_image(secret_data, public_key)
|
108 |
-
return created_image, f"β
Success! Image created for '{service_name}'."
|
109 |
-
except Exception as e:
|
110 |
-
return None, f"β Error: {e}"
|
111 |
-
|
112 |
-
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
|
113 |
-
"""UI wrapper for sending an image to a selected server API."""
|
114 |
-
if not service_name: raise gr.Error("Please select a target server.")
|
115 |
-
if image is None: raise gr.Error("Please upload an image to send.")
|
116 |
-
|
117 |
-
api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None)
|
118 |
-
if not api_endpoint: raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")
|
119 |
-
|
120 |
-
status = f"Connecting to endpoint: {api_endpoint}"
|
121 |
-
yield None, status
|
122 |
-
|
123 |
-
try:
|
124 |
-
with io.BytesIO() as buffer:
|
125 |
-
image.save(buffer, format="PNG")
|
126 |
-
b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
|
127 |
-
|
128 |
-
payload = {"data": [b64_string]}
|
129 |
-
headers = {"Content-Type": "application/json"}
|
130 |
-
response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
|
131 |
-
response_json = response.json()
|
132 |
-
|
133 |
-
if response.status_code == 200:
|
134 |
-
if "data" in response_json:
|
135 |
-
yield response_json["data"][0], "β
Success! Data decrypted by remote server."
|
136 |
-
else:
|
137 |
-
raise gr.Error(f"API returned an unexpected success format: {response_json}")
|
138 |
-
else:
|
139 |
-
raise gr.Error(f"API Error (Status {response.status_code}): {response_json.get('error', 'Unknown error')}")
|
140 |
-
|
141 |
-
except Exception as e:
|
142 |
-
yield None, f"β Error calling server API: {e}"
|
143 |
-
|
144 |
# ==============================================================================
|
145 |
# GRADIO DASHBOARD INTERFACE
|
146 |
# ==============================================================================
|
@@ -154,6 +11,7 @@ theme = gr.themes.Base(
|
|
154 |
)
|
155 |
|
156 |
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
|
|
157 |
endpoints_state = gr.State([])
|
158 |
|
159 |
gr.Markdown("# π KeyLock Operations Dashboard")
|
@@ -161,14 +19,13 @@ with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
|
161 |
|
162 |
with gr.Tabs() as tabs:
|
163 |
with gr.TabItem("β Create KeyLock", id=0):
|
164 |
-
# ... UI layout is the same as your working version ...
|
165 |
gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
|
166 |
-
gr.Markdown(f"This tool acts as the **Auth Creator**. It fetches a list of available servers from a public [configuration file]({
|
167 |
with gr.Row(variant="panel"):
|
168 |
with gr.Column(scale=2):
|
169 |
with gr.Row():
|
170 |
creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
|
171 |
-
refresh_button = gr.Button("π", scale=0, size="sm")
|
172 |
creator_secret_input = gr.Textbox(lines=8, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")
|
173 |
creator_button = gr.Button("β¨ Create Auth Image", variant="primary")
|
174 |
with gr.Column(scale=1):
|
@@ -176,7 +33,6 @@ with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
|
176 |
creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png")
|
177 |
|
178 |
with gr.TabItem("β‘ Send KeyLock", id=1):
|
179 |
-
# ... UI layout is the same as your working version ...
|
180 |
gr.Markdown("## Step 2: Decrypt via Live API Call")
|
181 |
gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select from the same configuration list. The server uses its securely stored private key to decrypt the data and sends the result back.")
|
182 |
with gr.Row(variant="panel"):
|
@@ -192,9 +48,14 @@ with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
|
192 |
client_json_output = gr.JSON(label="Decrypted Data")
|
193 |
|
194 |
with gr.TabItem("βΉοΈ Info & Key Generation", id=2):
|
195 |
-
# ... UI layout is the same as your working version ...
|
196 |
gr.Markdown("## Ecosystem Architecture")
|
197 |
-
gr.Markdown(f"This dashboard uses a public [configuration file]({
|
|
|
|
|
|
|
|
|
|
|
|
|
198 |
with gr.Accordion("π RSA Key Pair Generator", open=False):
|
199 |
gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
|
200 |
with gr.Row():
|
@@ -209,8 +70,10 @@ with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
|
209 |
|
210 |
def refresh_and_update_all():
|
211 |
# The generator must be iterated to get its final values
|
|
|
212 |
for dropdown_update, status_update, state_update in get_server_list():
|
213 |
pass
|
|
|
214 |
return dropdown_update, dropdown_update, status_update, state_update
|
215 |
|
216 |
refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
# ==============================================================================
|
2 |
# GRADIO DASHBOARD INTERFACE
|
3 |
# ==============================================================================
|
|
|
11 |
)
|
12 |
|
13 |
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
|
14 |
+
# This State component will hold the list of dicts fetched from the JSON config
|
15 |
endpoints_state = gr.State([])
|
16 |
|
17 |
gr.Markdown("# π KeyLock Operations Dashboard")
|
|
|
19 |
|
20 |
with gr.Tabs() as tabs:
|
21 |
with gr.TabItem("β Create KeyLock", id=0):
|
|
|
22 |
gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
|
23 |
+
gr.Markdown(f"This tool acts as the **Auth Creator**. It fetches a list of available servers from a public [configuration file]({ENDPOINTS_JSON_URL}), then uses the selected server's public key to encrypt your data into a PNG. **This entire creation process happens locally.**")
|
24 |
with gr.Row(variant="panel"):
|
25 |
with gr.Column(scale=2):
|
26 |
with gr.Row():
|
27 |
creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
|
28 |
+
refresh_button = gr.Button("π", scale=0, size="sm", tooltip="Refresh Server List from Config File")
|
29 |
creator_secret_input = gr.Textbox(lines=8, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")
|
30 |
creator_button = gr.Button("β¨ Create Auth Image", variant="primary")
|
31 |
with gr.Column(scale=1):
|
|
|
33 |
creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png")
|
34 |
|
35 |
with gr.TabItem("β‘ Send KeyLock", id=1):
|
|
|
36 |
gr.Markdown("## Step 2: Decrypt via Live API Call")
|
37 |
gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select from the same configuration list. The server uses its securely stored private key to decrypt the data and sends the result back.")
|
38 |
with gr.Row(variant="panel"):
|
|
|
48 |
client_json_output = gr.JSON(label="Decrypted Data")
|
49 |
|
50 |
with gr.TabItem("βΉοΈ Info & Key Generation", id=2):
|
|
|
51 |
gr.Markdown("## Ecosystem Architecture")
|
52 |
+
gr.Markdown(f"This dashboard uses a public [configuration file]({ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
|
53 |
+
with gr.Row():
|
54 |
+
with gr.Column():
|
55 |
+
gr.Markdown(f"### π Auth Creator Service\n- **Space:** [{CREATOR_SPACE_ID}]({CREATOR_URL})\n- **Role:** Provides an API to encrypt data for various targets defined in its `endpoints.json` file.\n- **Source Code:** [app.py]({CREATOR_APP_PY_URL})")
|
56 |
+
with gr.Column():
|
57 |
+
gr.Markdown(f"### π‘ Decoder Server\n- **Space:** [{SERVER_SPACE_ID}]({SERVER_URL})\n- **Role:** The trusted authority. It holds a secret private key and provides a secure API to decrypt images.\n- **Source Code:** [app.py]({SERVER_APP_PY_URL})")
|
58 |
+
|
59 |
with gr.Accordion("π RSA Key Pair Generator", open=False):
|
60 |
gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
|
61 |
with gr.Row():
|
|
|
70 |
|
71 |
def refresh_and_update_all():
|
72 |
# The generator must be iterated to get its final values
|
73 |
+
# The 'pass' allows the loop to complete, and we use the final yielded values.
|
74 |
for dropdown_update, status_update, state_update in get_server_list():
|
75 |
pass
|
76 |
+
# Update both dropdowns and the state
|
77 |
return dropdown_update, dropdown_update, status_update, state_update
|
78 |
|
79 |
refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
|