|
import gradio as gr |
|
from PIL import Image, ImageDraw, ImageFont, ImageOps |
|
import base64 |
|
import io |
|
import json |
|
import logging |
|
import os |
|
import requests |
|
import struct |
|
import numpy as np |
|
from cryptography.hazmat.primitives import serialization |
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding |
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
from cryptography.hazmat.primitives import hashes |
|
from gradio_client import Client |
|
from huggingface_hub import InferenceClient |
|
|
|
# Configure logging once at import time. Each record shows a timestamp,
# the emitting logger's name, the level and the message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# --- Remote service locations ---------------------------------------------
# Root under which both Hugging Face Spaces live.
BASE_HF_URL = "https://huggingface.co/spaces/"

# Space identifiers ("owner/name") for the two services.
CREATOR_SPACE_ID = "broadfield-dev/KeyLock-Auth-Creator"
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"

# Landing pages of the Spaces.
CREATOR_URL = BASE_HF_URL + CREATOR_SPACE_ID
SERVER_URL = BASE_HF_URL + SERVER_SPACE_ID

# JSON document fetched by get_server_list() to discover target servers.
CREATOR_ENDPOINTS_JSON_URL = "https://huggingface.co/spaces/broadfield-dev/KeyLock-Auth-Creator/raw/main/endpoints.json"

# Raw source links shown on the info tab.
CREATOR_APP_PY_URL = CREATOR_URL + "/raw/main/app.py"
SERVER_APP_PY_URL = SERVER_URL + "/raw/main/app.py"
|
|
|
|
|
# --- Base image defaults ----------------------------------------------------
# Background downloaded when the user uploads no image.
DEFAULT_IMAGE_URL = (
    "https://images.unsplash.com/photo-1506318137071-a8e063b4bec0"
    "?q=80&w=1200&auto=format&fit=crop"
)

# Edge length, in pixels, of the square every base image is normalised to.
IMAGE_SIZE = 600

# Text-to-image model and prompt used when the default download fails.
T2I_MODEL = "sd-community/sdxl-lightning"
T2I_PROMPT = (
    "A stunning view of a distant galaxy, nebulae, and constellations, "
    "digital art, vibrant colors, cinematic lighting, 8k, masterpiece."
)
|
|
|
|
|
|
|
def resize_and_crop(img: Image.Image, size: int = IMAGE_SIZE) -> Image.Image:
    """Return ``img`` as a ``size`` x ``size`` square.

    The preferred path is ``ImageOps.fit`` with LANCZOS resampling. If
    that raises, the error is logged and the image is resized directly to
    the square instead (no cropping on that path).
    """
    square = (size, size)
    try:
        result = ImageOps.fit(img, square, Image.Resampling.LANCZOS)
    except Exception as exc:
        logger.error(f"Failed to resize and crop image: {exc}")
        # Best-effort fallback: plain resize to the target dimensions.
        result = img.resize(square, Image.Resampling.LANCZOS)
    return result
