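# Web-page residue captured together with this file (kept as comments so the
# module remains valid Python):
# SecureWatermark / utils.py
# fantos's picture
# Update utils.py
# a12f164 verified
# raw
# history blame
# 7.68 kB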
# utils.py
import cv2
import numpy as np
from PIL import Image, PngImagePlugin, ImageDraw
import json
from datetime import datetime
from cryptography.fernet import Fernet
import base64
import hashlib
class WatermarkProcessor:
    """Embed text watermarks in images and read them back.

    Three mechanisms are provided:

    * ``create_preview`` draws the watermark text on a transparent overlay.
    * ``png_encode`` stores the text in a PNG ``"TXT"`` text chunk.
    * ``encode`` / ``decode`` hide a JSON payload in the least significant
      bit (LSB) of every channel value, in row-major order, terminated by
      ``END_MARKER``.
    """

    # Delimiter appended to the LSB payload so the decoder knows where the
    # hidden data ends.
    END_MARKER = "###END###"
    _END_MARKER_BYTES = b"###END###"

    def __init__(self, encryption_key=None):
        """Create the processor and its Fernet cipher.

        Args:
            encryption_key: Fernet key to use. When omitted (or falsy) a new
                key is generated.

        The key actually in use is kept on ``self.encryption_key`` so that a
        generated key is not lost once the constructor returns.
        """
        if not encryption_key:
            encryption_key = Fernet.generate_key()
        self.encryption_key = encryption_key
        self.fernet = Fernet(encryption_key)

    def to_bin(self, data):
        """Convert data to its binary representation.

        Args:
            data: ``str``, ``bytes``, ``numpy.ndarray``, ``int`` or
                ``numpy.uint8``.

        Returns:
            A string of ``0``/``1`` characters for ``str``, ``bytes`` and
            integer input, or a list of such strings (one per element) for
            an ``ndarray``.

        Raises:
            TypeError: If ``data`` is of any other type.

        Note:
            ``str`` input is converted one code point at a time with a
            minimum width of 8 bits, so characters above U+00FF yield more
            than 8 bits each. Pass UTF-8 ``bytes`` when a fixed 8-bit width
            is required.
        """
        if isinstance(data, str):
            return ''.join(format(ord(char), '08b') for char in data)
        if isinstance(data, bytes):
            return ''.join(format(x, '08b') for x in data)
        if isinstance(data, np.ndarray):
            return [format(i, "08b") for i in data]
        if isinstance(data, (int, np.uint8)):
            return format(data, "08b")
        raise TypeError("Type not supported.")

    def create_preview(self, image_path, watermark_text, opacity=0.3):
        """Create a preview of the watermark text drawn over the image.

        Args:
            image_path: Path of the image to preview.
            watermark_text: Text drawn horizontally centred, starting at half
                the image height.
            opacity: Text opacity; values outside ``[0, 1]`` are clamped.

        Returns:
            An RGBA ``PIL.Image.Image`` with the text composited on top, or
            ``None`` if the preview could not be produced.
        """
        try:
            # convert() loads the pixel data, so the file can be closed
            # as soon as the RGBA copy exists.
            with Image.open(image_path) as image:
                base = image.convert('RGBA')
            txt_layer = Image.new('RGBA', base.size, (255, 255, 255, 0))
            draw = ImageDraw.Draw(txt_layer)
            # Calculate text position
            text_width = draw.textlength(watermark_text)
            text_x = (base.width - text_width) // 2
            text_y = base.height // 2
            # Keep the alpha channel inside the valid 0..255 range.
            alpha = int(255 * min(max(opacity, 0.0), 1.0))
            draw.text((text_x, text_y), watermark_text,
                      fill=(255, 255, 255, alpha))
            # Combine layers
            return Image.alpha_composite(base, txt_layer)
        except Exception:
            # Best effort: callers receive None when no preview is available.
            return None

    def png_encode(self, im_name, extra, output_path="test.png"):
        """Store the watermark in a PNG ``"TXT"`` text chunk.

        Args:
            im_name: Path of the source image.
            extra: Text stored under the ``"TXT"`` key.
            output_path: Where the PNG is written. Defaults to ``"test.png"``.

        Returns:
            ``(output_path, message)`` on success, or ``(im_name, message)``
            with an error description on failure.
        """
        try:
            info = PngImagePlugin.PngInfo()
            info.add_text("TXT", extra)
            with Image.open(im_name) as im:
                im.save(output_path, format="PNG", pnginfo=info)
            return output_path, "Watermark added successfully"
        except Exception as e:
            return im_name, f"Error adding watermark: {str(e)}"

    @staticmethod
    def _embed_bytes(image, payload):
        """Write the bits of ``payload`` into the LSBs of ``image``.

        Bits are written most significant first, one per channel value, in
        row-major order. The caller must ensure the payload fits. Returns
        the watermarked array (same shape as ``image``).
        """
        bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))
        flat = np.ascontiguousarray(image).reshape(-1)
        # Clear each target LSB, then OR in the payload bit.
        flat[:bits.size] = (flat[:bits.size] & 0xFE) | bits
        return flat.reshape(image.shape)

    @staticmethod
    def _extract_bytes(image):
        """Pack the LSBs of ``image`` (row-major order) into bytes.

        Trailing bits that do not fill a whole byte are ignored.
        """
        bits = (np.asarray(image).reshape(-1) & 1).astype(np.uint8)
        usable = bits.size - bits.size % 8
        return np.packbits(bits[:usable]).tobytes()

    @staticmethod
    def _bytes_to_text(raw):
        """Decode payload bytes as UTF-8, falling back to Latin-1.

        The fallback maps every byte to the code point of the same value,
        which is how payloads written one code point per byte are read.
        """
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("latin-1")

    def encode(self, image_path, watermark_text, metadata=None):
        """Encode a watermark using simple LSB steganography.

        The payload is a JSON object with the keys ``text``, ``timestamp``
        and ``metadata``, followed by ``END_MARKER``. It is embedded as
        UTF-8 bytes so that non-ASCII text occupies whole bytes.

        Args:
            image_path: Path of the image to watermark.
            watermark_text: Text stored under the ``text`` key.
            metadata: Optional JSON-serialisable value stored under
                ``metadata`` (``{}`` when omitted).

        Returns:
            ``(output_path, message)`` on success, where ``output_path`` is a
            new ``watermarked_<timestamp>.png`` file in the working
            directory; ``(image_path, message)`` with an error description
            on failure.
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")
            # Prepare watermark data
            data = {
                'text': watermark_text,
                'timestamp': datetime.now().isoformat(),
                'metadata': metadata or {}
            }
            # Serialise, add the delimiter and convert to bytes.
            secret_data = json.dumps(data, ensure_ascii=False) + self.END_MARKER
            payload = secret_data.encode("utf-8")
            # Check capacity: one payload bit per channel value.
            if len(payload) * 8 > image.size:
                return image_path, "Error: Image too small for watermark data"
            image = self._embed_bytes(image, payload)
            # Save as PNG: a lossy format would destroy the LSBs.
            output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            if not cv2.imwrite(output_path, image):
                raise OSError(f"Could not write image file: {output_path}")
            return output_path, "Watermark added successfully"
        except Exception as e:
            return image_path, f"Error in encoding: {str(e)}"

    def decode(self, image_path):
        """Decode a watermark from PNG metadata or from the pixel LSBs.

        The PNG ``"TXT"`` text chunk is tried first. Otherwise the LSBs are
        read and searched for ``END_MARKER``; everything before the marker
        is the payload.

        Args:
            image_path: Path of the image to inspect.

        Returns:
            The ``"TXT"`` chunk value if present; otherwise the payload
            pretty-printed as JSON, or as stripped plain text when it is not
            valid JSON. When no marker is found, the outermost ``{...}``
            span of the LSB text is tried as JSON, then the raw LSB text is
            returned, or ``"Error: No valid watermark found"`` if that text
            is blank. Any failure yields an ``"Error in decoding: ..."``
            string.
        """
        try:
            # Try PNG metadata method first
            try:
                with Image.open(image_path) as im:
                    if "TXT" in im.info:
                        return im.info["TXT"]
            except Exception:
                # Pillow could not read the file; fall through to the
                # LSB method.
                pass
            # Read image
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")
            raw = self._extract_bytes(image)
            # Check for end marker
            marker_at = raw.find(self._END_MARKER_BYTES)
            if marker_at != -1:
                extracted_text = self._bytes_to_text(raw[:marker_at])
                try:
                    # Attempt to parse as JSON
                    data = json.loads(extracted_text)
                    return json.dumps(data, ensure_ascii=False, indent=2)
                except ValueError:
                    # If not valid JSON, return as plain text
                    return extracted_text.strip()
            # No marker: look for any JSON object in the LSB text.
            text = raw.decode("latin-1")
            start = text.find('{')
            end = text.rfind('}')
            if start != -1 and end != -1:
                try:
                    data = json.loads(text[start:end + 1])
                    return json.dumps(data, ensure_ascii=False, indent=2)
                except ValueError:
                    pass
            return text if text.strip() else "Error: No valid watermark found"
        except Exception as e:
            return f"Error in decoding: {str(e)}"