File size: 10,417 Bytes
5cf429e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import gradio as gr
import json
import os
import io
import base64
import struct
import logging
import tempfile
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidTag

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ==============================================================================
#  LOAD KEYS AND CONFIG
# ==============================================================================
KEYLOCK_PRIV_KEY = os.environ.get('KEYLOCK_PRIV_KEY')
PUBLIC_KEY_PEM_STRING = ""
try:
    with open("keylock_pub.pem", "r") as f:
        PUBLIC_KEY_PEM_STRING = f.read()
    logger.info("Successfully loaded public key from keylock_pub.pem.")
except FileNotFoundError:
    PUBLIC_KEY_PEM_STRING = "Error: keylock_pub.pem file not found. 'Creator' tab will not work."

# ==============================================================================
#  CREATOR LOGIC
# ==============================================================================
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.")
    
    # 1. Parse & Encrypt Data
    data_dict = {}
    for line in secret_data_str.splitlines():
        if not line.strip() or line.strip().startswith('#'): continue
        key, value = line.split(':', 1) if ':' in line else line.split('=', 1)
        data_dict[key.strip()] = value.strip().strip("'\"")

    json_bytes = json.dumps(data_dict).encode('utf-8')
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    aes_key = os.urandom(32)
    nonce = os.urandom(12)
    ciphertext_with_tag = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
    rsa_encrypted_aes_key = public_key.encrypt(
        aes_key, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
    )
    encrypted_aes_key_len_bytes = struct.pack('>I', len(rsa_encrypted_aes_key))
    encrypted_payload = encrypted_aes_key_len_bytes + rsa_encrypted_aes_key + nonce + ciphertext_with_tag

    # 2. Create Carrier Image and Embed Data
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        font = ImageFont.load_default()
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
    
    img_rgb = img.convert("RGB")
    pixel_data = np.array(img_rgb).ravel()
    data_length_header = struct.pack('>I', len(encrypted_payload))
    binary_payload = ''.join(format(byte, '08b') for byte in data_length_header + encrypted_payload)
    if len(binary_payload) > pixel_data.size:
        raise ValueError("Data is too large for the image capacity.")
    for i in range(len(binary_payload)):
        pixel_data[i] = (pixel_data[i] & 0xFE) | int(binary_payload[i])
    stego_pixels = pixel_data.reshape(img_rgb.size[1], img_rgb.size[0], 3)
    return Image.fromarray(stego_pixels, 'RGB')