|
|
|
def prepare_base_image(uploaded_image: Image.Image | None, progress) -> Image.Image:
    """Return a square base image, trying three sources in order.

    1. The user-uploaded image, when one is supplied.
    2. The image downloaded from ``DEFAULT_IMAGE_URL``.
    3. An image generated with ``T2I_MODEL`` through ``InferenceClient``.

    Whichever source succeeds is passed through ``resize_and_crop``.

    Args:
        uploaded_image: Image supplied by the user, or ``None``.
        progress: Callable invoked as ``progress(fraction, desc=...)`` to
            report status (a Gradio progress object in this file).

    Returns:
        The resized and cropped base image.

    Raises:
        gr.Error: If the default download failed and text-to-image
            generation failed as well.
    """
    # 1. User upload.
    if uploaded_image is not None:
        progress(0, desc="✅ Using uploaded image...")
        logger.info("Using user-uploaded image.")
        return resize_and_crop(uploaded_image)

    # 2. Default background fetched over HTTP.
    try:
        progress(0, desc="⏳ No image uploaded. Fetching default background...")
        logger.info(f"Fetching default image from URL: {DEFAULT_IMAGE_URL}")
        response = requests.get(DEFAULT_IMAGE_URL, timeout=15)
        response.raise_for_status()
        img = Image.open(io.BytesIO(response.content)).convert("RGB")
        progress(0, desc="✅ Using default background image.")
        return resize_and_crop(img)
    except Exception as e:
        # Deliberately broad: any failure here just moves on to source 3.
        logger.warning(f"Could not fetch default image: {e}. Falling back to AI generation.")

    # 3. AI-generated image.
    try:
        progress(0, desc=f"⏳ Generating new image with {T2I_MODEL}...")
        logger.info(f"Generating a new image using model: {T2I_MODEL}")
        client = InferenceClient()
        generated = client.text_to_image(T2I_PROMPT, model=T2I_MODEL)
        # huggingface_hub documents text_to_image() as returning a PIL image.
        # Wrapping that in io.BytesIO raises TypeError, so only decode when
        # raw bytes are actually returned.
        if isinstance(generated, Image.Image):
            img = generated
        else:
            img = Image.open(io.BytesIO(generated))
        img = img.convert("RGB")
        progress(0, desc="✅ New image generated successfully.")
        return resize_and_crop(img)
    except Exception as e:
        logger.error(f"Fatal: All image sources failed. Text-to-image failed with: {e}")
        raise gr.Error(f"Failed to obtain a base image. AI generation error: {e}") from e
|
|
|
|
|
|
|
def generate_rsa_keys():
    """Create a new 2048-bit RSA key pair and return it as PEM text.

    Returns:
        A ``(private_pem, public_pem)`` tuple of UTF-8 strings. The
        private key is serialised as unencrypted PKCS8, the public key as
        SubjectPublicKeyInfo.
    """
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    private_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    return private_bytes.decode('utf-8'), public_bytes.decode('utf-8')
|
|
|
def _parse_secret_pairs(secret_data_str: str) -> dict:
    """Parse ``KEY: value`` / ``KEY=value`` lines into a dict, skipping blanks and ``#`` comments."""
    pairs = {}
    for raw_line in secret_data_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on whichever separator comes FIRST. Preferring ':' whenever
        # one exists anywhere in the line mis-parses entries such as
        # "URL=https://host" into key "URL=https" and value "//host".
        positions = [pos for pos in (line.find(':'), line.find('=')) if pos != -1]
        if not positions:
            continue  # no separator: not a key/value line
        split_at = min(positions)
        key = line[:split_at].strip()
        # Surrounding quotes on the value are dropped.
        pairs[key] = line[split_at + 1:].strip().strip("'\"")
    return pairs


def _encrypt_payload(plaintext: bytes, public_key_pem: str) -> bytes:
    """Hybrid-encrypt ``plaintext``: AES-256-GCM for the data, RSA-OAEP(SHA-256) for the AES key."""
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))

    aes_key, nonce = os.urandom(32), os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, plaintext, None)

    rsa_encrypted_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )

    # Layout: 4-byte big-endian length of the wrapped key, the wrapped key,
    # the 12-byte nonce, then the AES-GCM ciphertext.
    return struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext


