File size: 5,920 Bytes
71bdca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a2942e
71bdca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a2942e
71bdca5
 
cefc939
71bdca5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a2942e
71bdca5
 
 
7191e6a
71bdca5
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def generate_rsa_keys():
    """Generate a fresh 2048-bit RSA key pair and return both halves as PEM text.

    Returns:
        A ``(private_pem, public_pem)`` tuple of UTF-8 strings. The private
        key is serialized as unencrypted PKCS#8 and the public key as
        SubjectPublicKeyInfo.
    """
    key_pair = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    private_bytes = key_pair.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = key_pair.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    return private_bytes.decode('utf-8'), public_bytes.decode('utf-8')

def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
    """Encrypt key/value secrets and hide them in the LSBs of a generated image.

    The secret text is parsed line by line into a dictionary, serialized as
    JSON, encrypted with a random AES-256-GCM key, and that AES key is in turn
    encrypted with the given RSA public key (OAEP, SHA-256). The bytes embedded
    in the image are, in order:

        4-byte big-endian length of everything that follows
        4-byte big-endian length of the RSA-encrypted AES key
        RSA-encrypted AES key
        12-byte AES-GCM nonce
        AES-GCM ciphertext

    One bit is stored in the least significant bit of each colour channel
    value of an 800x600 RGB carrier image.

    Args:
        secret_data_str: Lines of ``key: value`` or ``key=value``. Blank lines
            and lines starting with ``#`` are ignored, as are lines with no
            separator. Surrounding quotes are stripped from values.
        public_key_pem: PEM-encoded public key used to wrap the AES key.

    Returns:
        The carrier image with the encrypted payload embedded.

    Raises:
        ValueError: If either argument is empty, no key/value pair could be
            parsed, or the payload does not fit in the image.
    """
    if not secret_data_str or not secret_data_str.strip():
        raise ValueError("Secret data cannot be empty.")
    if not public_key_pem or not public_key_pem.strip():
        raise ValueError("Public Key cannot be empty.")

    data_dict = {}
    for line in secret_data_str.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on whichever separator appears FIRST, so that a value that
        # itself contains the other separator (e.g. "API_URL=https://host")
        # is not cut in the wrong place.
        positions = [pos for pos in (line.find(':'), line.find('=')) if pos != -1]
        if not positions:
            continue
        cut = min(positions)
        data_dict[line[:cut].strip()] = line[cut + 1:].strip().strip("'\"")
    if not data_dict:
        raise ValueError("No valid key-value pairs found.")

    json_bytes = json.dumps(data_dict).encode('utf-8')

    # Hybrid encryption: AES-GCM for the data, RSA-OAEP for the AES key.
    public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
    aes_key, nonce = os.urandom(32), os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
    rsa_encrypted_key = public_key.encrypt(
        aes_key,
        padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None),
    )
    encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext

    # Build the visible carrier image.
    img = Image.new('RGB', (800, 600), color=(45, 52, 54))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", 40)
    except IOError:
        font = ImageFont.load_default(size=30)
    draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")

    pixels = np.array(img.convert("RGB"))
    flat_pixels = pixels.ravel()

    # Expand the length-prefixed payload into one uint8 (0 or 1) per bit,
    # most significant bit of each byte first.
    framed_payload = struct.pack('>I', len(encrypted_payload)) + encrypted_payload
    payload_bits = np.unpackbits(np.frombuffer(framed_payload, dtype=np.uint8))
    if payload_bits.size > flat_pixels.size:
        raise ValueError("Data too large for image.")

    # Clear each target channel's lowest bit, then set it to the payload bit.
    flat_pixels[:payload_bits.size] = (flat_pixels[:payload_bits.size] & 0xFE) | payload_bits
    stego_pixels = flat_pixels.reshape(pixels.shape)
    return Image.fromarray(stego_pixels, 'RGB')

def get_server_list():
    """Fetch the remote server configuration and stream UI updates for it.

    This is a generator. Every yielded item is a 3-tuple
    ``(dropdown, status_message, endpoints)`` where ``dropdown`` is a
    ``gr.Dropdown``, ``status_message`` is a string and ``endpoints`` is the
    list of configuration entries that passed validation.

    Yields:
        First a placeholder update while the request is in flight, then
        either a success update listing the valid servers or an error update
        with an empty endpoint list.
    """
    status = "Fetching server list from remote config..."
    yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []

    required_keys = ("name", "api_endpoint", "public_key")
    try:
        response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
        response.raise_for_status()
        all_entries = response.json()
        if not isinstance(all_entries, list):
            raise ValueError("Remote configuration must be a JSON list of server entries.")

        valid_endpoints = []
        for entry in all_entries:
            # A non-dict entry (null, number, string) must be skipped like any
            # other invalid entry rather than aborting the whole fetch.
            if isinstance(entry, dict) and all(key in entry for key in required_keys):
                valid_endpoints.append(entry)
            else:
                logger.warning("Skipping invalid entry in configuration file: %s", entry)
        if not valid_endpoints:
            raise ValueError("No valid server configurations found in the remote file.")

        endpoint_names = [e['name'] for e in valid_endpoints]
        status = f"✅ Success! Found {len(endpoint_names)} valid servers."
        yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0], label="Target Server"), status, valid_endpoints
    except Exception as e:
        # UI boundary: report any network/parsing failure as a status message.
        status = f"❌ Error fetching or parsing configuration: {e}"
        yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []

def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
    """Create an encrypted image for the selected server and report the outcome.

    Args:
        service_name: The ``name`` of the target server entry.
        secret_data: Raw secret text passed on to ``create_encrypted_image``.
        available_endpoints: Server configuration dicts; the one whose
            ``name`` matches ``service_name`` supplies the ``public_key``.

    Returns:
        ``(image, status_message)``. ``image`` is ``None`` when image creation
        failed, in which case the message describes the error.

    Raises:
        gr.Error: If no server is selected or no public key is found for it.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")

    # Treat a missing endpoint list as empty so the user gets the
    # "could not find public key" error instead of a TypeError.
    public_key = next(
        (e.get('public_key') for e in (available_endpoints or []) if e.get('name') == service_name),
        None,
    )
    if not public_key:
        raise gr.Error(f"Could not find public key for '{service_name}'.")

    try:
        created_image = create_encrypted_image(secret_data, public_key)
    except Exception as e:
        return None, f"❌ Error: {e}"
    return created_image, f"✅ Success! Image created for '{service_name}'."

def _extract_decrypted_result(response):
    """Return the first item of the ``data`` list from a successful API response.

    Raises:
        RuntimeError: If the HTTP status is not 200, or the body is not a JSON
            object containing a non-empty ``data`` list.
    """
    try:
        body = response.json()
    except ValueError:
        # Non-JSON body (for example an HTML error page); keep going so the
        # HTTP status is reported instead of a JSON decoding error.
        body = None

    if response.status_code != 200:
        if isinstance(body, dict):
            detail = body.get('error', 'Unknown error')
        else:
            detail = response.text[:200].strip() or 'Unknown error'
        raise RuntimeError(f"API Error (Status {response.status_code}): {detail}")

    if isinstance(body, dict) and isinstance(body.get("data"), list) and body["data"]:
        return body["data"][0]

    shown = body if body is not None else response.text[:200]
    raise RuntimeError(f"API returned an unexpected success format: {shown}")


def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
    """Send an image to the selected server's API and stream status updates.

    The image is PNG-encoded, base64-encoded and POSTed as JSON in the form
    ``{"data": [<base64 string>]}`` to the server's ``api_endpoint``.

    Args:
        service_name: The ``name`` of the target server entry.
        image: The image to send.
        available_endpoints: Server configuration dicts; the one whose
            ``name`` matches ``service_name`` supplies the ``api_endpoint``.

    Yields:
        ``(result, status_message)`` tuples: first ``(None, "Connecting...")``,
        then either the first element of the response's ``data`` list with a
        success message, or ``None`` with an error message.

    Raises:
        gr.Error: If no server is selected, no image is given, or the selected
            server has no ``api_endpoint``.
    """
    if not service_name:
        raise gr.Error("Please select a target server.")
    if image is None:
        raise gr.Error("Please upload an image to send.")

    api_endpoint = next(
        (e.get('api_endpoint') for e in (available_endpoints or []) if e.get('name') == service_name),
        None,
    )
    if not api_endpoint:
        raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")

    status = f"Connecting to endpoint: {api_endpoint}"
    yield None, status

    try:
        with io.BytesIO() as buffer:
            image.save(buffer, format="PNG")
            b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
        payload = {"data": [b64_string]}
        headers = {"Content-Type": "application/json"}
        response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
        result = _extract_decrypted_result(response)
    except Exception as e:
        # UI boundary: encoding, network and API failures all become a status line.
        yield None, f"❌ Error calling server API: {e}"
    else:
        yield result, "✅ Success! Data decrypted by remote server."

def refresh_and_update_all():
    """Run ``get_server_list`` to completion and return its final update.

    Returns:
        ``(dropdown, dropdown, status, endpoints)`` taken from the last item
        the generator yields; the dropdown update is returned twice.
    """
    # Drain the generator, discarding every intermediate update.
    *_, final_update = get_server_list()
    dropdown, status_message, endpoints = final_update
    return dropdown, dropdown, status_message, endpoints