|
import gradio as gr |
|
from PIL import Image, ImageDraw, ImageFont |
|
import base64 |
|
import io |
|
import json |
|
import logging |
|
import os |
|
import requests |
|
import struct |
|
import numpy as np |
|
from cryptography.hazmat.primitives import serialization |
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding |
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
from cryptography.hazmat.primitives import hashes |
|
from gradio_client import Client |
|
|
|
# Process-wide logging: INFO and above, timestamped, tagged with the logger name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Hugging Face Space identifiers and the URLs derived from them.
BASE_HF_URL = "https://huggingface.co/spaces/"
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
CREATOR_URL = f"{BASE_HF_URL}{CREATOR_SPACE_ID}"
SERVER_URL = f"{BASE_HF_URL}{SERVER_SPACE_ID}"

# Remote JSON file listing the available servers. Derived from CREATOR_URL
# (instead of repeating the full literal) so it cannot drift from the Space ID.
CREATOR_ENDPOINTS_JSON_URL = f"{CREATOR_URL}/raw/main/endpoints.json"

# Raw source links shown on the info tab.
CREATOR_APP_PY_URL = f"{CREATOR_URL}/raw/main/app.py"
SERVER_APP_PY_URL = f"{SERVER_URL}/raw/main/app.py"
|
|
|
|
|
def generate_rsa_keys():
    """Generate a fresh 2048-bit RSA key pair and return both halves as PEM text.

    Returns:
        A ``(private_pem, public_pem)`` tuple of UTF-8 strings. The private
        key is serialized as unencrypted PKCS#8, the public key as
        SubjectPublicKeyInfo.
    """
    key_pair = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    private_bytes = key_pair.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = key_pair.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    # Private key first, public key second.
    return private_bytes.decode('utf-8'), public_bytes.decode('utf-8')
|
|
|
def _split_key_value(line: str):
    """Split *line* at its first ':' or '=' separator, whichever occurs earlier.

    Returns a ``(key, value)`` tuple of raw (unstripped) strings, or ``None``
    when the line contains neither separator.
    """
    cut_points = [pos for pos in (line.find(':'), line.find('=')) if pos != -1]
    if not cut_points:
        return None
    cut = min(cut_points)
    return line[:cut], line[cut + 1:]


def _parse_secret_pairs(secret_data_str: str) -> dict:
    """Parse ``KEY: value`` / ``KEY=value`` lines into a dict, skipping blanks and '#' comments."""
    pairs = {}
    for raw_line in secret_data_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        split = _split_key_value(line)
        if split is None:
            continue
        key = split[0].strip()
        if not key:
            # A separator with nothing in front of it carries no usable name.
            continue
        # Surrounding quotes are dropped so `KEY="value"` and `KEY=value` agree.
        pairs[key] = split[1].strip().strip("'\"")
    return pairs


