broadfield-dev's picture
Update app.py
71bdca5 verified
raw
history blame
5.92 kB
def generate_rsa_keys():
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('utf-8')
public_pem = private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo).decode('utf-8')
return private_pem, public_pem
def create_encrypted_image(secret_data_str: str, public_key_pem: str) -> Image.Image:
if not secret_data_str.strip(): raise ValueError("Secret data cannot be empty.")
if not public_key_pem.strip(): raise ValueError("Public Key cannot be empty.")
data_dict = {}
for line in secret_data_str.splitlines():
line = line.strip()
if not line or line.startswith('#'): continue
parts = line.split(':', 1) if ':' in line else line.split('=', 1)
if len(parts) == 2: data_dict[parts[0].strip()] = parts[1].strip().strip("'\"")
if not data_dict: raise ValueError("No valid key-value pairs found.")
json_bytes = json.dumps(data_dict).encode('utf-8')
public_key = serialization.load_pem_public_key(public_key_pem.encode('utf-8'))
aes_key, nonce = os.urandom(32), os.urandom(12)
ciphertext = AESGCM(aes_key).encrypt(nonce, json_bytes, None)
rsa_encrypted_key = public_key.encrypt(aes_key, padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None))
encrypted_payload = struct.pack('>I', len(rsa_encrypted_key)) + rsa_encrypted_key + nonce + ciphertext
img = Image.new('RGB', (800, 600), color=(45, 52, 54))
draw = ImageDraw.Draw(img)
try: font = ImageFont.truetype("DejaVuSans.ttf", 40)
except IOError: font = ImageFont.load_default(size=30)
draw.text((400, 300), "KeyLock Secure Data", fill=(223, 230, 233), font=font, anchor="ms")
pixel_data = np.array(img.convert("RGB")).ravel()
binary_payload = ''.join(format(b, '08b') for b in struct.pack('>I', len(encrypted_payload)) + encrypted_payload)
if len(binary_payload) > pixel_data.size: raise ValueError("Data too large for image.")
pixel_data[:len(binary_payload)] = (pixel_data[:len(binary_payload)] & 0xFE) | np.array(list(binary_payload), dtype=np.uint8)
stego_pixels = pixel_data.reshape((600, 800, 3))
return Image.fromarray(stego_pixels, 'RGB')
def get_server_list():
status = f"Fetching server list from remote config..."
yield gr.Dropdown(choices=[], value=None, label="⏳ Fetching..."), status, []
try:
response = requests.get(CREATOR_ENDPOINTS_JSON_URL, timeout=10)
response.raise_for_status()
all_entries = response.json()
valid_endpoints = []
required_keys = ["name", "api_endpoint", "public_key"]
for entry in all_entries:
if all(key in entry for key in required_keys):
valid_endpoints.append(entry)
else:
logger.warning(f"Skipping invalid entry in configuration file: {entry}")
if not valid_endpoints:
raise ValueError("No valid server configurations found in the remote file.")
endpoint_names = [e['name'] for e in valid_endpoints]
status = f"βœ… Success! Found {len(endpoint_names)} valid servers."
yield gr.Dropdown(choices=endpoint_names, value=endpoint_names[0] if endpoint_names else None, label="Target Server"), status, valid_endpoints
except Exception as e:
status = f"❌ Error fetching or parsing configuration: {e}"
yield gr.Dropdown(choices=[], value=None, label="Error fetching servers"), status, []
def create_keylock_wrapper(service_name: str, secret_data: str, available_endpoints: list):
if not service_name: raise gr.Error("Please select a target server.")
public_key = next((e['public_key'] for e in available_endpoints if e['name'] == service_name), None)
if not public_key: raise gr.Error(f"Could not find public key for '{service_name}'.")
try:
created_image = create_encrypted_image(secret_data, public_key)
return created_image, f"βœ… Success! Image created for '{service_name}'."
except Exception as e:
return None, f"❌ Error: {e}"
def send_keylock_wrapper(service_name: str, image: Image.Image, available_endpoints: list):
if not service_name: raise gr.Error("Please select a target server.")
if image is None: raise gr.Error("Please upload an image to send.")
api_endpoint = next((e.get('api_endpoint') for e in available_endpoints if e['name'] == service_name), None)
if not api_endpoint: raise gr.Error(f"Configuration Error: Could not find 'api_endpoint' for '{service_name}'.")
status = f"Connecting to endpoint: {api_endpoint}"
yield None, status
try:
with io.BytesIO() as buffer:
image.save(buffer, format="PNG")
b64_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
payload = {"data": [b64_string]}
headers = {"Content-Type": "application/json"}
response = requests.post(api_endpoint, headers=headers, json=payload, timeout=45)
response_json = response.json()
if response.status_code == 200:
if "data" in response_json:
yield response_json["data"][0], "βœ… Success! Data decrypted by remote server."
else:
raise gr.Error(f"API returned an unexpected success format: {response_json}")
else:
raise gr.Error(f"API Error (Status {response.status_code}): {response_json.get('error', 'Unknown error')}")
except Exception as e:
yield None, f"❌ Error calling server API: {e}"
def refresh_and_update_all():
for dropdown_update, status_update, state_update in get_server_list():
pass
return dropdown_update, dropdown_update, status_update, state_update