import gradio as gr
import json
import os
import io
import base64
import struct
import logging
import requests
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidTag
# --- Logging setup ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ==============================================================================
# CONFIGURATION: where the remote decoder service lives
# ==============================================================================
# Every image submitted for decryption is POSTed to the endpoint derived below,
# so these constants decide which host receives the encrypted payloads.
SERVER_SPACE_ID = "broadfield-dev/KeyLock-Auth-Server"
BASE_HF_URL = "https://huggingface.co/spaces/"
SERVER_URL = BASE_HF_URL + SERVER_SPACE_ID
# The API endpoint is built from the Space's direct URL (owner and name joined
# with '-'), not from the hub page URL above.
SERVER_DIRECT_URL_BASE = "https://" + "-".join(SERVER_SPACE_ID.split("/")) + ".hf.space"
SERVER_API_ENDPOINT = SERVER_DIRECT_URL_BASE + "/run/keylock-auth-decoder"
# ==============================================================================
# LOCAL LOGIC (Key and Image Generation)
# ==============================================================================
def generate_rsa_keys():
    """Create a fresh 2048-bit RSA key pair and return both halves as PEM text.

    Returns:
        A ``(private_pem, public_pem)`` tuple of UTF-8 strings. The private key
        is PKCS#8 PEM serialized with ``NoEncryption``, i.e. it is not
        passphrase-protected; whoever receives the return value holds the
        usable secret key in clear text.
    """
    logger.info("Generating new RSA key pair locally.")
    key_pair = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem_encoding = serialization.Encoding.PEM

    # Private half: unencrypted PKCS#8 (no passphrase wraps the key material).
    private_bytes = key_pair.private_bytes(
        encoding=pem_encoding,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Public half: standard SubjectPublicKeyInfo, safe to hand to encryptors.
    public_bytes = key_pair.public_key().public_bytes(
        encoding=pem_encoding,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    return private_bytes.decode('utf-8'), public_bytes.decode('utf-8')
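def _parse_secret_lines(secret_data_str: str) -> dict:
    """Parse ``KEY: value`` / ``KEY=value`` lines into a dict, skipping blanks and ``#`` comments."""
    parsed = {}
    for raw_line in secret_data_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on whichever delimiter appears FIRST. Preferring ':' whenever it
        # occurs anywhere would turn "URL=https://host" into the key "URL=https".
        cut_points = [idx for idx in (line.find(':'), line.find('=')) if idx != -1]
        if not cut_points:
            continue
        cut = min(cut_points)
        parsed[line[:cut].strip()] = line[cut + 1:].strip().strip("'\"")
    return parsed


def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Encrypt key/value secrets for an RSA public key and hide them in a PNG-ready image.

    The secrets are serialized to JSON, encrypted with a fresh AES-256-GCM key,
    and that AES key is wrapped with RSA-OAEP (SHA-256). The resulting payload is

        [4-byte BE length of wrapped key][wrapped key][12-byte nonce][ciphertext+tag]

    which is prefixed with its own 4-byte big-endian length and written into the
    least-significant bit of each colour byte of an 800x600 RGB carrier image.

    Security notes:
        * Confidentiality comes from the hybrid encryption, not from the LSB
          embedding; the embedding only transports the ciphertext.
        * Only the public key is used, so the image does not authenticate who
          created it: anyone holding the public key can produce a valid image.
        * The result must be saved losslessly (e.g. PNG); lossy re-encoding
          would alter the low bits that carry the payload.

    Args:
        secret_data_str: Lines of ``KEY: value`` or ``KEY=value``; blank lines
            and lines starting with ``#`` are ignored.
        public_key_pem: PEM-encoded RSA public key of the intended decoder.

    Returns:
        The carrier image with the encrypted payload embedded.

    Raises:
        ValueError: If either input is empty, no key/value pair can be parsed,
            the key is not an RSA public key, or the payload exceeds the
            carrier's capacity.
    """
    logger.info("Starting local image creation process...")
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    if not public_key_pem.strip():
        raise ValueError("Public Key cannot be empty.")

    data_dict = _parse_secret_lines(secret_data_str)
    if not data_dict:
        raise ValueError("No valid key-value pairs found.")
    json_bytes = json.dumps(data_dict).encode('utf-8')

    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    # load_pem_public_key accepts other key types too (EC, Ed25519, ...); reject
    # them here with a clear message instead of failing inside encrypt().
    if not isinstance(public_key, rsa.RSAPublicKey):
        raise ValueError("Public Key must be an RSA public key.")

    # Fresh random key and nonce per image, so a (key, nonce) pair is never reused.
    aes_key, nonce = os.urandom(32), os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )
    encrypted_payload = (
        struct.pack('>I', len(rsa_encrypted_aes_key))
        + rsa_encrypted_aes_key
        + nonce
        + ciphertext_with_tag
    )

    # Build the visible carrier image.
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        font = ImageFont.load_default(size=30)
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")

    pixel_data = np.array(img.convert("RGB")).ravel()

    # Length-prefixed payload expanded to one bit per element, MSB first
    # (same bit order as format(byte, '08b')).
    framed_payload = struct.pack('>I', len(encrypted_payload)) + encrypted_payload
    payload_bits = np.unpackbits(np.frombuffer(framed_payload, dtype=np.uint8))
    if payload_bits.size > pixel_data.size:
        raise ValueError("Data too large for image capacity.")

    # Overwrite only the lowest bit of the first len(payload_bits) colour bytes.
    used = payload_bits.size
    pixel_data[:used] = (pixel_data[:used] & 0xFE) | payload_bits

    stego_pixels = pixel_data.reshape((600, 800, 3))
    return Image.fromarray(stego_pixels, 'RGB')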
# ==============================================================================
# REMOTE API CALL LOGIC
# ==============================================================================
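def decrypt_image_via_api(image: Image.Image):
    """Send an image to the remote decoder and stream progress back to the UI.

    This is a generator: every update, including the final result, is delivered
    with ``yield`` as a ``(decrypted_data, status)`` tuple. (A ``return value``
    inside a generator is not delivered to the consumer as an output.)

    The image is PNG-encoded, base64-wrapped and POSTed as JSON to
    ``SERVER_API_ENDPOINT``. On success the response body carries the decrypted
    secrets, so it is passed to the caller but never written to the log.

    Args:
        image: The image holding the embedded encrypted payload.

    Yields:
        ``(None, status)`` while in progress, then ``(decrypted_data, status)``.

    Raises:
        gr.Error: If no image is given, the server cannot be reached, or the
            server answers with an error or an unusable response.
    """
    if image is None:
        raise gr.Error("Please provide an image to send.")

    yield None, f"Connecting to server: {SERVER_SPACE_ID}..."

    try:
        with io.BytesIO() as buffer:
            image.save(buffer, format="PNG")
            b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")

        payload = {"data": [b64_string]}
        headers = {"Content-Type": "application/json"}

        yield None, f"Sending image to API endpoint:\n{SERVER_API_ENDPOINT}"

        response = requests.post(SERVER_API_ENDPOINT, headers=headers, json=payload, timeout=45)

        # A proxy or a sleeping Space may answer with HTML; report that as an
        # API error instead of a misleading "could not connect".
        try:
            response_json = response.json()
        except ValueError:
            raise gr.Error(
                f"API Error (Status {response.status_code}): response was not valid JSON."
            ) from None
        if not isinstance(response_json, dict):
            raise gr.Error(
                f"API Error (Status {response.status_code}): unexpected response format."
            )

        if response.status_code != 200:
            error_detail = response_json.get("error", "Unknown error.")
            raise gr.Error(f"API Error (Status {response.status_code}): {error_detail}")

        if "data" in response_json:
            results = response_json["data"]
            if not isinstance(results, list) or not results:
                raise gr.Error("API returned no decrypted data.")
            yield results[0], "✅ Success! Data decrypted by the remote server."
            return
        if "error" in response_json:
            raise gr.Error(f"API returned an error: {response_json['error']}")
        raise gr.Error("API returned a response with neither 'data' nor 'error'.")
    except gr.Error:
        # Already a user-facing error; do not re-wrap it as "unexpected".
        raise
    except requests.exceptions.RequestException as e:
        logger.error("Network error calling API: %s", e)
        raise gr.Error(f"Could not connect to the API. Check the server space is running and the URL is correct. Error: {e}") from e
    except Exception as e:
        logger.error("An unexpected error occurred: %s", e, exc_info=True)
        raise gr.Error(f"An unexpected error occurred: {e}") from e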
# ==============================================================================
# GRADIO DASHBOARD INTERFACE
# ==============================================================================
# Dashboard look and feel: blue/sky/slate palette on the Base theme, with a few
# fills, borders and shadows overridden through .set().
theme = gr.themes.Base(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.sky,
    neutral_hue=gr.themes.colors.slate,
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
).set(
    # Page and container backgrounds
    body_background_fill="#F1F5F9",
    panel_background_fill="white",
    block_background_fill="white",
    # Block outline
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    # Primary button colours ("*name" refers to a theme variable)
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_hover="*primary_700",
)
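def _create_image_with_status(secret_data_str: str, public_key_pem: str):
    """Adapt create_encrypted_image to the two outputs (image, status) the Creator tab expects."""
    try:
        image = create_encrypted_image(secret_data_str, public_key_pem)
    except ValueError as e:
        # Surface validation problems (empty input, bad PEM, payload too large)
        # as a visible UI error.
        raise gr.Error(str(e)) from e
    return image, "Encrypted image created."


with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    gr.Markdown("# π KeyLock Operations Dashboard")
    gr.Markdown("A self-contained dashboard to demonstrate the KeyLock ecosystem. Key/Image creation is performed locally, while decryption is handled by a **live, remote API call** to a secure server.")

    with gr.Tabs() as tabs:
        with gr.TabItem("β Generate Keys", id=0):
            gr.Markdown("## Step 1: Create a Secure Key Pair (Local)")
            # NOTE(review): the text below says keys are generated "within your
            # browser session", but generate_rsa_keys runs in the Python process
            # hosting this app; the unencrypted private key is then sent to the
            # browser for display. Treat keys produced here as demo-only.
            gr.Markdown(
                """
                This tool generates a new RSA key pair within your browser session. In a real-world scenario, the **Private Key** would be immediately stored as a secure secret on a server (like the `KEYLOCK_PRIV_KEY` secret on our demo server), and would never be shown in a UI like this. The **Public Key** would be distributed to clients or other services that need to encrypt data for that server.
                **Action:** Click the button below, then copy both keys for the next steps.
                """
            )
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Your New Keys")
                    gen_keys_button = gr.Button("Generate New 2048-bit Key Pair", icon="π", variant="secondary")
                with gr.Column(scale=2):
                    with gr.Row():
                        output_public_key = gr.Textbox(lines=11, label="Generated Public Key (For Creator)", interactive=False, show_copy_button=True)
                        output_private_key = gr.Textbox(lines=11, label="Generated Private Key (For Decoder)", interactive=False, show_copy_button=True)

        with gr.TabItem("β‘ Create KeyLock", id=1):
            gr.Markdown("## Step 2: Create an Encrypted Auth Image (Local)")
            # NOTE(review): "locally" here means inside this application's
            # Python process; secrets typed into the form travel from the
            # browser to wherever the app is hosted before being encrypted.
            gr.Markdown(
                """
                This tool acts as the **Auth Creator**. It takes your secret data and uses the **Public Key** you generated in Step 1 to encrypt it into a new PNG image. This entire process happens locally in this application. This simulates a user or an automated client preparing credentials to send to the secure server.
                **Action:** Paste the **Public Key** from Step 1, enter some secrets, and click create.
                """
            )
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Configuration")
                    creator_pubkey_input = gr.Textbox(lines=8, label="Paste the Public Key Here", placeholder="Copy the public key generated in Step 1...")
                    creator_secret_input = gr.Textbox(lines=5, label="Secret Data to Encrypt", placeholder="SESSION_ID: abc-123\nUSER: [email protected]")
                    creator_button = gr.Button("β¨ Create Auth Image", variant="primary")
                with gr.Column(scale=1):
                    gr.Markdown("### Output")
                    creator_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    # format="png" keeps the download lossless; the payload
                    # lives in the pixels' low bits.
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil", show_download_button=True, format="png", show_share_button=False)

        with gr.TabItem("β’ Send KeyLock", id=2):
            gr.Markdown("## Step 3: Decrypt via Live API Call")
            gr.Markdown(
                f"""
                This is the core demonstration. This tool acts as a **Client** sending the encrypted image to our live, remote **Server** at [{SERVER_SPACE_ID}]({SERVER_URL}).
                For this demo to work, the **Private Key** you generated in Step 1 must be **the same one** set as the `KEYLOCK_PRIV_KEY` secret in the `{SERVER_SPACE_ID}` Space settings. The client sends the image, and the server uses its own secret key to decrypt it.
                **Action:** Upload the image from Step 2. The dashboard will make a live API call to `{SERVER_API_ENDPOINT}`.
                """
            )
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("### Input")
                    client_image_input = gr.Image(type="pil", label="Upload or Drag Encrypted Image Here", sources=["upload", "clipboard"])
                    client_button = gr.Button("π Decrypt Image via Remote Server", variant="primary")
                with gr.Column(scale=1):
                    gr.Markdown("### Decrypted Data")
                    client_status = gr.Textbox(label="Status", interactive=False, lines=2)
                    client_json_output = gr.JSON(label="Result from Server")

    # --- Wire up the component logic ---
    # generate_rsa_keys returns (private_pem, public_pem), matching this output order.
    gen_keys_button.click(fn=generate_rsa_keys, inputs=None, outputs=[output_private_key, output_public_key])

    # create_encrypted_image takes (secret_data_str, public_key_pem) and returns
    # only the image, so the inputs are passed in that order and the wrapper
    # supplies the status text for the second output.
    creator_button.click(
        fn=_create_image_with_status,
        inputs=[creator_secret_input, creator_pubkey_input],
        outputs=[creator_image_output, creator_status]
    )

    client_button.click(
        fn=decrypt_image_via_api,
        inputs=[client_image_input],
        outputs=[client_json_output, client_status]
    )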
# Start the dashboard only when this file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch()