|
def generate_rsa_keys(): |
|
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) |
|
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8') |
|
public_pem = private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8') |
|
return private_pem, public_pem |
|
|
|
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image: |
|
if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.") |
|
if not public_key_pem.strip(): raise ValueError("Public Key cannot be empty.") |
|
data_dict = {} |
|
for line in secret_data_str.splitlines(): |
|
line = line.strip() |
|
if not line or line.startswith('#'): continue |
|
parts = line.split(':', 1) if ':' in line else line.split('=', 1) |
|
if len(parts) == 2: data_dict[parts[0].strip()] = parts[1].strip().strip("'\"") |
|
if not data_dict: raise ValueError("No valid key-value pairs found.") |
|
json_bytes = json.dumps(data_dict).encode('utf-8') |
|
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8')) |
|
aes_key, nonce = os.urandom(32), os.urandom(12) |
|
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None) |
|
rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)) |
|
encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext |
|
img = Image.new('RGB', (800, 600), color=(45, 52, 54)) |
|
draw = ImageDraw.Draw(img) |
|
try: font = ImageFont.truetype("DejaVuSans.ttf", 40) |
|
except IOError: font = ImageFont.load_default(size=30) |
|
draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms") |
|
pixel_data = np.array(img.convert("RGB")).ravel() |
|
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload) |
|
if len(binary_payload) > pixel_data.size: raise ValueError("Data too large for image.") |
|
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8) |
|
stego_pixels = pixel_data.reshape((600, 800, 3)) |
|
return Image.fromarray(stego_pixels, 'RGB') |
|
|
|
def get_server_list(): |
|
status = f"Fetching server list from remote config..." |
|
yield gr.Dropdown(choices=[], value=None, label="β³ Fetching..."), status, [] |
|
try: |
|
response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10) |
|
response.raise_for_status() |
|
all_entries = response.json() |
|
valid_endpoints = [] |
|
required_keys = ["name", "api_endpoint", "public_key"] |
|
for entry in all_entries: |
|
if all(key in entry for key in required_keys): |
|
valid_endpoints.append(entry) |
|
else: |
|
logger.warning(f"Skipping invalid entry in configuration file: {entry}") |
|
if not valid_endpoints: |
|
raise ValueError("No valid server configurations found in the remote file.") |
|
endpoint_names = [e['name'] for e in valid_endpoints] |
|
status = f"β
Success! Found {len(endpoint_names)} valid servers." |
|
yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints |
|
except Exception as e: |
|
status = f"β Error fetching or parsing configuration: {e}" |
|
yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, [] |
|
|
|
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list): |
|
if not service_name: raise gr.Error("Please select a target server.") |
|
public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None) |
|
if not public_key: raise gr.Error(f"Could not find public key for '{service_name}'.") |
|
try: |
|
created_image = create_encrypted_image(secret_data, public_key) |
|
return created_image, f"β
Success! Image created for '{service_name}'." |
|
except Exception as e: |
|
return None, f"β Error: {e}" |
|
|
|
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list): |
|
if not service_name: raise gr.Error("Please select a target server.") |
|
if image is None: raise gr.Error("Please upload an image to send.") |
|
api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None) |
|
if not api_endpoint: raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.") |
|
status = f"Connecting to endpoint: {api_endpoint}" |
|
yield None, status |
|
try: |
|
with io.BytesIO() as buffer: |
|
image.save(buffer, format="PNG") |
|
b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8") |
|
payload = {"data": [b64_string]} |
|
headers = {"Content-Type": "application/json"} |
|
response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45) |
|
response_json = response.json() |
|
if response.status_code == 200: |
|
if "data" in response_json: |
|
yield response_json["data"][0], "β
Success! Data decrypted by remote server." |
|
else: |
|
raise gr.Error(f"API returned an unexpected success format: {response_json}") |
|
else: |
|
raise gr.Error(f"API Error (Status {response.status_code}): {response_json.get('error', 'Unknown error')}") |
|
except Exception as e: |
|
yield None, f"β Error calling server API: {e}" |
|
|
|
def refresh_and_update_all(): |
|
for dropdown_update, status_update, state_update in get_server_list(): |
|
pass |
|
return dropdown_update, dropdown_update, status_update, state_update |