Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| from PIL import Image, PngImagePlugin, ImageDraw | |
| import json | |
| from datetime import datetime | |
| from cryptography.fernet import Fernet | |
| import base64 | |
| import hashlib | |
| class WatermarkProcessor: | |
| def __init__(self, encryption_key=None): | |
| """Initialize with optional encryption key""" | |
| if encryption_key: | |
| self.fernet = Fernet(encryption_key) | |
| else: | |
| key = Fernet.generate_key() | |
| self.fernet = Fernet(key) | |
| def to_bin(self, data): | |
| """Convert data to binary format as string""" | |
| if isinstance(data, str): | |
| return ''.join(format(ord(char), '08b') for char in data) | |
| elif isinstance(data, bytes): | |
| return ''.join(format(x, '08b') for x in data) | |
| elif isinstance(data, np.ndarray): | |
| return [format(i, "08b") for i in data] | |
| elif isinstance(data, int) or isinstance(data, np.uint8): | |
| return format(data, "08b") | |
| else: | |
| raise TypeError("Type not supported.") | |
| def create_preview(self, image_path, watermark_text, opacity=0.3): | |
| """Create a preview of watermark on image""" | |
| try: | |
| image = Image.open(image_path) | |
| txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) | |
| draw = ImageDraw.Draw(txt_layer) | |
| # Calculate text position | |
| text_width = draw.textlength(watermark_text) | |
| text_x = (image.width - text_width) // 2 | |
| text_y = image.height // 2 | |
| # Add watermark text | |
| draw.text((text_x, text_y), watermark_text, | |
| fill=(255, 255, 255, int(255 * opacity))) | |
| # Combine layers | |
| preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) | |
| return preview | |
| except Exception as e: | |
| return None | |
| def png_encode(self, im_name, extra): | |
| """Encode watermark using PNG metadata""" | |
| try: | |
| im = Image.open(im_name) | |
| info = PngImagePlugin.PngInfo() | |
| info.add_text("TXT", extra) | |
| im.save("test.png", pnginfo=info) | |
| return "test.png", "Watermark added successfully" | |
| except Exception as e: | |
| return im_name, f"Error adding watermark: {str(e)}" | |
| def encode(self, image_path, watermark_text, metadata=None): | |
| """Encode watermark using simple LSB steganography with header | |
| 헤더(32비트)는 watermark 데이터(UTF-8 문자열)의 길이(문자수)를 이진 문자열로 저장합니다. | |
| """ | |
| try: | |
| image = cv2.imread(image_path) | |
| if image is None: | |
| raise ValueError("Could not read image file") | |
| # Prepare watermark data | |
| data = { | |
| 'text': watermark_text, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'metadata': metadata or {} | |
| } | |
| # Convert data to string (UTF-8) | |
| json_str = json.dumps(data, ensure_ascii=False) | |
| # 헤더: 32비트에 데이터 길이(문자수)를 저장 | |
| data_length = len(json_str) | |
| header = format(data_length, '032b') | |
| # 본문: 각 문자를 8비트 이진수로 변환 | |
| body = ''.join(format(ord(char), '08b') for char in json_str) | |
| binary_data = header + body | |
| # Check capacity | |
| if len(binary_data) > image.shape[0] * image.shape[1] * 3: | |
| return image_path, "Error: Image too small for watermark data" | |
| # Embed data into LSB of each pixel | |
| data_index = 0 | |
| for i in range(image.shape[0]): | |
| for j in range(image.shape[1]): | |
| for k in range(3): | |
| if data_index < len(binary_data): | |
| pixel = int(image[i, j, k]) | |
| # Clear the LSB | |
| pixel = pixel & 0xFE | |
| # Set the LSB according to our data bit | |
| pixel = pixel | (int(binary_data[data_index]) & 1) | |
| image[i, j, k] = np.uint8(pixel) | |
| data_index += 1 | |
| else: | |
| break | |
| if data_index >= len(binary_data): | |
| break | |
| if data_index >= len(binary_data): | |
| break | |
| # Save result image (PNG: lossless) | |
| output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
| cv2.imwrite(output_path, image) | |
| return output_path, "Watermark added successfully" | |
| except Exception as e: | |
| return image_path, f"Error in encoding: {str(e)}" | |
| def decode(self, image_path): | |
| """Decode watermark using simple LSB steganography with header | |
| 먼저 32비트를 읽어 watermark 데이터의 길이(문자수)를 구한 후, 해당 길이만큼의 데이터를 추출합니다. | |
| """ | |
| try: | |
| # PNG 메타데이터 먼저 확인 | |
| try: | |
| im = Image.open(image_path) | |
| if "TXT" in im.info: | |
| return im.info["TXT"] | |
| except: | |
| pass | |
| image = cv2.imread(image_path) | |
| if image is None: | |
| raise ValueError("Could not read image file") | |
| # 모든 픽셀의 LSB를 읽어 이진 문자열 생성 | |
| binary_data = '' | |
| for i in range(image.shape[0]): | |
| for j in range(image.shape[1]): | |
| for k in range(3): | |
| binary_data += str(image[i, j, k] & 1) | |
| # 먼저 헤더(32비트)를 읽어 데이터 길이(문자수)를 결정 | |
| header = binary_data[:32] | |
| data_length = int(header, 2) | |
| total_bits = data_length * 8 | |
| # 본문 데이터 읽기 | |
| message_bits = binary_data[32:32 + total_bits] | |
| text = '' | |
| for i in range(0, len(message_bits), 8): | |
| byte = message_bits[i:i+8] | |
| text += chr(int(byte, 2)) | |
| try: | |
| data = json.loads(text) | |
| return json.dumps(data, ensure_ascii=False, indent=2) | |
| except json.JSONDecodeError: | |
| return text | |
| except Exception as e: | |
| return f"Error in decoding: {str(e)}" | |
| def analyze_quality(self, original_path, watermarked_path): | |
| """Analyze watermark quality""" | |
| try: | |
| original = cv2.imread(original_path) | |
| watermarked = cv2.imread(watermarked_path) | |
| if original is None or watermarked is None: | |
| raise ValueError("Could not read image files") | |
| # Calculate PSNR | |
| mse = np.mean((original - watermarked) ** 2) | |
| if mse == 0: | |
| psnr = float('inf') | |
| else: | |
| psnr = 20 * np.log10(255.0 / np.sqrt(mse)) | |
| # Calculate histogram similarity | |
| hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) | |
| hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) | |
| hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) | |
| # Count modified pixels | |
| diff = cv2.bitwise_xor(original, watermarked) | |
| modified_pixels = np.count_nonzero(diff) | |
| report = { | |
| 'psnr': round(psnr, 2), | |
| 'histogram_similarity': round(hist_correlation, 4), | |
| 'modified_pixels': modified_pixels, | |
| 'image_size': original.shape, | |
| 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 | |
| } | |
| return json.dumps(report, indent=2) | |
| except Exception as e: | |
| return f"Error in quality analysis: {str(e)}" | |