# utils.py import cv2 import numpy as np from PIL import Image, PngImagePlugin, ImageDraw import json from datetime import datetime from cryptography.fernet import Fernet import base64 import hashlib class WatermarkProcessor: def __init__(self, encryption_key=None): """Initialize with optional encryption key""" if encryption_key: self.fernet = Fernet(encryption_key) else: key = Fernet.generate_key() self.fernet = Fernet(key) def to_bin(self, data): """Convert data to binary format as string""" if isinstance(data, str): return ''.join(format(x, '08b') for x in data.encode('utf-8')) elif isinstance(data, bytes): return ''.join(format(x, '08b') for x in data) elif isinstance(data, np.ndarray): return [format(i, "08b") for i in data] elif isinstance(data, int) or isinstance(data, np.uint8): return format(data, "08b") else: raise TypeError("Type not supported.") def create_preview(self, image_path, watermark_text, opacity=0.3): """Create a preview of watermark on image""" try: image = Image.open(image_path) txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) draw = ImageDraw.Draw(txt_layer) # Calculate text position text_width = draw.textlength(watermark_text) text_x = (image.width - text_width) // 2 text_y = image.height // 2 # Add watermark text draw.text((text_x, text_y), watermark_text, fill=(255, 255, 255, int(255 * opacity))) # Combine layers preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) return preview except Exception as e: return None def png_encode(self, im_name, extra): """Encode watermark using PNG metadata""" try: im = Image.open(im_name) info = PngImagePlugin.PngInfo() info.add_text("TXT", extra) im.save("test.png", pnginfo=info) return "test.png", "Watermark added successfully" except Exception as e: return im_name, f"Error adding watermark: {str(e)}" # utils.py의 encode와 decode 메서드만 수정 def encode(self, image_path, watermark_text, metadata=None): """Encode watermark using steganography""" try: # 이미지 읽기 image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") # 워터마크 데이터 준비 data = { 'text': watermark_text, 'timestamp': datetime.now().isoformat(), 'metadata': metadata or {} } # JSON으로 변환 json_data = json.dumps(data, ensure_ascii=False) secret_data = json_data + "#####" # 구분자 추가 # 바이트로 변환 byte_data = secret_data.encode('utf-8') # 이미지 용량 확인 max_bytes = (image.shape[0] * image.shape[1] * 3) // 8 if len(byte_data) > max_bytes: raise ValueError("Watermark data is too large for this image") # 바이너리 데이터로 변환 binary_data = ''.join(format(byte, '08b') for byte in byte_data) data_index = 0 # 데이터 임베딩 for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): # RGB channels if data_index < len(binary_data): # LSB 수정 image[i, j, k] = (image[i, j, k] & 0xFE) | int(binary_data[data_index]) data_index += 1 # 결과 저장 output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" cv2.imwrite(output_path, image) return output_path, "Watermark added successfully" except Exception as e: return image_path, f"Error in encoding: {str(e)}" def decode(self, image_path): """Decode watermark from image""" try: # PNG metadata method first try: im = Image.open(image_path) if "TXT" in im.info: return im.info["TXT"] except: pass # 이미지 읽기 image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") # LSB 추출 binary_data = '' for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): binary_data += str(image[i, j, k] & 1) # 8비트씩 잘라서 바이트로 변환 bytes_data = bytearray() for i in range(0, len(binary_data), 8): if i + 8 <= len(binary_data): byte = int(binary_data[i:i+8], 2) bytes_data.append(byte) # 구분자 체크 (#####) if bytes_data[-5:] == b'#####': bytes_data = bytes_data[:-5] # 구분자 제거 break try: # UTF-8로 디코딩 decoded_data = bytes_data.decode('utf-8') # JSON 파싱 watermark_data = json.loads(decoded_data) return json.dumps(watermark_data, ensure_ascii=False, indent=2) except UnicodeDecodeError: return "Error: Invalid character encoding" except json.JSONDecodeError: return decoded_data except Exception as e: return f"Error in decoding: {str(e)}" def analyze_quality(self, original_path, watermarked_path): """Analyze watermark quality""" try: original = cv2.imread(original_path) watermarked = cv2.imread(watermarked_path) if original is None or watermarked is None: raise ValueError("Could not read image files") # Calculate PSNR mse = np.mean((original - watermarked) ** 2) if mse == 0: psnr = float('inf') else: psnr = 20 * np.log10(255.0 / np.sqrt(mse)) # Calculate histogram similarity hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) # Count modified pixels diff = cv2.bitwise_xor(original, watermarked) modified_pixels = np.count_nonzero(diff) report = { 'psnr': round(psnr, 2), 'histogram_similarity': round(hist_correlation, 4), 'modified_pixels': modified_pixels, 'image_size': original.shape, 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 } return json.dumps(report, indent=2) except Exception as e: return f"Error in quality analysis: {str(e)}"