Spaces:
Sleeping
Sleeping
# utils.py | |
import cv2 | |
import numpy as np | |
from PIL import Image, PngImagePlugin, ImageDraw | |
import json | |
from datetime import datetime | |
from cryptography.fernet import Fernet | |
import base64 | |
import hashlib | |
class WatermarkProcessor:
    """Embed and recover text watermarks in images.

    Two mechanisms are offered:

    * PNG text metadata (``png_encode``), stored under the ``"TXT"`` key.
    * LSB steganography (``encode``): a JSON payload followed by
      ``END_MARKER`` is written, as UTF-8 bytes, into the least significant
      bit of consecutive channel values in row-major order.

    ``decode`` tries the metadata first and then the LSB payload.

    Attributes:
        encryption_key: The Fernet key in use (supplied or generated).
        fernet: ``Fernet`` instance built from ``encryption_key``.
    """

    # Terminator appended to the LSB payload so the decoder knows where the
    # watermark stops and ordinary image noise begins.
    END_MARKER = "###END###"

    def __init__(self, encryption_key=None):
        """Initialize with an optional Fernet encryption key.

        Args:
            encryption_key: A Fernet key. When omitted (or falsy) a fresh
                key is generated.
        """
        # Keep the key reachable: a generated key that is thrown away makes
        # anything encrypted with it impossible to decrypt later.
        self.encryption_key = encryption_key or Fernet.generate_key()
        self.fernet = Fernet(self.encryption_key)

    def to_bin(self, data):
        """Convert data to its binary representation.

        Args:
            data: ``str``, ``bytes``, ``numpy.ndarray``, ``int`` or
                ``numpy.uint8``.

        Returns:
            A bit string for ``str``/``bytes``/integer input, or a list of
            bit strings (one per element) for an array.

        Raises:
            TypeError: If ``data`` is of any other type.
        """
        if isinstance(data, str):
            # NOTE: one group per code point; code points above 255 yield
            # groups wider than 8 bits. Encode to bytes first when a fixed
            # 8-bit layout is required.
            return ''.join(format(ord(char), '08b') for char in data)
        if isinstance(data, bytes):
            return ''.join(format(x, '08b') for x in data)
        if isinstance(data, np.ndarray):
            return [format(i, "08b") for i in data]
        if isinstance(data, (int, np.uint8)):
            return format(data, "08b")
        raise TypeError("Type not supported.")

    def create_preview(self, image_path, watermark_text, opacity=0.3):
        """Create a preview with the watermark text drawn over the image.

        Args:
            image_path: Path of the image to preview.
            watermark_text: Text drawn in white, horizontally centered,
                starting at half the image height.
            opacity: Text opacity; values outside 0..1 are clamped.

        Returns:
            An RGBA ``PIL.Image.Image``, or ``None`` if anything fails
            (best-effort preview; callers are expected to check for None).
        """
        try:
            # The context manager releases the file handle once the pixel
            # data has been copied by convert().
            with Image.open(image_path) as source:
                base = source.convert('RGBA')

            txt_layer = Image.new('RGBA', base.size, (255, 255, 255, 0))
            draw = ImageDraw.Draw(txt_layer)

            # Calculate text position
            text_width = draw.textlength(watermark_text)
            text_x = (base.width - text_width) // 2
            text_y = base.height // 2

            # Alpha must stay inside the 0..255 channel range.
            alpha = int(255 * min(max(opacity, 0.0), 1.0))
            draw.text((text_x, text_y), watermark_text,
                      fill=(255, 255, 255, alpha))

            # Combine layers
            return Image.alpha_composite(base, txt_layer)
        except Exception:
            return None

    def png_encode(self, im_name, extra, output_path="test.png"):
        """Encode a watermark as PNG text metadata.

        Args:
            im_name: Path of the source image.
            extra: Text stored under the ``"TXT"`` metadata key.
            output_path: Where the PNG is written. Defaults to
                ``"test.png"``.

        Returns:
            ``(output_path, message)`` on success, or ``(im_name, message)``
            with an error message on failure.
        """
        try:
            info = PngImagePlugin.PngInfo()
            info.add_text("TXT", extra)
            with Image.open(im_name) as im:
                im.save(output_path, pnginfo=info)
            return output_path, "Watermark added successfully"
        except Exception as e:
            return im_name, f"Error adding watermark: {str(e)}"

    def encode(self, image_path, watermark_text, metadata=None):
        """Encode a watermark using simple LSB steganography.

        The payload is a JSON object with ``text``, ``timestamp`` and
        ``metadata`` keys followed by ``END_MARKER``. The result is saved as
        ``watermarked_<YYYYmmdd_HHMMSS>.png`` in the working directory.

        Args:
            image_path: Path of the image to watermark.
            watermark_text: Text to embed.
            metadata: Optional JSON-serializable mapping stored alongside
                the text.

        Returns:
            ``(output_path, message)`` on success, or ``(image_path,
            message)`` with an error message on failure.
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            # Prepare watermark data
            data = {
                'text': watermark_text,
                'timestamp': datetime.now().isoformat(),
                'metadata': metadata or {}
            }

            # Serialize as UTF-8 bytes so every character occupies whole
            # bytes; formatting ord(char) as '08b' emits more than 8 bits
            # for code points above 255 and misaligns the stream.
            secret_data = json.dumps(data, ensure_ascii=False) + self.END_MARKER
            payload = secret_data.encode("utf-8")

            # Check capacity: one payload bit per channel value.
            if len(payload) * 8 > image.size:
                return image_path, "Error: Image too small for watermark data"

            self._embed_payload(image, payload)

            # Save result (PNG is lossless, so the LSBs survive).
            output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            if not cv2.imwrite(output_path, image):
                # imwrite reports failure through its return value.
                raise OSError(f"Could not write {output_path}")
            return output_path, "Watermark added successfully"
        except Exception as e:
            return image_path, f"Error in encoding: {str(e)}"

    def decode(self, image_path):
        """Decode a watermark from PNG metadata or the LSB payload.

        Args:
            image_path: Path of the image to inspect.

        Returns:
            The ``"TXT"`` metadata value if present; otherwise the LSB
            payload (pretty-printed when it is valid JSON), the raw LSB text
            when no marker or JSON is found, or an ``"Error ..."`` string.
        """
        try:
            # Try PNG metadata method first; any failure here just means
            # we fall through to the LSB method.
            try:
                with Image.open(image_path) as im:
                    if "TXT" in im.info:
                        return im.info["TXT"]
            except Exception:
                pass

            # Read image
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            return self._extract_payload(image)
        except Exception as e:
            return f"Error in decoding: {str(e)}"

    @staticmethod
    def _embed_payload(image, payload):
        """Write ``payload`` bits (MSB first) into the LSBs of ``image``.

        Args:
            image: ``uint8`` array, modified in place in row-major order.
            payload: ``bytes`` to embed.

        Returns:
            The same ``image`` array.

        Raises:
            ValueError: If the payload needs more bits than the image has
                values.
        """
        bits = np.unpackbits(np.frombuffer(bytes(payload), dtype=np.uint8))
        count = bits.size
        if count > image.size:
            raise ValueError("Image too small for watermark data")
        # image.flat writes through to the array regardless of memory
        # layout; clear each LSB, then set it to the payload bit.
        carrier = image.flat[:count]
        image.flat[:count] = (carrier & 0xFE) | bits
        return image

    @staticmethod
    def _bytes_to_text(raw):
        """Decode bytes as UTF-8, falling back to Latin-1.

        The fallback keeps payloads readable when they were written one
        code point per byte rather than as UTF-8.
        """
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("latin-1")

    @staticmethod
    def _extract_payload(image):
        """Recover the watermark text from the LSBs of ``image``.

        Args:
            image: ``uint8`` array as produced by ``_embed_payload``.

        Returns:
            Pretty-printed JSON or plain text found before ``END_MARKER``;
            failing that, any JSON object found in the LSB stream; failing
            that, the LSB stream as text, or ``"Error: No valid watermark
            found"`` when that text is blank.
        """
        bits = (np.ravel(image) & 1).astype(np.uint8)
        usable = bits.size - bits.size % 8  # drop an incomplete final byte
        raw = np.packbits(bits[:usable]).tobytes() if usable else b""

        marker = WatermarkProcessor.END_MARKER.encode("utf-8")
        end = raw.find(marker)
        if end != -1:
            extracted_text = WatermarkProcessor._bytes_to_text(raw[:end])
            try:
                # Attempt to parse as JSON
                data = json.loads(extracted_text)
                return json.dumps(data, ensure_ascii=False, indent=2)
            except ValueError:
                # If not valid JSON, return as plain text
                return extracted_text.strip()

        # No marker: look for the outermost {...} span in the stream.
        start = raw.find(b'{')
        stop = raw.rfind(b'}')
        if start != -1 and stop > start:
            candidate = WatermarkProcessor._bytes_to_text(raw[start:stop + 1])
            try:
                data = json.loads(candidate)
                return json.dumps(data, ensure_ascii=False, indent=2)
            except ValueError:
                pass

        # Latin-1 maps every byte to one character, so this never fails.
        text = raw.decode("latin-1")
        return text if text.strip() else "Error: No valid watermark found"