File size: 13,787 Bytes
5cf429e ac072a2 b1bd8c8 ac072a2 b1bd8c8 ac072a2 b1bd8c8 ac072a2 b1bd8c8 ac072a2 b1bd8c8 5cf429e ac072a2 b1bd8c8 ac072a2 b1bd8c8 5cf429e b1bd8c8 ac072a2 b1bd8c8 ac072a2 b1bd8c8 b04a886 b1bd8c8 b04a886 b1bd8c8 ac072a2 b1bd8c8 ac072a2 1770918 ac072a2 1770918 ac072a2 1770918 ac072a2 1770918 ac072a2 b04a886 ac072a2 b1bd8c8 5cf429e b1bd8c8 1770918 b1bd8c8 1770918 b1bd8c8 1770918 94b8a3f 1770918 5cf429e ac072a2 5cf429e d1be50a 5cf429e d1be50a 5cf429e 1770918 5cf429e b04a886 5cf429e 7191e6a 1770918 59870e8 94b8a3f 5cf429e 1770918 b04a886 1770918 eb48225 5cf429e 94b8a3f 1770918 b04a886 94b8a3f 5cf429e b04a886 1770918 b04a886 eb48225 1770918 5cf429e 1770918 94b8a3f 1770918 b04a886 59870e8 1770918 b04a886 1770918 7191e6a b1bd8c8 5cf429e 1770918 b04a886 1770918 7191e6a 5cf429e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
import gradio as gr
from gradio_client import Client
from PIL import Image, ImageDraw, ImageFont
import base64
import io
import json
import logging
import os
import requests
import tempfile
import struct
import numpy as np
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==============================================================================
# CONFIGURATION: IDs AND URLs OF THE REMOTE SERVICES
# ==============================================================================
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
# URL to the raw JSON file containing the list of public keys and services.
CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"
# Construct URLs for linking in documentation
BASE_HF_URL = "https://huggingface.co/spaces/"
CREATOR_URL = f"{BASE_HF_URL}{CREATOR_SPACE_ID}"
SERVER_URL = f"{BASE_HF_URL}{SERVER_SPACE_ID}"
CREATOR_APP_PY_URL = f"{CREATOR_URL}/blob/main/app.py"
SERVER_APP_PY_URL = f"{SERVER_URL}/blob/main/app.py"
# ==============================================================================
# LOCAL LOGIC (Key and Image Generation)
# ==============================================================================
def generate_rsa_keys():
"""Generates a new 2048-bit RSA key pair LOCALLY."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
return private_pem, public_pem
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
"""Creates the encrypted image LOCALLY."""
if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.")
if not public_key_pem.strip(): raise ValueError("Public Key cannot be empty.")
# --- THIS IS THE FIX ---
# Replaced the failing dictionary comprehension with a standard, readable for loop.
data_dict = {}
for line in secret_data_str.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(':', 1) if ':' in line else line.split('=', 1)
if len(parts) == 2:
key = parts[0].strip()
value = parts[1].strip().strip("'\"")
data_dict[key] = value
if not data_dict: raise ValueError("No valid key-value pairs found.")
json_bytes = json.dumps(data_dict).encode('utf-8')
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
aes_key, nonce = os.urandom(32), os.urandom(12)
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
img = Image.new('RGB', (800, 600), color=(45, 52, 54))
draw = ImageDraw.Draw(img)
try: font = ImageFont.truetype("DejaVuSans.ttf", 40)
except IOError: font = ImageFont.load_default(size=30)
draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
pixel_data = np.array(img.convert("RGB")).ravel()
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
if len(binary_payload) > pixel_data.size: raise ValueError("Data too large for image.")
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
stego_pixels = pixel_data.reshape((600, 800, 3))
return Image.fromarray(stego_pixels, 'RGB')
# ==============================================================================
# UI HELPER & REMOTE API CALL LOGIC (Your working versions)
# ==============================================================================
def get_creator_endpoints():
"""Fetches the list of supported endpoints by making an HTTP request to the Creator's JSON file."""
status = f"Fetching endpoint list from {CREATOR_ENDPOINTS_JSON_URL}..."
yield gr.Dropdown(choices=[], value=None, label="β³ Fetching..."), status, [] # Initial state
try:
response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
response.raise_for_status()
endpoints = response.json()
endpoint_names = [e['name'] for e in endpoints]
status = f"β
Success! Found {len(endpoint_names)} endpoints."
yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Service"), status, endpoints
except Exception as e:
status = f"β Error: Could not fetch configuration. Check the URL and if the 'endpoints.json' file is public. Details: {e}"
yield gr.Dropdown(choices=[], value=None, label="Error fetching services"), status, []
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
"""UI wrapper for creating an image for a selected service."""
if not service_name: raise gr.Error("Please select a target server.")
public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
if not public_key: raise gr.Error(f"Could not find public key for '{service_name}'.")
try:
created_image = create_encrypted_image(secret_data, public_key)
return created_image, f"β
Success! Image created for '{service_name}'."
except Exception as e:
return None, f"β Error: {e}"
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
"""UI wrapper for sending an image to a selected server API."""
if not service_name: raise gr.Error("Please select a target server.")
if image is None: raise gr.Error("Please upload an image to send.")
api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None)
if not api_endpoint:
raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")
status = f"Connecting to endpoint: {api_endpoint}"
yield None, status
try:
with io.BytesIO() as buffer:
image.save(buffer, format="PNG")
b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
payload = {"data": [b64_string]}
headers = {"Content-Type": "application/json"}
response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
response_json = response.json()
if response.status_code == 200:
if "data" in response_json:
return response_json["data"][0], "β
Success! Data decrypted by remote server."
else:
raise gr.Error(f"API returned an unexpected success format: {response_json}")
else:
raise gr.Error(f"API Error (Status {response.status_code}): {response_json.get('error', 'Unknown error')}")
except Exception as e:
return None, f"β Error calling server API: {e}"
# ==============================================================================
# GRADIO DASHBOARD INTERFACE (Your working layout)
# ==============================================================================
theme = gr.themes.Base(
primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky, neutral_hue=gr.themes.colors.slate,
font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
).set(
body_background_fill="#F1F5F9", panel_background_fill="white", block_background_fill="white",
block_border_width="1px", block_shadow="*shadow_drop_lg",
button_primary_background_fill="*primary_600", button_primary_background_fill_hover="*primary_700",
)
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
endpoints_state = gr.State([])
gr.Markdown("# π KeyLock Operations Dashboard")
gr.Markdown("A centralized dashboard to manage and demonstrate the entire KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")
with gr.Tabs() as tabs:
with gr.TabItem("β Create KeyLock", id=0):
gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
gr.Markdown(f"This tool acts as the **Auth Creator**. It fetches a list of available servers from a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}), then uses the selected server's public key to encrypt your data into a PNG. **This entire creation process happens locally.**")
with gr.Row(variant="panel"):
with gr.Column(scale=2):
with gr.Row():
creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
refresh_button = gr.Button("π", scale=0, size="sm", tooltip="Refresh Server List from Config File")
creator_secret_input = gr.Textbox(lines=8, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")
creator_button = gr.Button("β¨ Create Auth Image", variant="primary")
with gr.Column(scale=1):
creator_status = gr.Textbox(label="Status", interactive=False, lines=2)
creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png")
with gr.TabItem("β‘ Send KeyLock", id=1):
gr.Markdown("## Step 2: Decrypt via Live API Call")
gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select from the same configuration list. The server uses its securely stored private key to decrypt the data and sends the result back.")
with gr.Row(variant="panel"):
with gr.Column(scale=1):
gr.Markdown("### Configuration")
send_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to send the image to.")
gr.Markdown("### Image to Send")
client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
client_button = gr.Button("π Decrypt via Remote Server", variant="primary")
with gr.Column(scale=1):
gr.Markdown("### Response from Server")
client_status = gr.Textbox(label="Status", interactive=False, lines=2)
client_json_output = gr.JSON(label="Decrypted Data")
with gr.TabItem("βΉοΈ Info & Key Generation", id=2):
gr.Markdown("## Ecosystem Architecture")
gr.Markdown(f"This dashboard uses a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
with gr.Accordion("π RSA Key Pair Generator", open=False):
gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
with gr.Row():
with gr.Column():
output_public_key = gr.Textbox(lines=10, label="Generated Public Key", interactive=False, show_copy_button=True)
with gr.Column():
output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
gen_keys_button = gr.Button("βοΈ Generate New 2048-bit Key Pair", variant="secondary")
# --- Wire up the component logic ---
gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
def refresh_and_update_all():
dropdown_update, status_update, state_update = get_server_list()
return dropdown_update, dropdown_update, status_update, state_update
refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
creator_button.click(fn=create_keylock_wrapper, inputs=[creator_service_dropdown, creator_secret_input, endpoints_state], outputs=[creator_image_output, creator_status])
client_button.click(fn=send_keylock_wrapper, inputs=[send_service_dropdown, client_image_input, endpoints_state], outputs=[client_json_output, client_status])
if __name__ == "__main__":
demo.launch() |