def _draw_overlays(img, data_dict: dict, show_key_overlay: bool) -> None:
    """Draw the title banner and, optionally, the key/value box onto ``img`` in place."""
    width, height = img.size
    # "RGBA" drawing mode lets the translucent fills blend with the RGB image.
    draw = ImageDraw.Draw(img, "RGBA")

    try:
        font_bold = ImageFont.truetype("DejaVuSans-Bold.ttf", 30)
        font_regular = ImageFont.truetype("DejaVuSans.ttf", 15)
    except IOError:
        # Font files not found: use Pillow's built-in font instead.
        font_bold = ImageFont.load_default(size=28)
        font_regular = ImageFont.load_default(size=14)

    overlay_color = (15, 23, 42, 190)
    title_color = (226, 232, 240)
    key_color = (148, 163, 184)
    value_color = (241, 245, 249)

    # Title banner across the top.
    draw.rectangle([0, 20, width, 80], fill=overlay_color)
    draw.text((width / 2, 50), "KeyLock Secure Data", fill=title_color, font=font_bold, anchor="ms")

    if not show_key_overlay:
        return

    # NOTE(review): this draws the secret VALUES in clear text on the image,
    # next to the encrypted copy — confirm that is intended outside demos.
    box_padding = 15
    line_spacing = 6
    text_start_x = 35

    lines = [f"{key}: {value}" for key, value in data_dict.items()]
    line_heights = [draw.textbbox((0, 0), line, font=font_regular)[3] for line in lines]
    total_text_height = sum(line_heights) + (len(lines) - 1) * line_spacing
    box_height = total_text_height + (box_padding * 2)
    box_y0 = height - box_height - 20

    draw.rectangle([20, box_y0, width - 20, height - 20], fill=overlay_color)

    current_y = box_y0 + box_padding
    for (key, value), line_height in zip(data_dict.items(), line_heights):
        key_text = f"{key}:"
        draw.text((text_start_x, current_y), key_text, fill=key_color, font=font_regular)
        key_bbox = draw.textbbox((text_start_x, current_y), key_text, font=font_regular)
        # The value starts 8 px to the right of the rendered key.
        draw.text((key_bbox[2] + 8, current_y), str(value), fill=value_color, font=font_regular)
        current_y += line_height + line_spacing


def _embed_lsb(img, payload: bytes):
    """Write ``payload`` (prefixed with its 4-byte big-endian length) into the LSBs of ``img``."""
    width, height = img.size
    # np.array() copies, so the flattened view below is writable.
    pixel_data = np.array(img.convert("RGB")).ravel()

    framed = struct.pack('>I', len(payload)) + payload
    # One bit per channel value, most significant bit of each byte first.
    bits = np.unpackbits(np.frombuffer(framed, dtype=np.uint8))

    if bits.size > pixel_data.size:
        raise ValueError(f"Data is too large for the image. Max size: {pixel_data.size // 8} bytes. Your data: ~{bits.size // 8} bytes.")

    pixel_data[:bits.size] = (pixel_data[:bits.size] & 0xFE) | bits
    stego_pixels = pixel_data.reshape((height, width, 3))

    # Mode is inferred as RGB from the (H, W, 3) uint8 array.
    return Image.fromarray(stego_pixels)


def create_encrypted_image(
    secret_data_str: str,
    public_key_pem: str,
    base_image: Image.Image,
    show_key_overlay: bool
) -> Image.Image:
    """Encrypt key/value secrets and hide them in a copy of ``base_image``.

    The secret text is parsed into a dict and serialised as JSON. It is
    encrypted with a fresh AES-256-GCM key, which is wrapped with the
    RSA public key. The result is stored in the least significant bits of
    the image's RGB channel values, after the overlays have been drawn.

    Args:
        secret_data_str: One ``KEY: value`` or ``KEY=value`` entry per
            line. Blank lines and lines starting with ``#`` are ignored.
        public_key_pem: PEM-encoded RSA public key of the target server.
        base_image: Background image; it is copied, not modified.
        show_key_overlay: When true, the parsed keys and values are also
            drawn in a box at the bottom of the image.

    Returns:
        A new RGB image carrying the encrypted payload.

    Raises:
        ValueError: If the secret text or public key is blank, if no
            key/value pair can be parsed, or if the payload does not fit
            in the image.
    """
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    if not public_key_pem.strip():
        raise ValueError("Public Key cannot be empty.")

    data_dict = _parse_secret_pairs(secret_data_str)
    if not data_dict:
        raise ValueError("No valid key-value pairs found in secret data.")

    json_bytes = json.dumps(data_dict).encode('utf-8')
    encrypted_payload = _encrypt_payload(json_bytes, public_key_pem)

    img = base_image.copy().convert("RGB")
    # Overlays go on first so the LSB embedding is the last pixel change.
    _draw_overlays(img, data_dict, show_key_overlay)

    return _embed_lsb(img, encrypted_payload)