def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Encrypt key/value secrets for an RSA public key and hide them in a PNG-ready image.

    The secret text is parsed line by line (``KEY: value`` or ``KEY=value``;
    blank lines and ``#`` comments are ignored) into a dict and JSON-encoded.
    The JSON is encrypted with a random AES-256-GCM key, and that key is
    wrapped with RSA-OAEP (SHA-256). The resulting bytes are written into the
    least-significant bit of each channel value of a generated 800x600 RGB
    image.

    Embedded byte layout (all lengths big-endian uint32)::

        [total length][wrapped-key length][wrapped AES key][12-byte nonce][ciphertext]

    Args:
        secret_data_str: Multi-line text holding the key/value pairs.
        public_key_pem: PEM-encoded RSA public key of the target server.

    Returns:
        A PIL ``Image`` in RGB mode carrying the hidden payload.

    Raises:
        ValueError: If either argument is blank, no key/value pair could be
            parsed, the key is not an RSA public key, or the payload does not
            fit in the image.
    """
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    if not public_key_pem.strip():
        raise ValueError("Public Key cannot be empty.")

    # Splitting at the earliest separator keeps values such as
    # `URL=https://host` intact instead of cutting them at the later ':'.
    data_dict = _parse_secret_pairs(secret_data_str)
    if not data_dict:
        raise ValueError("No valid key-value pairs found in secret data.")

    json_bytes = json.dumps(data_dict).encode('utf-8')
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    if not isinstance(public_key, rsa.RSAPublicKey):
        # Other key types have no OAEP `encrypt`; fail with a clear message.
        raise ValueError("Public Key must be an RSA public key.")

    # Hybrid encryption: AES-GCM for the data, RSA-OAEP for the AES key.
    aes_key, nonce = os.urandom(32), os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
    rsa_encrypted_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )
    encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext

    # Carrier image: a dark card with a centred caption.
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        font = ImageFont.load_default(size=30)
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")

    pixels = np.array(img.convert("RGB"))
    pixel_data = pixels.ravel()

    # One payload bit per channel value, most-significant bit of each byte first.
    framed_payload = struct.pack('>I', len(encrypted_payload)) + encrypted_payload
    payload_bits = np.unpackbits(np.frombuffer(framed_payload, dtype=np.uint8))

    if payload_bits.size > pixel_data.size:
        raise ValueError(f"Data is too large for the image. Max size: {pixel_data.size // 8} bytes.")

    # Clear each target LSB, then OR in the payload bit.
    pixel_data[:payload_bits.size] = (pixel_data[:payload_bits.size] & 0xFE) | payload_bits
    stego_pixels = pixel_data.reshape(pixels.shape)

    return Image.fromarray(stego_pixels, 'RGB')
|
|
|
def get_server_list():
    """Fetch and validate the remote server list, yielding UI updates as it goes.

    This is a generator. Every yielded item is a 3-tuple
    ``(dropdown, status_text, endpoints)``:

    1. Immediately: an empty "fetching" dropdown, a progress message and ``[]``.
    2. Then either a populated dropdown with the list of valid entries, or,
       on any failure, an empty "error" dropdown with the error message and ``[]``.

    An entry is kept when it is a dict with ``name`` and ``public_key`` and
    has either ``api_endpoint`` or ``link``. When only ``link`` is present,
    ``api_endpoint`` is filled in as ``<link>/run/predict/``.
    """
    status = "Fetching server list from remote config..."
    # Emoji are written as escapes so the source stays ASCII and survives re-encoding.
    yield gr.Dropdown(choices=[], value=None, label="\u23f3 Fetching..."), status, []
    try:
        response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
        response.raise_for_status()
        all_entries = response.json()
        if not isinstance(all_entries, list):
            raise ValueError("Remote configuration must be a JSON list of server entries.")

        valid_endpoints = []
        for entry in all_entries:
            if not isinstance(entry, dict) or "name" not in entry or "public_key" not in entry:
                logger.warning("Skipping invalid entry (missing name or public_key): %s", entry)
                continue

            if "api_endpoint" not in entry:
                if "link" not in entry:
                    logger.warning(
                        "Skipping invalid entry '%s' (missing 'api_endpoint' and 'link'): %s",
                        entry['name'], entry,
                    )
                    continue
                # Only trailing slashes are trimmed before appending the API path.
                base_url = entry["link"].rstrip("/")
                entry["api_endpoint"] = f"{base_url}/run/predict/"
                logger.info(
                    "Auto-generating API endpoint for '%s': %s",
                    entry['name'], entry['api_endpoint'],
                )

            valid_endpoints.append(entry)

        if not valid_endpoints:
            raise ValueError("No valid server configurations found in the remote file.")

        endpoint_names = [e['name'] for e in valid_endpoints]
        status = f"\u2705 Success! Found {len(endpoint_names)} valid servers."
        # endpoint_names is non-empty here, so the first name is a safe default.
        yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0], label="Target Server"), status, valid_endpoints
    except Exception as e:
        # UI boundary: report any failure in the status box instead of crashing.
        status = f"\u274c Error fetching or parsing configuration: {e}"
        logger.error(status)
        yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
|
|
|
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
    """Build the encrypted image for the selected server and report the outcome.

    Args:
        service_name: ``name`` of the chosen entry in *available_endpoints*.
        secret_data: Multi-line key/value text to encrypt.
        available_endpoints: Endpoint dicts, each with ``name`` and ``public_key``.

    Returns:
        ``(image, status_text)``. ``image`` is ``None`` when encryption failed.

    Raises:
        gr.Error: If no server is selected or it has no public key.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")

    # Treat a missing endpoint list like an empty one so the user gets the
    # "refresh" hint below rather than a TypeError.
    public_key = next(
        (e['public_key'] for e in (available_endpoints or []) if e['name'] == service_name),
        None,
    )
    if not public_key:
        raise gr.Error(f"Could not find public key for '{service_name}'. Please refresh the server list.")

    try:
        # A None secret is passed on as "" so it gets the normal "empty" message.
        created_image = create_encrypted_image(secret_data or "", public_key)
    except Exception as e:
        # UI boundary: surface the failure in the status box.
        logger.error("Error creating image: %s", e)
        return None, f"\u274c Error: {e}"

    # Emoji are written as escapes so the source stays ASCII and survives re-encoding.
    return created_image, f"\u2705 Success! Image created for '{service_name}'."
|
|
|
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
    """Send an encrypted image to the selected server and yield its reply.

    This is a generator. It yields ``(data, status_text)`` pairs: first
    ``(None, "Connecting...")``, then either the server's result or
    ``(None, error message)``.

    The image is PNG-encoded, base64-encoded and passed as
    ``image_base64_string`` to the ``/keylock-auth-decoder`` API of the Space
    named by the entry's ``link``. A string result is parsed as JSON when
    possible; otherwise it is passed through unchanged.

    Args:
        service_name: ``name`` of the chosen entry in *available_endpoints*.
        image: The image to send.
        available_endpoints: Endpoint dicts, each with ``name`` and ``link``.

    Raises:
        gr.Error: If no server or image is given, the server is unknown, or
            its entry has no ``link``.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")
    if image is None:
        raise gr.Error("Please create or upload an image to send.")

    # Treat a missing endpoint list like an empty one so the user gets the
    # "refresh" hint below rather than a TypeError.
    endpoint_details = next(
        (e for e in (available_endpoints or []) if e['name'] == service_name),
        None,
    )
    if not endpoint_details:
        raise gr.Error(f"Configuration Error: Could not find details for '{service_name}'. Refresh the list.")

    server_url = endpoint_details.get('link')
    if not server_url:
        raise gr.Error(f"Configuration Error: The selected server '{service_name}' is missing a 'link' to its Space.")

    status = f"Connecting to remote server: {server_url}"
    yield None, status

    try:
        with io.BytesIO() as buffer:
            image.save(buffer, format="PNG")
            b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")

        client = Client(server_url)
        result = client.predict(
            image_base64_string=b64_string,
            api_name="/keylock-auth-decoder",
        )

        decrypted_data = result
        if isinstance(decrypted_data, str):
            try:
                decrypted_data = json.loads(decrypted_data)
            except json.JSONDecodeError:
                # Deliberate best effort: a non-JSON string is shown as-is.
                pass
        # Emoji are written as escapes so the source stays ASCII and survives re-encoding.
        yield decrypted_data, "\u2705 Success! Data decrypted by remote server."

    except Exception as e:
        # UI boundary: surface the failure in the status box.
        logger.error("Error calling server with gradio_client: %s", e)
        yield None, f"\u274c Error calling server API: {e}"
|
|
|
def refresh_and_update_all():
    """Run get_server_list() to completion and return its final update.

    Intermediate progress updates are discarded. The final dropdown update is
    returned twice so one call can feed two dropdown outputs.

    Returns:
        ``(dropdown, dropdown, status_text, endpoints)``.
    """
    # Exhaust the generator, keeping only the last yielded tuple.
    *_, final_update = get_server_list()
    dropdown_update, status_update, state_update = final_update
    return dropdown_update, dropdown_update, status_update, state_update
|
|
|
# Dashboard theme: blue/sky/slate palette with the Inter font ...
theme = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.sky,
    neutral_hue=gr.themes.colors.slate,
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
)
# ... plus overrides for backgrounds, block borders/shadows and primary buttons.
theme = theme.set(
    body_background_fill="#F1F5F9",
    panel_background_fill="white",
    block_background_fill="white",
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_hover="*primary_700",
)
|
|
|
# Dashboard layout and event wiring: three tabs (create image, send image,
# info + key generator) sharing one endpoint list held in `endpoints_state`.
# NOTE(review): the nesting below was reconstructed from a copy whose
# indentation had been stripped - confirm against the running layout.
# NOTE(review): several labels ("π ...", "β ...", "βΉοΈ ...") look like emoji
# that were decoded with the wrong character set - confirm the intended
# characters and restore them.
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    # Endpoint dicts from the last refresh; passed to the create/send handlers.
    endpoints_state = gr.State([])

    gr.Markdown("# π KeyLock Operations Dashboard")
    gr.Markdown("# β DEMO: Don't send personal information encrypted to the demo server")
    gr.Markdown("A centralized dashboard to manage and demonstrate the entire KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")

    with gr.Tabs() as tabs:
        # Tab 0: pick a server, enter secrets, produce the encrypted image.
        with gr.TabItem("β Create KeyLock", id=0):
            gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
            gr.Markdown(f"This tool acts as the **Auth Creator**. It fetches a list of available servers from a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}), then uses the selected server's public key to encrypt your data into a PNG. **This entire creation process happens locally.**")
            with gr.Row(variant="panel"):
                with gr.Column(scale=2):
                    with gr.Row():
                        creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
                        refresh_button = gr.Button("π", scale=0, size="sm")
                    creator_secret_input = gr.Textbox(lines=8, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")
                    creator_button = gr.Button("β¨ Create Auth Image", variant="primary")
                with gr.Column(scale=1):
                    creator_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png")

        # Tab 1: upload an encrypted image and have the remote server decrypt it.
        with gr.TabItem("β‘ Send KeyLock", id=1):
            gr.Markdown("## Step 2: Decrypt via Live API Call")
            gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select from the same configuration list. The server uses its securely stored private key to decrypt the data and sends the result back.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Configuration")
                    send_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to send the image to.")
                    gr.Markdown("### Image to Send")
                    client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
                    client_button = gr.Button("π Decrypt via Remote Server", variant="primary")
                with gr.Column(scale=1):
                    gr.Markdown("### Response from Server")
                    client_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    client_json_output = gr.JSON(label="Decrypted Data")

        # Tab 2: links to the two Spaces plus a local RSA key-pair generator.
        with gr.TabItem("βΉοΈ Info & Key Generation", id=2):
            gr.Markdown("## Ecosystem Architecture")
            gr.Markdown(f"This dashboard uses a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
            with gr.Row():
                with gr.Column():
                    gr.Markdown(f"### π Auth Creator Service\n- **Space:** [{CREATOR_SPACE_ID}]({CREATOR_URL})\n- **Role:** Provides an API to encrypt data for various targets defined in its `endpoints.json` file.\n- **Source Code:** [app.py]({CREATOR_APP_PY_URL})")
                with gr.Column():
                    gr.Markdown(f"### π‘ Decoder Server\n- **Space:** [{SERVER_SPACE_ID}]({SERVER_URL})\n- **Role:** The trusted authority. It holds a secret private key and provides a secure API to decrypt images.\n- **Source Code:** [app.py]({SERVER_APP_PY_URL})")

            with gr.Accordion("π RSA Key Pair Generator", open=False):
                gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
                with gr.Row():
                    with gr.Column():
                        output_public_key = gr.Textbox(lines=10, label="Generated Public Key", interactive=False, show_copy_button=True)
                    with gr.Column():
                        output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
                gen_keys_button = gr.Button("βοΈ Generate New 2048-bit Key Pair", variant="secondary")

    # Event wiring.
    # generate_rsa_keys returns (private, public), hence private box first.
    gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
    # Refresh (on click and on page load) updates both dropdowns, the
    # create-tab status box and the shared endpoint list.
    refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
    demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
    creator_button.click(fn=create_keylock_wrapper, inputs=[creator_service_dropdown, creator_secret_input, endpoints_state], outputs=[creator_image_output, creator_status])
    client_button.click(fn=send_keylock_wrapper, inputs=[send_service_dropdown, client_image_input, endpoints_state], outputs=[client_json_output, client_status])
|
|
|
# Launch the dashboard only when this file is run as a script, not on import.
if __name__ == "__main__":
    # NOTE(review): mcp_server=True presumably also serves the app as an MCP
    # server - confirm against the installed Gradio version's launch() docs.
    demo.launch(mcp_server=True)