# utils.py import cv2 import numpy as np from PIL import Image, PngImagePlugin, ImageDraw import json from datetime import datetime from cryptography.fernet import Fernet import base64 import hashlib class WatermarkProcessor: def __init__(self, encryption_key=None): """Initialize with optional encryption key""" if encryption_key: self.fernet = Fernet(encryption_key) else: key = Fernet.generate_key() self.fernet = Fernet(key) def to_bin(self, data): """Convert data to binary format as string""" if isinstance(data, str): return ''.join(format(ord(char), '08b') for char in data) elif isinstance(data, bytes): return ''.join(format(x, '08b') for x in data) elif isinstance(data, np.ndarray): return [format(i, "08b") for i in data] elif isinstance(data, int) or isinstance(data, np.uint8): return format(data, "08b") else: raise TypeError("Type not supported.") def create_preview(self, image_path, watermark_text, opacity=0.3): """Create a preview of watermark on image""" try: image = Image.open(image_path) txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) draw = ImageDraw.Draw(txt_layer) # Calculate text position text_width = draw.textlength(watermark_text) text_x = (image.width - text_width) // 2 text_y = image.height // 2 # Add watermark text draw.text((text_x, text_y), watermark_text, fill=(255, 255, 255, int(255 * opacity))) # Combine layers preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) return preview except Exception as e: return None def png_encode(self, im_name, extra): """Encode watermark using PNG metadata""" try: im = Image.open(im_name) info = PngImagePlugin.PngInfo() info.add_text("TXT", extra) im.save("test.png", pnginfo=info) return "test.png", "Watermark added successfully" except Exception as e: return im_name, f"Error adding watermark: {str(e)}" def encode(self, image_path, watermark_text, metadata=None): """Encode watermark using simple LSB steganography""" try: image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") # Prepare watermark data data = { 'text': watermark_text, 'timestamp': datetime.now().isoformat(), 'metadata': metadata or {} } # Convert to string and add delimiter secret_data = json.dumps(data, ensure_ascii=False) + "###END###" # Convert to binary string binary_secret = ''.join(format(ord(char), '08b') for char in secret_data) # Check capacity if len(binary_secret) > image.shape[0] * image.shape[1] * 3: return image_path, "Error: Image too small for watermark data" # Embed data data_index = 0 for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): if data_index < len(binary_secret): # Get current pixel value pixel = image[i, j, k] # Clear the LSB by setting it to 0 and add the watermark bit pixel = (pixel & 0xFE) | (int(binary_secret[data_index]) & 0x01) # Ensure value stays within uint8 range image[i, j, k] = np.uint8(pixel) data_index += 1 else: break if data_index >= len(binary_secret): break if data_index >= len(binary_secret): break # Save result output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" cv2.imwrite(output_path, image) return output_path, "Watermark added successfully" except Exception as e: return image_path, f"Error in encoding: {str(e)}" def decode(self, image_path): """Decode watermark using simple LSB steganography""" try: # Try PNG metadata method first try: im = Image.open(image_path) if "TXT" in im.info: return im.info["TXT"] except: pass # Read image image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") # Extract LSB data binary_text = '' # Get LSB from each byte of each pixel for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): bit = image[i, j, k] & 1 binary_text += str(bit) # Convert binary to text text = '' # Process 8 bits at a time for i in range(0, len(binary_text)-8, 8): byte = binary_text[i:i+8] text += chr(int(byte, 2)) # Check for end marker if text[-7:] == "###END###": extracted_text = text[:-7] try: # Attempt to parse as JSON data = json.loads(extracted_text) return json.dumps(data, ensure_ascii=False, indent=2) except: # If not valid JSON, return as plain text return extracted_text.strip() # If we got here, try to find any valid JSON in the text try: # Convert remaining bits for i in range(0, len(binary_text), 8): if i + 8 <= len(binary_text): byte = binary_text[i:i+8] text += chr(int(byte, 2)) # Look for JSON markers start = text.find('{') end = text.rfind('}') if start != -1 and end != -1: json_text = text[start:end+1] try: data = json.loads(json_text) return json.dumps(data, ensure_ascii=False, indent=2) except: pass except: pass return text if text.strip() else "Error: No valid watermark found" except Exception as e: return f"Error in decoding: {str(e)}" def analyze_quality(self, original_path, watermarked_path): """Analyze watermark quality""" try: original = cv2.imread(original_path) watermarked = cv2.imread(watermarked_path) if original is None or watermarked is None: raise ValueError("Could not read image files") # Calculate PSNR mse = np.mean((original - watermarked) ** 2) if mse == 0: psnr = float('inf') else: psnr = 20 * np.log10(255.0 / np.sqrt(mse)) # Calculate histogram similarity hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) # Count modified pixels diff = cv2.bitwise_xor(original, watermarked) modified_pixels = np.count_nonzero(diff) report = { 'psnr': round(psnr, 2), 'histogram_similarity': round(hist_correlation, 4), 'modified_pixels': modified_pixels, 'image_size': original.shape, 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 } return json.dumps(report, indent=2) except Exception as e: return f"Error in quality analysis: {str(e)}"