|
|
|
|
|
|
|
def get_server_list():
    """Fetch the server configuration and yield UI updates for it.

    This is a generator. It first yields a placeholder state, then one
    final state describing either the discovered servers or the error.

    Yields:
        ``(dropdown, status, endpoints)`` tuples: a ``gr.Dropdown``
        update, a status string, and the list of valid endpoint dicts
        (empty while fetching and on error).
    """
    status = "Fetching server list from remote config..."
    yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []

    try:
        response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
        response.raise_for_status()
        all_entries = response.json()
        # Keep entries that have a name, a public key and some way to be
        # reached ("api_endpoint" or "link").
        valid_endpoints = [
            e for e in all_entries
            if isinstance(e, dict)
            and "name" in e
            and "public_key" in e
            and ("api_endpoint" in e or "link" in e)
        ]
        if not valid_endpoints:
            raise ValueError("No valid server configurations found.")
        endpoint_names = [e['name'] for e in valid_endpoints]
    except Exception as e:
        # Deliberately broad: network, HTTP, JSON and validation failures
        # are all shown to the user the same way.
        status = f"❌ Error fetching configuration: {e}"
        logger.error(status)
        yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
        return

    status = f"✅ Success! Found {len(endpoint_names)} valid servers."
    # endpoint_names is non-empty here, so the first entry is the default.
    yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0], label="Target Server"), status, valid_endpoints
|
|
|
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list, uploaded_image: Image.Image | None, show_keys: bool, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler that builds the encrypted image for the chosen server.

    Args:
        service_name: Name of the selected target server.
        secret_data: Raw key/value text to encrypt.
        available_endpoints: Endpoint dicts; each is read for its
            ``name`` and ``public_key``.
        uploaded_image: Optional user-supplied background image.
        show_keys: Whether to draw the key/value box on the image.
        progress: Gradio progress tracker.

    Returns:
        ``(image, status)`` on success, or ``(None, error_message)`` if
        image preparation or encryption fails.

    Raises:
        gr.Error: If no server is selected or its public key is missing.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")

    public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
    if not public_key:
        raise gr.Error(f"Could not find public key for '{service_name}'. Please refresh the server list.")

    try:
        base_image = prepare_base_image(uploaded_image, progress)
        progress(0.5, desc="Encrypting and embedding data...")
        created_image = create_encrypted_image(secret_data, public_key, base_image, show_keys)
        return created_image, f"✅ Success! Image created for '{service_name}'."
    except Exception as e:
        # Failures are reported through the status box instead of raised,
        # so the user sees the message beside the (empty) image output.
        logger.error(f"Error creating image: {e}", exc_info=True)
        return None, f"❌ Error: {e}"