# ==============================================================================
#  SERVER (DECODER) LOGIC
# ==============================================================================
def decode_data_from_image(image: Image.Image) -> dict:
    if not KEYLOCK_PRIV_KEY:
        raise gr.Error("Server Error: The API is not configured with a private key.")
    if image is None:
        raise gr.Error("Please provide an image to decode.")
    
    try:
        private_key = serialization.load_pem_private_key(KEYLOCK_PRIV_KEY.encode(), password=None)
        with io.BytesIO() as buffer:
            image.save(buffer, format="PNG")
            image_buffer = buffer.getvalue()
        
        img_rgb = Image.open(io.BytesIO(image_buffer)).convert("RGB")
        pixel_data = np.array(img_rgb).ravel()
        
        header_binary_string = "".join(str(p & 1) for p in pixel_data[:32])
        data_length = int(header_binary_string, 2)
        end_offset = 32 + (data_length * 8)
        if pixel_data.size < end_offset: raise ValueError("Image data corrupt or truncated.")
        
        data_binary_string = "".join(str(p & 1) for p in pixel_data[32:end_offset])
        crypto_payload = int(data_binary_string, 2).to_bytes(data_length, byteorder='big')
        
        rsa_key_len = private_key.key_size // 8
        len_header = struct.unpack('>I', crypto_payload[:4])[0]
        if len_header != rsa_key_len: raise ValueError("Key pair mismatch or corrupt data.")
        
        encrypted_key = crypto_payload[4:4 + rsa_key_len]
        nonce = crypto_payload[4 + rsa_key_len : 4 + rsa_key_len + 12]
        ciphertext = crypto_payload[4 + rsa_key_len + 12:]
        
        aes_key = private_key.decrypt(encrypted_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
        decrypted_bytes = AESGCM(aes_key).decrypt(nonce, ciphertext, None)
        
        logger.info("Successfully decoded an image via internal call.")
        return json.loads(decrypted_bytes.decode('utf-8'))
        
    except Exception as e:
        logger.error(f"Decryption failed: {e}", exc_info=True)
        raise gr.Error(f"Decryption failed. The image may be corrupt, not encrypted with the correct key, or the server is misconfigured. Details: {e}")

# ==============================================================================
#  GRADIO UI HELPER FUNCTIONS
# ==============================================================================
def creator_wrapper(secret_data_str: str):
    """Wrapper for the Gradio UI to call the creator function."""
    if not PUBLIC_KEY_PEM_STRING or "Error" in PUBLIC_KEY_PEM_STRING:
        raise gr.Error("Cannot create image: public key file is missing or invalid.")
    try:
        encrypted_image = create_encrypted_image(secret_data_str, PUBLIC_KEY_PEM_STRING)
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            encrypted_image.save(tmp.name)
            return encrypted_image, tmp.name, "βœ… Success! Image created and ready for download."
    except Exception as e:
        return None, None, f"❌ Error: {e}"

# ==============================================================================
#  GRADIO DASHBOARD INTERFACE
# ==============================================================================
theme = gr.themes.Base(
    primary_hue="blue",
    secondary_hue="blue",
    neutral_hue="slate",
    font=(gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"),
).set(
    body_background_fill="#F0F2F5",
    block_background_fill="white",
    block_border_width="1px",
    block_shadow="*shadow_drop_lg",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
    button_primary_text_color="white",
)

with gr.Blocks(theme=theme, title="KeyLock Operations Dashboard") as demo:
    gr.Markdown("# πŸ”‘ KeyLock Operations Dashboard")
    gr.Markdown("A unified dashboard to demonstrate the KeyLock ecosystem: API Server, Image Creator, and API Client.")

    with gr.Tabs():
        with gr.TabItem("πŸ“‘ API Server Status & Docs", id=0):
            gr.Markdown("## Server Status")
            key_status = "βœ… Loaded successfully from secrets." if KEYLOCK_PRIV_KEY else "❌ NOT FOUND. The API will not work. Set the `KEYLOCK_PRIV_KEY` secret in the Space settings."
            gr.Textbox(label="Private Key Status", value=key_status, interactive=False)
            
            with gr.Accordion("View Service Public Key", open=False):
                gr.Code(value=PUBLIC_KEY_PEM_STRING, language="pem", label="Service Public Key (from keylock_pub.pem)")
            
            gr.Markdown("## API Documentation")
            gr.Markdown("This server exposes a conceptual API endpoint for external clients.")
            gr.Code(language="markdown", value="""
- **Endpoint**: `/run/keylock-auth-decoder`
- **Method**: `POST`
- **Body**: JSON payload `{ "data": ["<base64_encoded_image_string>"] }`
""")

        with gr.TabItem("🏭 Image Creator", id=1):
            gr.Markdown("## Create an Encrypted Image for the Server")
            gr.Markdown("Use this tool to encrypt data using the server's public key.")
            with gr.Row():
                with gr.Column(scale=2):
                    creator_input = gr.Textbox(
                        lines=7,
                        label="Secret Data (Key-Value Pairs)",
                        placeholder="USERNAME: [email protected]\nAPI_KEY: sk-12345..."
                    )
                    creator_button = gr.Button("Create Encrypted Image", variant="primary")
                with gr.Column(scale=1):
                    creator_status = gr.Textbox(label="Status", interactive=False)
                    creator_image_output = gr.Image(label="Generated Encrypted Image", type="pil")
                    creator_download_output = gr.File(label="Download Image")

        with gr.TabItem("πŸ’» API Client (Simulator)", id=2):
            gr.Markdown("## Decrypt an Image via Simulated API Call")
            gr.Markdown("Upload an image created by the 'Creator' tab to test the server's decryption logic.")
            with gr.Row():
                with gr.Column(scale=1):
                    client_input_image = gr.Image(type="pil", label="Upload Encrypted Image", sources=["upload", "clipboard"])
                    client_decrypt_button = gr.Button("Decrypt Image", variant="primary")
                with gr.Column(scale=1):
                    client_output_json = gr.JSON(label="Decrypted Data from Server")

    # --- Wire up the component logic ---
    creator_button.click(
        fn=creator_wrapper,
        inputs=[creator_input],
        outputs=[creator_image_output, creator_download_output, creator_status]
    )
    
    client_decrypt_button.click(
        fn=decode_data_from_image,
        inputs=[client_input_image],
        outputs=[client_output_json]
    )

if __name__ == "__main__":
    demo.launch()