File size: 10,417 Bytes
5cf429e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import gradio as gr
import json
import os
import io
import base64
import struct
import logging
import tempfile
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidTag
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==============================================================================
# LOAD KEYS AND CONFIG
# ==============================================================================
# PEM-encoded private key supplied through the environment; None when unset.
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')

# PEM text of the service public key. When the key cannot be loaded this holds
# a human-readable message containing the word "Error" instead; other code in
# this file checks for that marker before using the value as a key.
PUBLIC_KEY_PEM_STRING = ""
try:
    with open("keylock_pub.pem", "r", encoding="utf-8") as f:
        PUBLIC_KEY_PEM_STRING = f.read()
    logger.info("Successfully loaded public key from keylock_pub.pem.")
except FileNotFoundError:
    PUBLIC_KEY_PEM_STRING = "Error: keylock_pub.pem file not found. 'Creator' tab will not work."
    logger.warning("keylock_pub.pem was not found; the 'Creator' tab is disabled.")
except (OSError, UnicodeDecodeError) as exc:
    # Permission problems, a directory in place of the file, or undecodable
    # bytes must not crash the app at import time; degrade the same way as a
    # missing file does.
    PUBLIC_KEY_PEM_STRING = f"Error: keylock_pub.pem could not be read ({exc}). 'Creator' tab will not work."
    logger.warning("keylock_pub.pem could not be read: %s", exc)
# ==============================================================================
# CREATOR LOGIC
# ==============================================================================
def _parse_secret_data(secret_data_str: str) -> dict:
    """Parse ``KEY: value`` / ``KEY=value`` lines into a dict of strings."""
    parsed = {}
    for line_no, raw_line in enumerate(secret_data_str.splitlines(), start=1):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue  # blank lines and '#' comments carry no data
        # ':' takes precedence over '=' when a line contains both.
        separator = ':' if ':' in line else '='
        key, found, value = line.partition(separator)
        if not found:
            # Report the line number only; the line content is secret data.
            raise ValueError(
                f"Line {line_no} is not a 'KEY: value' or 'KEY=value' pair."
            )
        parsed[key.strip()] = value.strip().strip("'\"")
    return parsed


def _build_encrypted_payload(plaintext: bytes, public_key_pem: str) -> bytes:
    """Hybrid-encrypt *plaintext*: AES-256-GCM data, RSA-OAEP wrapped AES key."""
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    aes_key = os.urandom(32)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, plaintext, None)
    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )
    # Layout: 4-byte big-endian wrapped-key length | wrapped key | nonce | ciphertext+tag
    return (
        struct.pack('>I', len(rsa_encrypted_aes_key))
        + rsa_encrypted_aes_key
        + nonce
        + ciphertext_with_tag
    )


def _render_carrier_image() -> Image.Image:
    """Draw the 800x600 RGB placeholder image that carries the hidden payload."""
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        # Font file unavailable on this host; fall back to Pillow's built-in font.
        font = ImageFont.load_default()
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
    return img


def _embed_payload(carrier: Image.Image, payload: bytes) -> Image.Image:
    """Write a 32-bit length header plus *payload* into the carrier's LSBs."""
    width, height = carrier.size
    pixel_data = np.array(carrier).ravel()
    framed = struct.pack('>I', len(payload)) + payload
    # unpackbits emits the most significant bit first, the same order as
    # format(byte, '08b').
    bits = np.unpackbits(np.frombuffer(framed, dtype=np.uint8))
    if bits.size > pixel_data.size:
        raise ValueError("Data is too large for the image capacity.")
    # Replace the least significant bit of the first len(bits) channel values.
    pixel_data[:bits.size] = (pixel_data[:bits.size] & 0xFE) | bits
    stego_pixels = pixel_data.reshape(height, width, 3)
    # A uint8 array of shape (H, W, 3) is interpreted as an RGB image.
    return Image.fromarray(stego_pixels)


