# Source: app.py, Hugging Face Space file by broadfield-dev
# Commit: "Create app.py" (5cf429e, verified); file size 10.4 kB
import gradio as gr
import json
import os
import io
import base64
import struct
import logging
import tempfile
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidTag
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==============================================================================
#  LOAD KEYS AND CONFIG
# ==============================================================================
# The private key (PEM text) is read from the environment; it is None when the
# variable is not set, which the decoder and the status tab both check for.
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')

# The public key is read from a file in the working directory. On failure the
# variable holds a human-readable message starting with "Error" instead of PEM
# text; creator_wrapper() relies on that marker, so keep the wording stable.
PUBLIC_KEY_PEM_STRING = ""
try:
    with open("keylock_pub.pem", "r", encoding="utf-8") as f:
        PUBLIC_KEY_PEM_STRING = f.read()
except FileNotFoundError:
    PUBLIC_KEY_PEM_STRING = "Error: keylock_pub.pem file not found. 'Creator' tab will not work."
    logger.warning("keylock_pub.pem not found; the 'Creator' tab is disabled.")
else:
    logger.info("Successfully loaded public key from keylock_pub.pem.")
# ==============================================================================
# CREATOR LOGIC
# ==============================================================================
def _parse_secret_data(secret_data_str: str) -> dict:
    """Parse ``KEY: value`` / ``KEY=value`` lines into a dict of strings.

    Blank lines and lines starting with ``#`` are skipped. Each remaining line
    is split at whichever of ``:`` or ``=`` occurs first, so a value may itself
    contain the other separator (e.g. ``URL=https://host``). Surrounding
    whitespace and quote characters are stripped from values.

    Raises:
        ValueError: if a line has no separator or an empty key. Messages carry
            only the line number so secret content is never echoed back.
    """
    parsed = {}
    for line_no, raw_line in enumerate(secret_data_str.splitlines(), start=1):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        separators = [pos for pos in (line.find(':'), line.find('=')) if pos != -1]
        if not separators:
            raise ValueError(f"Line {line_no} is not a 'KEY: value' or 'KEY=value' pair.")
        split_at = min(separators)
        key = line[:split_at].strip()
        if not key:
            raise ValueError(f"Line {line_no} has an empty key.")
        parsed[key] = line[split_at + 1:].strip().strip("'\"")
    return parsed


