# utils.py
import base64
import hashlib
import json
from datetime import datetime

import cv2
import numpy as np
from cryptography.fernet import Fernet
from PIL import Image, ImageDraw, PngImagePlugin


class WatermarkProcessor:
    """Embed, extract, preview and evaluate image watermarks.

    Two embedding schemes are provided:

    * ``png_encode`` stores the watermark in a PNG text chunk named ``TXT``.
    * ``encode`` hides a JSON payload in the least significant bit (LSB) of
      each channel value, terminated by an end marker.

    ``decode`` tries the PNG text chunk first and falls back to the LSB scheme.
    """

    # Appended to the LSB payload so decode() knows where the data stops.
    _END_MARKER = "###END###"

    def __init__(self, encryption_key=None):
        """Create a processor with a Fernet cipher.

        Args:
            encryption_key: A Fernet key. When omitted (or falsy) a fresh key
                is generated.

        The key in use is kept on ``self.encryption_key``; without it a
        generated key would be unrecoverable once this object is gone.
        """
        if encryption_key:
            self.encryption_key = encryption_key
        else:
            self.encryption_key = Fernet.generate_key()
        self.fernet = Fernet(self.encryption_key)

    def to_bin(self, data):
        """Convert data to its binary representation.

        Args:
            data: ``str``, ``bytes``, ``np.ndarray``, ``int`` or ``np.uint8``.

        Returns:
            For ``str`` and ``bytes``: one string of 8 bits per byte. Strings
            are UTF-8 encoded first so every character yields whole bytes
            (the same encoding ``encode`` uses).
            For ``np.ndarray``: a list with one bit string per element.
            For ``int`` / ``np.uint8``: a bit string zero-padded to 8 digits.

        Raises:
            TypeError: If ``data`` is of any other type.
        """
        if isinstance(data, str):
            data = data.encode("utf-8")
        if isinstance(data, bytes):
            return "".join(format(byte, "08b") for byte in data)
        if isinstance(data, np.ndarray):
            return [format(value, "08b") for value in data]
        if isinstance(data, (int, np.uint8)):
            return format(data, "08b")
        raise TypeError("Type not supported.")

    def create_preview(self, image_path, watermark_text, opacity=0.3):
        """Create a preview of the watermark text drawn over the image.

        Args:
            image_path: Path of the image to preview.
            watermark_text: Text drawn in white, horizontally centred, with
                its top at half the image height.
            opacity: Text opacity; values outside 0..1 are clamped so the
                alpha channel stays within 0..255.

        Returns:
            A new RGBA ``PIL.Image.Image``, or ``None`` if anything fails.
        """
        try:
            # Convert inside the context manager so the file handle is
            # released; the converted copy no longer depends on the file.
            with Image.open(image_path) as image:
                base = image.convert("RGBA")

            txt_layer = Image.new("RGBA", base.size, (255, 255, 255, 0))
            draw = ImageDraw.Draw(txt_layer)

            # Calculate text position
            text_width = draw.textlength(watermark_text)
            text_x = (base.width - text_width) // 2
            text_y = base.height // 2

            # Add watermark text
            alpha = int(255 * min(max(opacity, 0.0), 1.0))
            draw.text(
                (text_x, text_y),
                watermark_text,
                fill=(255, 255, 255, alpha),
            )

            # Combine layers
            return Image.alpha_composite(base, txt_layer)
        except Exception:
            # Deliberate best-effort: callers get None when no preview
            # could be produced.
            return None

    def png_encode(self, im_name, extra, output_path="test.png"):
        """Encode a watermark as PNG text metadata.

        Args:
            im_name: Path of the source image.
            extra: Text stored in the ``TXT`` chunk.
            output_path: Where the PNG is written. Defaults to ``"test.png"``.

        Returns:
            ``(output_path, message)`` on success, or ``(im_name, message)``
            with an error description on failure.
        """
        try:
            info = PngImagePlugin.PngInfo()
            info.add_text("TXT", extra)
            with Image.open(im_name) as im:
                im.save(output_path, pnginfo=info)
            return output_path, "Watermark added successfully"
        except Exception as e:
            return im_name, f"Error adding watermark: {str(e)}"

    def encode(self, image_path, watermark_text, metadata=None):
        """Encode a watermark using simple LSB steganography.

        The payload is a JSON object with the keys ``text``, ``timestamp``
        and ``metadata``, followed by the end marker, encoded as UTF-8. Its
        bits (most significant first) replace the LSB of successive channel
        values in row, column, channel order.

        Args:
            image_path: Path of the image to watermark.
            watermark_text: Text to embed.
            metadata: Optional JSON-serialisable value; ``{}`` when omitted.

        Returns:
            ``(output_path, message)`` on success. On failure, or when the
            image cannot hold the payload, ``(image_path, message)`` with an
            error description.
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            # Prepare watermark data
            data = {
                'text': watermark_text,
                'timestamp': datetime.now().isoformat(),
                'metadata': metadata or {}
            }

            # Serialise, add the delimiter, then expand bytes into bits.
            secret_data = json.dumps(data, ensure_ascii=False) + self._END_MARKER
            payload = np.frombuffer(secret_data.encode('utf-8'), dtype=np.uint8)
            bits = np.unpackbits(payload)

            # Work on a flat copy so the result does not depend on whether
            # reshape() could return a view of the loaded array.
            flat = image.reshape(-1).copy()

            # Check capacity
            if bits.size > flat.size:
                return image_path, "Error: Image too small for watermark data"

            # Clear each LSB and write one payload bit into it.
            flat[:bits.size] = (flat[:bits.size] & 0xFE) | bits
            image = flat.reshape(image.shape)

            # Save result (PNG is lossless, so the LSBs survive).
            output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            if not cv2.imwrite(output_path, image):
                # imwrite signals failure through its return value.
                raise OSError(f"Could not write image file: {output_path}")

            return output_path, "Watermark added successfully"
        except Exception as e:
            return image_path, f"Error in encoding: {str(e)}"

    def decode(self, image_path):
        """Decode a watermark from PNG metadata or LSB steganography.

        Args:
            image_path: Path of the image to inspect.

        Returns:
            The ``TXT`` metadata value if the image has one. Otherwise the
            LSB payload found before the end marker: pretty-printed JSON when
            it parses as JSON, else the text itself. If no end marker is
            found, ``"Error: No valid watermark found"``; on any other
            failure a string starting with ``"Error in decoding: "``.
        """
        try:
            # Try PNG metadata method first
            try:
                with Image.open(image_path) as im:
                    if "TXT" in im.info:
                        return im.info["TXT"]
            except Exception:
                # Best-effort only: fall through to the LSB method.
                pass

            # Read image
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            # Extract the LSB of the first three channels of every pixel and
            # pack complete groups of 8 bits back into bytes.
            bits = image[:, :, :3].reshape(-1) & 1
            usable = bits.size - bits.size % 8
            raw = np.packbits(bits[:usable]).tobytes()

            # Find end marker
            end = raw.find(self._END_MARKER.encode('utf-8'))
            if end == -1:
                return "Error: No valid watermark found"

            # encode() writes UTF-8, so decode as UTF-8; bytes that are not
            # valid UTF-8 are replaced instead of raising.
            text = raw[:end].decode('utf-8', errors='replace')
            try:
                # Try to parse as JSON
                data = json.loads(text)
                return json.dumps(data, ensure_ascii=False, indent=2)
            except json.JSONDecodeError:
                return text
        except Exception as e:
            return f"Error in decoding: {str(e)}"

    def analyze_quality(self, original_path, watermarked_path):
        """Analyze how much the watermarked image differs from the original.

        Args:
            original_path: Path of the original image.
            watermarked_path: Path of the watermarked image.

        Returns:
            A JSON string with the keys ``psnr``, ``histogram_similarity``
            (correlation of the channel-0 histograms), ``modified_pixels``
            (count of differing channel values), ``image_size`` and
            ``quality_score`` (PSNR scaled so 50 dB is 100, capped at 100).
            On failure, a string starting with
            ``"Error in quality analysis: "``.
        """
        try:
            original = cv2.imread(original_path)
            watermarked = cv2.imread(watermarked_path)

            if original is None or watermarked is None:
                raise ValueError("Could not read image files")
            if original.shape != watermarked.shape:
                raise ValueError("Images must have the same dimensions")

            # Calculate PSNR. Subtract in float: uint8 arithmetic would wrap
            # around and corrupt the squared error.
            delta = original.astype(np.float64) - watermarked.astype(np.float64)
            mse = float(np.mean(delta ** 2))
            if mse == 0:
                psnr = float('inf')
            else:
                psnr = float(20 * np.log10(255.0 / np.sqrt(mse)))

            # Calculate histogram similarity
            hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
            hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
            hist_correlation = cv2.compareHist(
                hist_original, hist_watermarked, cv2.HISTCMP_CORREL
            )

            # Count modified channel values
            diff = cv2.bitwise_xor(original, watermarked)
            modified_pixels = int(np.count_nonzero(diff))

            if psnr == float('inf'):
                quality_score = 100
            else:
                # Cap at 100: PSNR above 50 dB would otherwise overshoot.
                quality_score = round(min((psnr / 50) * 100, 100), 2)

            report = {
                'psnr': round(psnr, 2),
                'histogram_similarity': round(hist_correlation, 4),
                'modified_pixels': modified_pixels,
                'image_size': original.shape,
                'quality_score': quality_score
            }

            return json.dumps(report, indent=2)
        except Exception as e:
            return f"Error in quality analysis: {str(e)}"