def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Encrypt key/value secrets and hide them in a generated carrier image.

    Each non-blank, non-comment line of *secret_data_str* must look like
    ``KEY: value`` or ``KEY=value``. The pairs are serialised as JSON,
    encrypted with a fresh AES-256-GCM key, and that key is wrapped with the
    RSA public key using OAEP/SHA-256. The result is embedded in the least
    significant bits of the image's RGB channel values, preceded by a 32-bit
    big-endian length header.

    Args:
        secret_data_str: Multi-line text of key/value pairs.
        public_key_pem: PEM-encoded public key used to wrap the AES key.

    Returns:
        An 800x600 RGB image carrying the encrypted payload.

    Raises:
        ValueError: If the input is empty, a line is not a key/value pair,
            or the payload does not fit in the image.
    """
    if not secret_data_str or not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    # 1. Parse & Encrypt Data
    data_dict = _parse_secret_data(secret_data_str)
    json_bytes = json.dumps(data_dict).encode('utf-8')
    encrypted_payload = _build_encrypted_payload(json_bytes, public_key_pem)
    # 2. Create Carrier Image and Embed Data
    return _embed_payload(_render_carrier_image(), encrypted_payload)
# ==============================================================================
# SERVER (DECODER) LOGIC
# ==============================================================================
def decode_data_from_image(image: Image.Image) -> dict:
    """Extract and decrypt the JSON payload hidden in *image*.

    The least significant bits of the flattened RGB channel values are read
    most-significant-bit first: a 32-bit big-endian payload length, then the
    payload itself, laid out as a 4-byte wrapped-key length, the RSA-OAEP
    wrapped AES key, a 12-byte nonce and the AES-GCM ciphertext with its tag.

    Args:
        image: Image to decode; converted to RGB before reading.

    Returns:
        The object parsed from the decrypted JSON bytes.

    Raises:
        gr.Error: If no private key is configured, no image is supplied, or
            any step of extraction/decryption fails.
    """
    if not KEYLOCK_PRIV_KEY:
        raise gr.Error("Server Error: The API is not configured with a private key.")
    if image is None:
        raise gr.Error("Please provide an image to decode.")
    try:
        private_key = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)
        # Converting directly yields the same pixel values as re-encoding the
        # image to PNG first, without the extra encode/decode pass.
        pixel_data = np.asarray(image.convert("RGB")).ravel()
        if pixel_data.size < 32:
            raise ValueError("Image data corrupt or truncated.")
        header_bits = pixel_data[:32] & 1
        data_length = int.from_bytes(np.packbits(header_bits).tobytes(), byteorder='big')
        end_offset = 32 + (data_length * 8)
        if pixel_data.size < end_offset:
            raise ValueError("Image data corrupt or truncated.")
        crypto_payload = np.packbits(pixel_data[32:end_offset] & 1).tobytes()
        if len(crypto_payload) < 4:
            # Not even the wrapped-key length prefix is present.
            raise ValueError("Image data corrupt or truncated.")
        rsa_key_len = private_key.key_size // 8
        len_header = struct.unpack('>I', crypto_payload[:4])[0]
        if len_header != rsa_key_len:
            raise ValueError("Key pair mismatch or corrupt data.")
        nonce_start = 4 + rsa_key_len
        nonce_end = nonce_start + 12
        if len(crypto_payload) <= nonce_end:
            # No ciphertext follows the wrapped key and nonce.
            raise ValueError("Image data corrupt or truncated.")
        encrypted_key = crypto_payload[4:nonce_start]
        nonce = crypto_payload[nonce_start:nonce_end]
        ciphertext = crypto_payload[nonce_end:]
        aes_key = private_key.decrypt(
            encrypted_key,
            padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
        )
        decrypted_bytes = AESGCM(aes_key).decrypt(nonce, ciphertext, None)
        logger.info("Successfully decoded an image via internal call.")
        return json.loads(decrypted_bytes.decode('utf-8'))
    except Exception as e:
        # Top-level boundary for the UI/API: log the full traceback and
        # surface a single user-facing error.
        logger.error("Decryption failed: %s", e, exc_info=True)
        raise gr.Error(f"Decryption failed. The image may be corrupt, not encrypted with the correct key, or the server is misconfigured. Details: {e}") from e
# ==============================================================================
# GRADIO UI HELPER FUNCTIONS
# ==============================================================================
def creator_wrapper(secret_data_str: str):
    """Create an encrypted image for the Gradio "Creator" tab.

    Args:
        secret_data_str: Key/value text entered by the user.

    Returns:
        A ``(image, file_path, status_message)`` tuple. On failure the first
        two items are ``None`` and the status message describes the error.

    Raises:
        gr.Error: If the service public key was not loaded.
    """
    if not PUBLIC_KEY_PEM_STRING or "Error" in PUBLIC_KEY_PEM_STRING:
        raise gr.Error("Cannot create image: public key file is missing or invalid.")
    try:
        encrypted_image = create_encrypted_image(secret_data_str, PUBLIC_KEY_PEM_STRING)
        # delete=False keeps the file on disk after the handle closes so the
        # download component can serve it. Writing through the open handle
        # (rather than re-opening by name) also works on Windows.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            encrypted_image.save(tmp, format="PNG")
        return encrypted_image, tmp.name, "✅ Success! Image created and ready for download."
    except Exception as e:
        # Best-effort UI handler: report the problem in the status box
        # instead of raising, but keep a trace in the server log.
        logger.error("Image creation failed: %s", e, exc_info=True)
        return None, None, f"❌ Error: {e}"
# ==============================================================================
# GRADIO DASHBOARD INTERFACE
# ==============================================================================
# Dashboard theme, step 1: base palette and font stack.
theme = gr.themes.Base(
    primary_hue="blue",
    secondary_hue="blue",
    neutral_hue="slate",
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
)
# Step 2: override individual style variables on top of the base theme.
theme = theme.set(
    body_background_fill="#F0F2F5",
    block_background_fill="white",
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
    button_primary_text_color="white",
)
# Three-tab dashboard: server status/docs, image creator, and a client
# simulator that decrypts an uploaded image through the server-side decoder.
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    gr.Markdown("# 🔑 KeyLock Operations Dashboard")
    gr.Markdown("A unified dashboard to demonstrate the KeyLock ecosystem: API Server, Image Creator, and API Client.")
    with gr.Tabs():
        with gr.TabItem("📡 API Server Status & Docs", id=0):
            gr.Markdown("## Server Status")
            key_status = (
                "✅ Loaded successfully from secrets."
                if KEYLOCK_PRIV_KEY
                else "❌ NOT FOUND. The API will not work. Set the `KEYLOCK_PRIV_KEY` secret in the Space settings."
            )
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False)
            with gr.Accordion("View Service Public Key", open=False):
                # NOTE(review): "pem" is not among the languages documented
                # for gr.Code, so the key is shown as plain text - confirm
                # against the installed Gradio version.
                gr.Code(value=PUBLIC_KEY_PEM_STRING, language=None, label="Service Public Key (from keylock_pub.pem)")
            gr.Markdown("## API Documentation")
            gr.Markdown("This server exposes a conceptual API endpoint for external clients.")
            gr.Code(language="markdown", value="""