def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Encrypt key-value secrets and hide them in the LSBs of a carrier image.

    The secrets are serialised to JSON, encrypted with a fresh AES-256-GCM key,
    and that key is wrapped with RSA-OAEP (SHA-256) under ``public_key_pem``.
    The embedded byte stream is::

        4-byte big-endian payload length
        4-byte big-endian wrapped-key length | wrapped key | 12-byte nonce | ciphertext+tag

    Bits are written most-significant-first into the least significant bit of
    consecutive channel values of an 800x600 RGB image.

    Args:
        secret_data_str: Text with one ``KEY: value`` or ``KEY=value`` per line.
        public_key_pem: PEM-encoded public key used to wrap the AES key.

    Returns:
        The carrier image with the payload embedded.

    Raises:
        ValueError: if the input is empty, contains no key-value pairs, has a
            malformed line, or the payload exceeds the image capacity.
    """
    if not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")

    # 1. Parse & Encrypt Data
    data_dict = _parse_secret_data(secret_data_str)
    if not data_dict:
        # Only comments/blank lines were supplied; do not emit an empty secret.
        raise ValueError("Secret data contains no key-value pairs.")
    json_bytes = json.dumps(data_dict).encode('utf-8')

    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    aes_key = os.urandom(32)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )
    encrypted_payload = (
        struct.pack('>I', len(rsa_encrypted_aes_key))
        + rsa_encrypted_aes_key
        + nonce
        + ciphertext_with_tag
    )

    # 2. Create Carrier Image and Embed Data
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        font = ImageFont.load_default()
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")

    pixels = np.array(img)  # the image was created as 'RGB', so no conversion is needed
    flat = pixels.ravel()   # view onto `pixels`; writes below land in the image array
    framed = struct.pack('>I', len(encrypted_payload)) + encrypted_payload
    bits = np.unpackbits(np.frombuffer(framed, dtype=np.uint8))  # MSB-first, one 0/1 per element
    if bits.size > flat.size:
        raise ValueError("Data is too large for the image capacity.")
    # Clear each target LSB and replace it with the payload bit, all at once.
    flat[:bits.size] = (flat[:bits.size] & 0xFE) | bits
    # A uint8 (H, W, 3) array is read as RGB without passing an explicit mode.
    return Image.fromarray(pixels)
# ==============================================================================
# SERVER (DECODER) LOGIC
# ==============================================================================
def decode_data_from_image(image: Image.Image) -> dict:
    """Extract and decrypt the JSON payload hidden in an image's LSBs.

    Reads the least significant bit of consecutive RGB channel values: the
    first 32 bits are a big-endian payload length, followed by the payload
    (4-byte wrapped-key length, RSA-OAEP wrapped AES key, 12-byte nonce,
    AES-GCM ciphertext with tag). The AES key is unwrapped with the private
    key from ``KEYLOCK_PRIV_KEY``.

    Args:
        image: The uploaded image, or None when nothing was provided.

    Returns:
        The decrypted JSON document.

    Raises:
        gr.Error: if no private key is configured, no image was given, or any
            step of extraction/decryption fails (the cause is chained and
            logged with its traceback).
    """
    if not KEYLOCK_PRIV_KEY:
        raise gr.Error("Server Error: The API is not configured with a private key.")
    if image is None:
        raise gr.Error("Please provide an image to decode.")
    try:
        private_key = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)
        pixel_data = np.array(image.convert("RGB")).ravel()

        # 32-bit big-endian length header, one bit per channel value.
        if pixel_data.size < 32:
            raise ValueError("Image data corrupt or truncated.")
        header_bytes = np.packbits(pixel_data[:32] & 1).tobytes()
        data_length = struct.unpack('>I', header_bytes)[0]
        end_offset = 32 + (data_length * 8)
        if pixel_data.size < end_offset:
            raise ValueError("Image data corrupt or truncated.")
        crypto_payload = np.packbits(pixel_data[32:end_offset] & 1).tobytes()

        rsa_key_len = private_key.key_size // 8
        # Smallest well-formed payload: length field + wrapped key + nonce + GCM tag.
        if len(crypto_payload) < 4 + rsa_key_len + 12 + 16:
            raise ValueError("Image data corrupt or truncated.")
        len_header = struct.unpack('>I', crypto_payload[:4])[0]
        if len_header != rsa_key_len:
            raise ValueError("Key pair mismatch or corrupt data.")

        key_end = 4 + rsa_key_len
        nonce_end = key_end + 12
        encrypted_key = crypto_payload[4:key_end]
        nonce = crypto_payload[key_end:nonce_end]
        ciphertext = crypto_payload[nonce_end:]

        aes_key = private_key.decrypt(
            encrypted_key,
            padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
        )
        decrypted_bytes = AESGCM(aes_key).decrypt(nonce, ciphertext, None)
        logger.info("Successfully decoded an image via internal call.")
        return json.loads(decrypted_bytes.decode('utf-8'))
    except Exception as e:
        logger.exception("Decryption failed: %s", e)
        raise gr.Error(f"Decryption failed. The image may be corrupt, not encrypted with the correct key, or the server is misconfigured. Details: {e}") from e
# ==============================================================================
# GRADIO UI HELPER FUNCTIONS
# ==============================================================================
def creator_wrapper(secret_data_str: str):
    """Gradio callback: build the encrypted image and a downloadable PNG copy.

    Args:
        secret_data_str: Raw text from the "Secret Data" textbox.

    Returns:
        A ``(image, file_path, status_message)`` tuple for the image preview,
        download, and status components. On failure the first two items are
        None and the status carries the error text.

    Raises:
        gr.Error: if the public key was not loaded at startup.
    """
    # On load failure the module stores a message beginning with "Error"
    # instead of PEM text. Test the prefix rather than a substring, because
    # base64 key material could contain those letters by chance.
    if not PUBLIC_KEY_PEM_STRING or PUBLIC_KEY_PEM_STRING.startswith("Error"):
        raise gr.Error("Cannot create image: public key file is missing or invalid.")
    try:
        encrypted_image = create_encrypted_image(secret_data_str, PUBLIC_KEY_PEM_STRING)
        # delete=False: the file must outlive this call so it can be downloaded.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            encrypted_image.save(tmp, format="PNG")
        return encrypted_image, tmp.name, "✅ Success! Image created and ready for download."
    except Exception as e:
        logger.exception("Image creation failed")
        return None, None, f"❌ Error: {e}"
# ==============================================================================
# GRADIO DASHBOARD INTERFACE
# ==============================================================================
# Dashboard theme: the Base theme with blue/slate hues and the Inter font,
# followed by a handful of per-variable overrides applied through .set().
_dashboard_fonts = (gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif")
_theme_overrides = {
    "body_background_fill": "#F0F2F5",
    "block_background_fill": "white",
    "block_border_width": "1px",
    "block_shadow": "*shadow_drop_lg",
    "button_primary_background_fill": "*primary_500",
    "button_primary_background_fill_hover": "*primary_600",
    "button_primary_text_color": "white",
}
theme = gr.themes.Base(
    primary_hue="blue",
    secondary_hue="blue",
    neutral_hue="slate",
    font=_dashboard_fonts,
).set(**_theme_overrides)
# Three-tab dashboard: server status/docs, image creator, and a client
# simulator that feeds an uploaded image to the decoder.
with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    gr.Markdown("# 🔑 KeyLock Operations Dashboard")
    gr.Markdown("A unified dashboard to demonstrate the KeyLock ecosystem: API Server, Image Creator, and API Client.")

    with gr.Tabs():
        with gr.TabItem("📡 API Server Status & Docs", id=0):
            gr.Markdown("## Server Status")
            key_status = (
                "✅ Loaded successfully from secrets."
                if KEYLOCK_PRIV_KEY
                else "❌ NOT FOUND. The API will not work. Set the `KEYLOCK_PRIV_KEY` secret in the Space settings."
            )
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False)
            with gr.Accordion("View Service Public Key", open=False):
                # "pem" is not among the Code component's supported languages;
                # show the key as plain text instead.
                gr.Code(value=PUBLIC_KEY_PEM_STRING, language=None, label="Service Public Key (from keylock_pub.pem)")
            gr.Markdown("## API Documentation")
            gr.Markdown("This server exposes a conceptual API endpoint for external clients.")
            api_docs_md = (
                "\n"
                "- **Endpoint**: `/run/keylock-auth-decoder`\n"
                "- **Method**: `POST`\n"
                '- **Body**: JSON payload `{ "data": ["<base64_encoded_image_string>"] }`\n'
            )
            gr.Code(language="markdown", value=api_docs_md)

        with gr.TabItem("🏭 Image Creator", id=1):
            gr.Markdown("## Create an Encrypted Image for the Server")
            gr.Markdown("Use this tool to encrypt data using the server's public key.")
            with gr.Row():
                with gr.Column(scale=2):
                    creator_input = gr.Textbox(
                        lines=7,
                        label="Secret Data (Key-Value Pairs)",
                        placeholder="USERNAME: user@example.com\nAPI_KEY: sk-12345...",
                    )
                    creator_button = gr.Button("Create Encrypted Image", variant="primary")
                with gr.Column(scale=1):
                    creator_status = gr.Textbox(label="Status", interactive=False)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil")
                    creator_download_output = gr.File(label="Download Image")

        with gr.TabItem("💻 API Client (Simulator)", id=2):
            gr.Markdown("## Decrypt an Image via Simulated API Call")
            gr.Markdown("Upload an image created by the 'Creator' tab to test the server's decryption logic.")
            with gr.Row():
                with gr.Column(scale=1):
                    client_input_image = gr.Image(type="pil", label="Upload Encrypted Image", sources=["upload", "clipboard"])
                    client_decrypt_button = gr.Button("Decrypt Image", variant="primary")
                with gr.Column(scale=1):
                    client_output_json = gr.JSON(label="Decrypted Data from Server")

    # --- Wire up the component logic ---
    creator_button.click(
        fn=creator_wrapper,
        inputs=[creator_input],
        outputs=[creator_image_output, creator_download_output, creator_status],
    )
    client_decrypt_button.click(
        fn=decode_data_from_image,
        inputs=[client_input_image],
        outputs=[client_output_json],
        # Register the endpoint under the name advertised in the docs tab.
        api_name="keylock-auth-decoder",
    )
# Start the Gradio server only when run as a script, not when imported.
if __name__ == "__main__":
    demo.launch()