|
|
|
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
    """Gradio handler that sends an encrypted image to the remote decoder.

    This is a generator. It yields a "connecting" status first, then the
    final result.

    Args:
        service_name: Name of the selected target server.
        image: The encrypted image to send.
        available_endpoints: Endpoint dicts; the matching one must have a
            non-empty ``link``.

    Yields:
        ``(decrypted_data, status)`` tuples. ``decrypted_data`` is
        ``None`` while connecting and on error.

    Raises:
        gr.Error: If no server is selected, no image is supplied, or the
            selected server has no ``link``.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")
    if image is None:
        raise gr.Error("Please create or upload an image to send.")

    endpoint_details = next((e for e in available_endpoints if e['name'] == service_name), None)
    # NOTE(review): get_server_list() also accepts entries that have only
    # "api_endpoint"; those are rejected here. Confirm this is intended.
    if not endpoint_details or not endpoint_details.get('link'):
        raise gr.Error(f"Config Error for '{service_name}'.")

    server_url = endpoint_details['link']
    status = f"Connecting to remote server: {server_url}"
    yield None, status

    try:
        # The image is sent as PNG, base64-encoded.
        with io.BytesIO() as buffer:
            image.save(buffer, format="PNG")
            b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")

        client = Client(server_url)
        result = client.predict(image_base64_string=b64_string, api_name="/keylock-auth-decoder")
        # A string result is parsed as JSON; anything else is passed through.
        decrypted_data = json.loads(result) if isinstance(result, str) else result
        yield decrypted_data, "✅ Success! Data decrypted by remote server."
    except Exception as e:
        logger.error(f"Error calling server with gradio_client: {e}", exc_info=True)
        yield None, f"❌ Error calling server API: {e}"
|
|
|
def refresh_and_update_all():
    """Run ``get_server_list`` to completion and return its final state.

    Returns:
        ``(dropdown, dropdown, status, endpoints)``. The dropdown update
        appears twice so one call can refresh two dropdown components.
    """
    # Exhaust the generator; only the last yielded triple matters.
    *_, final_state = get_server_list()
    dropdown_update, status_update, state_update = final_state
    return dropdown_update, dropdown_update, status_update, state_update
|
|
|
|
|
|
|
# Dashboard theme: blue/sky/slate palette on the Base theme with the Inter
# font, followed by a few explicit style overrides.
theme = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.sky,
    neutral_hue=gr.themes.colors.slate,
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
)
theme = theme.set(
    body_background_fill="#F1F5F9",
    panel_background_fill="white",
    block_background_fill="white",
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_hover="*primary_700",
)
|
|
|
# Build the three-tab dashboard and wire its events.
# NOTE(review): several labels below start with characters such as "β" or
# "π" that look like mis-decoded UTF-8 emoji — confirm the file encoding.
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    # Holds the endpoint dicts returned by refresh_and_update_all().
    endpoints_state = gr.State([])

    gr.Markdown("# π KeyLock Operations Dashboard")
    gr.Markdown("A centralized dashboard to manage and demonstrate the entire KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")

    with gr.Tabs() as tabs:
        # --- Tab 0: create the encrypted image ---------------------------
        with gr.TabItem("β Create KeyLock", id=0):
            gr.Markdown("## Step 1: Create an Encrypted Authentication Image (Local)")
            gr.Markdown("## β this is a DEMO, don't send personal information encrypted to the DEMO server")
            # NOTE(review): the text below says encryption happens "in your
            # browser", but the button runs create_keylock_wrapper, a Python
            # function — the wording may need correcting.
            gr.Markdown(f"Encrypt your data into a PNG. You can upload your own background image, or we'll provide a stylish default or generate one for you. The encryption process happens entirely in your browser.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=2):
                    with gr.Row():
                        creator_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to encrypt data for.")
                        refresh_button = gr.Button("π", scale=0, size="sm")

                    creator_secret_input = gr.Textbox(lines=5, label="Secret Data to Encrypt", placeholder="API_KEY: sk-123...\nUSER: demo-user")

                    gr.Markdown("### Image Options")
                    creator_base_image_input = gr.Image(label="Optional Base Image (600x600 recommended)", type="pil", sources=["upload"], show_download_button=False)
                    creator_show_keys_checkbox = gr.Checkbox(label="Show key/value data on final image", value=True)

                    creator_button = gr.Button("β¨ Create Auth Image", variant="primary", scale=2)

                with gr.Column(scale=3):
                    creator_status = gr.Textbox(label="Status", interactive=False, lines=1)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png", height=600)

        # --- Tab 1: send the image to the remote decoder -------------------
        with gr.TabItem("β‘ Send KeyLock", id=1):
            gr.Markdown("## Step 2: Decrypt via Live API Call")
            gr.Markdown("This tool acts as the **Client**. It sends the encrypted image you created in Step 1 to the live, remote **Decoder Server** you select. The server uses its securely stored private key to decrypt the data and sends the result back.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Configuration")
                    send_service_dropdown = gr.Dropdown(label="Target Server", interactive=True, info="Select the API server to send the image to.")
                    gr.Markdown("### Image to Send")
                    client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
                    client_button = gr.Button("π Decrypt via Remote Server", variant="primary")
                with gr.Column(scale=1):
                    gr.Markdown("### Response from Server")
                    client_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    client_json_output = gr.JSON(label="Decrypted Data")

        # --- Tab 2: architecture notes and RSA key generator ---------------
        with gr.TabItem("βΉοΈ Info & Key Generation", id=2):
            gr.Markdown("## Ecosystem Architecture")
            gr.Markdown(f"This dashboard uses a public [configuration file]({CREATOR_ENDPOINTS_JSON_URL}) to dynamically discover and interact with live services. It demonstrates a secure, decoupled workflow.")
            with gr.Row():
                with gr.Column():
                    gr.Markdown(f"### π Auth Creator Service\n- **Space:** [{CREATOR_SPACE_ID}]({CREATOR_URL})\n- **Role:** Provides an API to encrypt data for various targets defined in its `endpoints.json` file.\n- **Source Code:** [app.py]({CREATOR_APP_PY_URL})")
                with gr.Column():
                    gr.Markdown(f"### π‘ Decoder Server\n- **Space:** [{SERVER_SPACE_ID}]({SERVER_URL})\n- **Role:** The trusted authority. It holds a secret private key and provides a secure API to decrypt images.\n- **Source Code:** [app.py]({SERVER_APP_PY_URL})")

            with gr.Accordion("π RSA Key Pair Generator", open=False):
                gr.Markdown("Create a new key pair. In a real scenario, you would add the **Public Key** and the server's **API Endpoint URL** to the `endpoints.json` configuration file, and set the **Private Key** as a secret variable in the corresponding server space.")
                with gr.Row():
                    with gr.Column():
                        output_public_key = gr.Textbox(lines=10, label="Generated Public Key", interactive=False, show_copy_button=True)
                    with gr.Column():
                        output_private_key = gr.Textbox(lines=10, label="Generated Private Key", interactive=False, show_copy_button=True)
                gen_keys_button = gr.Button("βοΈ Generate New 2048-bit Key Pair", variant="secondary")

    # --- Event wiring ------------------------------------------------------
    # generate_rsa_keys returns (private_pem, public_pem), matching the
    # private-then-public output order here.
    gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])
    # The refresh button and the page load both repopulate the two
    # dropdowns, the creator status box and the endpoint state.
    refresh_button.click(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])
    demo.load(fn=refresh_and_update_all, outputs=[creator_service_dropdown, send_service_dropdown, creator_status, endpoints_state])

    # Inputs are listed in create_keylock_wrapper's positional order.
    creator_button.click(
        fn=create_keylock_wrapper,
        inputs=[creator_service_dropdown, creator_secret_input, endpoints_state, creator_base_image_input, creator_show_keys_checkbox],
        outputs=[creator_image_output, creator_status]
    )

    # send_keylock_wrapper is a generator; each yield updates both outputs.
    client_button.click(fn=send_keylock_wrapper, inputs=[send_service_dropdown, client_image_input, endpoints_state], outputs=[client_json_output, client_status])
|
|
|
# Start the app only when this file is run as a script.
# NOTE(review): mcp_server=True presumably also exposes the app as an MCP
# server — confirm against the installed Gradio version.
if __name__ == "__main__":
    demo.launch(mcp_server=True)