# utils.py import cv2 import numpy as np from PIL import Image, PngImagePlugin, ImageDraw import json from datetime import datetime from cryptography.fernet import Fernet import base64 import hashlib class WatermarkProcessor: def __init__(self, encryption_key=None): """Initialize with optional encryption key""" if encryption_key: self.fernet = Fernet(encryption_key) else: key = Fernet.generate_key() self.fernet = Fernet(key) def to_bin(self, data): """Convert data to binary format as string""" if isinstance(data, str): return ''.join(format(x, '08b') for x in data.encode('utf-8')) elif isinstance(data, bytes): return ''.join(format(x, '08b') for x in data) elif isinstance(data, np.ndarray): return [format(i, "08b") for i in data] elif isinstance(data, int) or isinstance(data, np.uint8): return format(data, "08b") else: raise TypeError("Type not supported.") def create_preview(self, image_path, watermark_text, opacity=0.3): """Create a preview of watermark on image""" try: image = Image.open(image_path) txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) draw = ImageDraw.Draw(txt_layer) # Calculate text position text_width = draw.textlength(watermark_text) text_x = (image.width - text_width) // 2 text_y = image.height // 2 # Add watermark text draw.text((text_x, text_y), watermark_text, fill=(255, 255, 255, int(255 * opacity))) # Combine layers preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) return preview except Exception as e: return None def png_encode(self, im_name, extra): """Encode watermark using PNG metadata""" try: im = Image.open(im_name) info = PngImagePlugin.PngInfo() info.add_text("TXT", extra) im.save("test.png", pnginfo=info) return "test.png", "Watermark added successfully" except Exception as e: return im_name, f"Error adding watermark: {str(e)}" def encode(self, image_path, watermark_text, metadata=None): """Encode watermark using steganography with encryption""" try: image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Prepare watermark data watermark_data = { 'text': watermark_text, 'timestamp': datetime.now().isoformat(), 'metadata': metadata or {} } # Add image hash image_copy = image.copy() & 0xFE # Clear LSB watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest() # Encrypt data secret_data = json.dumps(watermark_data) secret_data = f"{secret_data}#####=====" # Add delimiters binary_secret_data = self.to_bin(secret_data) # Calculate capacity n_bytes = image.shape[0] * image.shape[1] * 3 // 8 if len(binary_secret_data) > n_bytes * 8: return image_path, "Watermark is too large for Image Size" # Embed data data_index = 0 for i in range(image.shape[0]): for j in range(image.shape[1]): if data_index < len(binary_secret_data): pixel = image[i, j] for k in range(3): if data_index < len(binary_secret_data): binary_value = format(pixel[k], '08b') binary_value = binary_value[:-1] + binary_secret_data[data_index] image[i, j, k] = int(binary_value, 2) data_index += 1 # Save result output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png" cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) return output_path, "Watermark added successfully" except Exception as e: return image_path, f"Error in encoding: {str(e)}" def decode(self, image_path): """Decode watermark with improved error handling""" try: # Try PNG metadata method first try: im = Image.open(image_path) if "TXT" in im.info: return im.info["TXT"] except: pass # Steganography method image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Extract binary data binary_data = "" for row in image: for pixel in row: for value in pixel: binary_data += format(value, '08b')[-1] # Convert to bytes with error checking bytes_data = bytearray() for i in range(0, len(binary_data), 8): if i + 8 <= len(binary_data): try: byte = int(binary_data[i:i+8], 2) bytes_data.append(byte) except ValueError: continue # Process the data with robust decoding try: # First try UTF-8 decoding with error handling decoded_data = bytes(bytes_data).decode('utf-8', errors='ignore') # Clean up decoded data decoded_data = ''.join(char for char in decoded_data if ord(char) < 128 or ord(char) > 160) # Find markers end_marker = "=====" delimiter = "#####" if end_marker in decoded_data: parts = decoded_data.split(end_marker) decoded_data = parts[0] # Take the part before ===== if delimiter in decoded_data: parts = decoded_data.split(delimiter) decoded_data = parts[0] # Take the part before ##### # Clean the decoded data decoded_data = decoded_data.strip() # Try to parse as JSON try: watermark_data = json.loads(decoded_data) # Verify image hash image_copy = image.copy() & 0xFE current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest() if current_hash != watermark_data.get('image_hash'): return "Warning: Image has been modified after watermarking" return json.dumps(watermark_data, indent=2, ensure_ascii=False) except json.JSONDecodeError: # If not valid JSON, return cleaned decoded data return decoded_data except UnicodeDecodeError: # Try alternative decoding method try: fallback_decoded = "" current_bytes = bytearray() for byte in bytes_data: current_bytes.append(byte) try: char = current_bytes.decode('utf-8') fallback_decoded += char current_bytes = bytearray() except UnicodeDecodeError: if len(current_bytes) >= 4: # Maximum UTF-8 character length current_bytes = bytearray() continue if fallback_decoded: return fallback_decoded.strip() else: return "Error: Could not decode watermark data" except Exception as e: return f"Error in fallback decoding: {str(e)}" except Exception as e: return f"Error in decoding: {str(e)}" def analyze_quality(self, original_path, watermarked_path): """Analyze watermark quality""" try: original = cv2.imread(original_path) watermarked = cv2.imread(watermarked_path) if original is None or watermarked is None: raise ValueError("Could not read image files") # Calculate PSNR mse = np.mean((original - watermarked) ** 2) if mse == 0: psnr = float('inf') else: psnr = 20 * np.log10(255.0 / np.sqrt(mse)) # Calculate histogram similarity hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) # Count modified pixels diff = cv2.bitwise_xor(original, watermarked) modified_pixels = np.count_nonzero(diff) report = { 'psnr': round(psnr, 2), 'histogram_similarity': round(hist_correlation, 4), 'modified_pixels': modified_pixels, 'image_size': original.shape, 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 } return json.dumps(report, indent=2) except Exception as e: return f"Error in quality analysis: {str(e)}"