broadfield-dev's picture
Update app.py
5c3812c verified
raw
history blame
14.2 kB
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import base64
import io
import json
import logging
import os
import requests
import struct
import numpy as np
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from gradio_client import Client
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"
BASE_HF_URL = "https://huggingface.co/spaces/"
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
CREATOR_URL = f"{BASE_HF_URL}{CREATOR_SPACE_ID}"
SERVER_URL = f"{BASE_HF_URL}{SERVER_SPACE_ID}"
CREATOR_APP_PY_URL = f"{CREATOR_URL}/raw/main/app.py"
SERVER_APP_PY_URL = f"{SERVER_URL}/raw/main/app.py"
def generate_rsa_keys():
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
return private_pem, public_pem
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
if not secret_data_str.strip():
raise ValueError("Secret data cannot be empty.")
if not public_key_pem.strip():
raise ValueError("Public Key cannot be empty.")
data_dict = {}
for line in secret_data_str.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(':', 1) if ':' in line else line.split('=', 1)
if len(parts) == 2:
data_dict[parts[0].strip()] = parts[1].strip().strip("'\"")
if not data_dict:
raise ValueError("No valid key-value pairs found in secret data.")
json_bytes = json.dumps(data_dict).encode('utf-8')
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
aes_key, nonce = os.urandom(32), os.urandom(12)
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
rsa_encrypted_key = public_key.encrypt(
aes_key,
padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
)
encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
img = Image.new('RGB', (800, 600), color=(45, 52, 54))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype("DejaVuSans.ttf", 40)
except IOError:
font = ImageFont.load_default(size=30)
draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
pixel_data = np.array(img.convert("RGB")).ravel()
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
if len(binary_payload) > pixel_data.size:
raise ValueError(f"Data is too large for the image. Max size: {pixel_data.size // 8} bytes.")
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
stego_pixels = pixel_data.reshape((600, 800, 3))
return Image.fromarray(stego_pixels, 'RGB')
def get_server_list():
status = "Fetching server list from remote config..."
yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []
try:
response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
response.raise_for_status()
all_entries = response.json()
valid_endpoints = []
for entry in all_entries:
if not isinstance(entry, dict) or "name" not in entry or "public_key" not in entry:
logger.warning(f"Skipping invalid entry (missing name or public_key): {entry}")
continue
if "api_endpoint" not in entry:
if "link" in entry:
base_url = entry["link"].strip("/")
entry["api_endpoint"] = f"{base_url}/run/predict/"
logger.info(f"Auto-generating API endpoint for '{entry['name']}': {entry['api_endpoint']}")
else:
logger.warning(f"Skipping invalid entry '{entry['name']}' (missing 'api_endpoint' and 'link'): {entry}")
continue
valid_endpoints.append(entry)
if not valid_endpoints:
raise ValueError("No valid server configurations found in the remote file.")
endpoint_names = [e['name'] for e in valid_endpoints]
status = f"βœ… Success! Found {len(endpoint_names)} valid servers."
yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
except Exception as e:
status = f"❌ Error fetching or parsing configuration: {e}"
logger.error(status)
yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
if not service_name:
raise gr.Error("Please select a target server.")
public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
if not public_key:
raise gr.Error(f"Could not find public key for '{service_name}'. Please refresh the server list.")
try:
created_image = create_encrypted_image(secret_data, public_key)
return created_image, f"βœ… Success! Image created for '{service_name}'."
except Exception as e:
logger.error(f"Error creating image: {e}")
return None, f"❌ Error: {e}"
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
if not service_name:
raise gr.Error("Please select a target server.")
if image is None:
raise gr.Error("Please create or upload an image to send.")
endpoint_details = next((e for e in available_endpoints if e['name'] == service_name), None)
if not endpoint_details:
raise gr.Error(f"Configuration Error: Could not find details for '{service_name}'. Refresh the list.")
server_url = endpoint_details.get('link')
if not server_url:
raise gr.Error(f"Configuration Error: The selected server '{service_name}' is missing a 'link' to its Space.")
status = f"Connecting to remote server: {server_url}"
yield None, status
try:
with io.BytesIO() as buffer:
image.save(buffer, format="PNG")
b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
client = Client(server_url)
result = client.predict(
image_base64_string=b64_string,
api_name="/keylock-auth-decoder"
)
decrypted_data = result
if isinstance(decrypted_data, str):
try:
decrypted_data = json.loads(decrypted_data)
except json.JSONDecodeError:
pass
yield decrypted_data, "βœ… Success! Data decrypted by remote server."
except Exception as e:
logger.error(f"Error calling server with gradio_client: {e}")
yield None, f"❌ Error calling server API: {e}"
def refresh_and_update_all():
for dropdown_update, status_update, state_update in get_server_list():
pass
return dropdown_update, dropdown_update, status_update, state_update
theme = gr.themes.Base(
primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky, neutral_hue=gr.themes.colors.slate,
font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
).set(
body_background_fill="#F1F5F9", panel_background_fill="white", block_background_fill="white",
block_border_width="1px", block_shadow="*shadow_drop_lg",
button_primary_background_fill="*primary_600", button_primary_background_fill_hover="*primary_700",
)
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
endpoints_state = gr.State([])
gr.Markdown("# πŸ”‘ KeyLock Operations Dashboard")
gr.Markdown("# ❗ DEMO: Don't send personal information encrypted to the demo server")
gr.Markdown("A centralized dashboard to manage and demonstrate the entire KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")
with gr.Tabs() as tabs:
with gr.TabItem("β‘  Create KeyLock", id=0):
gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
gr.Markdown(f"This tool acts as the **Auth Creator**. It fetches a list of available servers from a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}), then uses the selected server's public key to encrypt your data into a PNG. **This entire creation process happens locally.**")
with gr.Row(variant="panel"):
with gr.Column(scale=2):
with gr.Row():
creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
refresh_button = gr.Button("πŸ”„", scale=0, size="sm")
creator_secret_input = gr.Textbox(lines=8, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")
creator_button = gr.Button("✨ Create Auth Image", variant="primary")
with gr.Column(scale=1):
creator_status = gr.Textbox(label="Status", interactive=False, lines=2)
creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png")
with gr.TabItem("β‘‘ Send KeyLock", id=1):
gr.Markdown("## Step 2: Decrypt via Live API Call")
gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select from the same configuration list. The server uses its securely stored private key to decrypt the data and sends the result back.")
with gr.Row(variant="panel"):
with gr.Column(scale=1):
gr.Markdown("### Configuration")
send_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to send the image to.")
gr.Markdown("### Image to Send")
client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
client_button = gr.Button("πŸ”“ Decrypt via Remote Server", variant="primary")
with gr.Column(scale=1):
gr.Markdown("### Response from Server")
client_status = gr.Textbox(label="Status", interactive=False, lines=2)
client_json_output = gr.JSON(label="Decrypted Data")
with gr.TabItem("ℹ️ Info & Key Generation", id=2):
gr.Markdown("## Ecosystem Architecture")
gr.Markdown(f"This dashboard uses a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
with gr.Row():
with gr.Column():
gr.Markdown(f"### 🏭 Auth Creator Service\n- **Space:** [{CREATOR_SPACE_ID}]({CREATOR_URL})\n- **Role:** Provides an API to encrypt data for various targets defined in its `endpoints.json` file.\n- **Source Code:** [app.py]({CREATOR_APP_PY_URL})")
with gr.Column():
gr.Markdown(f"### πŸ“‘ Decoder Server\n- **Space:** [{SERVER_SPACE_ID}]({SERVER_URL})\n- **Role:** The trusted authority. It holds a secret private key and provides a secure API to decrypt images.\n- **Source Code:** [app.py]({SERVER_APP_PY_URL})")
with gr.Accordion("πŸ”‘ RSA Key Pair Generator", open=False):
gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
with gr.Row():
with gr.Column():
output_public_key = gr.Textbox(lines=10, label="Generated Public Key", interactive=False, show_copy_button=True)
with gr.Column():
output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
gen_keys_button = gr.Button("βš™οΈ Generate New 2048-bit Key Pair", variant="secondary")
gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
creator_button.click(fn=create_keylock_wrapper, inputs=[creator_service_dropdown, creator_secret_input, endpoints_state], outputs=[creator_image_output, creator_status])
client_button.click(fn=send_keylock_wrapper, inputs=[send_service_dropdown, client_image_input, endpoints_state], outputs=[client_json_output, client_status])
if __name__ == "__main__":
demo.launch(mcp_server=True)