# Spaces: Running
import cv2 | |
import numpy as np | |
from PIL import Image, PngImagePlugin | |
def png_encode(im_name, extra):
    """Store *extra* as PNG text metadata and save the image as ``test.png``.

    The text is written under the ``"TXT"`` key of a PNG info block.

    Args:
        im_name: Path of the image to open.
        extra: Text to store in the metadata.

    Returns:
        ``("test.png", "Watermark added successfully")`` on success, or
        ``(im_name, "Error adding watermark: <reason>")`` if any step fails.
    """
    output_path = "test.png"
    try:
        # The context manager releases the source file handle even when
        # saving fails; the original left the handle open.
        with Image.open(im_name) as im:
            info = PngImagePlugin.PngInfo()
            info.add_text("TXT", extra)
            im.save(output_path, pnginfo=info)
    except Exception as e:
        return im_name, f"Error adding watermark: {e}"
    return output_path, "Watermark added successfully"
def to_bin(data):
    """Return *data* rendered as zero-padded 8-bit binary text.

    Args:
        data: One of
            * ``str`` - encoded as UTF-8 first;
            * ``bytes`` / ``bytearray``;
            * ``np.ndarray`` - each element is formatted separately;
            * ``int`` or any NumPy integer scalar.

    Returns:
        A single concatenated bit string for ``str`` and bytes-like input,
        a list with one bit string per element for arrays, and one bit
        string for integer scalars.

    Raises:
        TypeError: If *data* is of any other type.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")
    if isinstance(data, (bytes, bytearray)):
        return "".join(format(byte, "08b") for byte in data)
    if isinstance(data, np.ndarray):
        return [format(value, "08b") for value in data]
    # np.integer covers np.uint8 as well as the other NumPy integer widths,
    # which previously fell through to the TypeError below.
    if isinstance(data, (int, np.integer)):
        return format(data, "08b")
    raise TypeError("Type not supported.")
def decode(image_name, txt=None):
    """Recover a watermark from an image file.

    Two storage schemes are tried in order:

    1. PNG metadata: a text entry stored under the ``"TXT"`` key.
    2. LSB steganography: the least significant bit of every R, G and B
       value (row by row, in that channel order) is packed into bytes, and
       the message is whatever precedes the ``=====`` and ``#####`` markers.

    Args:
        image_name: Path of the image to inspect.
        txt: Unused; kept so existing callers keep working.

    Returns:
        The decoded text with surrounding whitespace stripped, or a string
        starting with ``"Error detecting watermark: "`` if the image cannot
        be read or processed.
    """
    try:
        # Metadata lookup is best-effort: any failure here just means we
        # fall through to the steganography path. ``except Exception``
        # (rather than a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        try:
            with Image.open(image_name) as im:
                if "TXT" in im.info:
                    return im.info["TXT"]
        except Exception:
            pass

        image = cv2.imread(image_name)
        if image is None:
            raise ValueError("Could not read image file")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Flattening walks rows, then pixels, then R/G/B - the same order
        # in which the bits were embedded.
        bits = (image & 1).reshape(-1)
        # Drop the trailing bits that do not fill a whole byte.
        usable = bits.size - bits.size % 8
        # packbits is most-significant-bit first, matching int(bits, 2).
        payload = np.packbits(bits[:usable]).tobytes()

        # Cut at the end marker first, then at the delimiter.
        payload = payload.split(b"=====")[0]
        payload = payload.split(b"#####")[0]

        # errors="ignore" never raises, so no fallback decoder is needed.
        return payload.decode("utf-8", errors="ignore").strip()
    except Exception as e:
        return f"Error detecting watermark: {e}"
def encode(image_name, secret_data, txt=None):
    """Embed *secret_data* in an image using LSB steganography.

    The text is suffixed with the ``#####`` and ``=====`` markers, encoded as
    UTF-8, and written bit by bit into the least significant bit of each
    R, G and B value (row by row, in that channel order). The result is
    saved as ``watermarked_image.png`` in the current directory.

    Args:
        image_name: Path of the image to watermark.
        secret_data: Value to embed; converted with ``str()``.
        txt: Unused; kept so existing callers keep working.

    Returns:
        ``("watermarked_image.png", "Watermark embedded successfully")`` on
        success; ``(image_name, "Watermark is too large for Image Size")``
        when the payload does not fit; otherwise
        ``(image_name, "Error encoding watermark: <reason>")``.
    """
    try:
        image = cv2.imread(image_name)
        if image is None:
            raise ValueError("Could not read image file")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        payload = (str(secret_data) + "#####" + "=====").encode("utf-8")
        # unpackbits is most-significant-bit first, matching format(x, "08b").
        bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))

        channels = image.reshape(-1)
        # Capacity is counted in whole bytes: one bit per channel value.
        capacity_bits = channels.size // 8 * 8
        if bits.size > capacity_bits:
            return image_name, "Watermark is too large for Image Size"

        # Clear the lowest bit of the first len(bits) values and set it to
        # the payload bit; the remaining values are left untouched.
        channels[:bits.size] = (channels[:bits.size] & 0xFE) | bits
        # Reshape back explicitly in case reshape(-1) had to copy.
        image = channels.reshape(image.shape)

        output_path = "watermarked_image.png"
        # imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR)):
            raise OSError(f"Could not write {output_path}")
        return output_path, "Watermark embedded successfully"
    except Exception as e:
        return image_name, f"Error encoding watermark: {e}"