- **Endpoint**: `/run/keylock-auth-decoder`
- **Method**: `POST`
- **Body**: JSON payload `{ "data": ["<base64_encoded_image_string>"] }`
""")
        with gr.TabItem("🏭 Image Creator", id=1):
            gr.Markdown("## Create an Encrypted Image for the Server")
            gr.Markdown("Use this tool to encrypt data using the server's public key.")
            with gr.Row():
                with gr.Column(scale=2):
                    creator_input = gr.Textbox(
                        lines=7,
                        label="Secret Data (Key-Value Pairs)",
                        placeholder="USERNAME: user@example.com\nAPI_KEY: sk-12345..."
                    )
                    creator_button = gr.Button("Create Encrypted Image", variant="primary")
                with gr.Column(scale=1):
                    creator_status = gr.Textbox(label="Status", interactive=False)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil")
                    creator_download_output = gr.File(label="Download Image")
        with gr.TabItem("💻 API Client (Simulator)", id=2):
            gr.Markdown("## Decrypt an Image via Simulated API Call")
            gr.Markdown("Upload an image created by the 'Creator' tab to test the server's decryption logic.")
            with gr.Row():
                with gr.Column(scale=1):
                    client_input_image = gr.Image(type="pil", label="Upload Encrypted Image", sources=["upload", "clipboard"])
                    client_decrypt_button = gr.Button("Decrypt Image", variant="primary")
                with gr.Column(scale=1):
                    client_output_json = gr.JSON(label="Decrypted Data from Server")
    # --- Wire up the component logic ---
    # Event listeners are registered while the Blocks context is still open.
    creator_button.click(
        fn=creator_wrapper,
        inputs=[creator_input],
        outputs=[creator_image_output, creator_download_output, creator_status]
    )
    client_decrypt_button.click(
        fn=decode_data_from_image,
        inputs=[client_input_image],
        outputs=[client_output_json]
    )
if __name__ == "__main__":
    # Start the Gradio server only when this file is run as a script.
    demo.launch()