SecureWatermark / utils.py
fantos's picture
Update utils.py
310b952 verified
raw
history blame
7.72 kB
# utils.py
import cv2
import numpy as np
from PIL import Image, PngImagePlugin, ImageDraw
import json
from datetime import datetime
from cryptography.fernet import Fernet
import base64
import hashlib
class WatermarkProcessor:
def __init__(self, encryption_key=None):
"""Initialize with optional encryption key"""
if encryption_key:
self.fernet = Fernet(encryption_key)
else:
key = Fernet.generate_key()
self.fernet = Fernet(key)
def to_bin(self, data):
"""Convert data to binary format as string"""
if isinstance(data, str):
return ''.join(format(x, '08b') for x in data.encode('utf-8'))
elif isinstance(data, bytes):
return ''.join(format(x, '08b') for x in data)
elif isinstance(data, np.ndarray):
return [format(i, "08b") for i in data]
elif isinstance(data, int) or isinstance(data, np.uint8):
return format(data, "08b")
else:
raise TypeError("Type not supported.")
def create_preview(self, image_path, watermark_text, opacity=0.3):
"""Create a preview of watermark on image"""
try:
image = Image.open(image_path)
txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
draw = ImageDraw.Draw(txt_layer)
# Calculate text position
text_width = draw.textlength(watermark_text)
text_x = (image.width - text_width) // 2
text_y = image.height // 2
# Add watermark text
draw.text((text_x, text_y), watermark_text,
fill=(255, 255, 255, int(255 * opacity)))
# Combine layers
preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
return preview
except Exception as e:
return None
def png_encode(self, im_name, extra):
"""Encode watermark using PNG metadata"""
try:
im = Image.open(im_name)
info = PngImagePlugin.PngInfo()
info.add_text("TXT", extra)
im.save("test.png", pnginfo=info)
return "test.png", "Watermark added successfully"
except Exception as e:
return im_name, f"Error adding watermark: {str(e)}"
# utils.py์˜ encode์™€ decode ๋ฉ”์„œ๋“œ๋งŒ ์ˆ˜์ •
def encode(self, image_path, watermark_text, metadata=None):
"""Encode watermark using steganography"""
try:
# ์ด๋ฏธ์ง€ ์ฝ๊ธฐ
image = cv2.imread(image_path)
if image is None:
raise ValueError("Could not read image file")
# ์›Œํ„ฐ๋งˆํฌ ๋ฐ์ดํ„ฐ ์ค€๋น„
data = {
'text': watermark_text,
'timestamp': datetime.now().isoformat(),
'metadata': metadata or {}
}
# JSON์œผ๋กœ ๋ณ€ํ™˜
json_data = json.dumps(data, ensure_ascii=False)
secret_data = json_data + "#####" # ๊ตฌ๋ถ„์ž ์ถ”๊ฐ€
# ๋ฐ”์ดํŠธ๋กœ ๋ณ€ํ™˜
byte_data = secret_data.encode('utf-8')
# ์ด๋ฏธ์ง€ ์šฉ๋Ÿ‰ ํ™•์ธ
max_bytes = (image.shape[0] * image.shape[1] * 3) // 8
if len(byte_data) > max_bytes:
raise ValueError("Watermark data is too large for this image")
# ๋ฐ”์ด๋„ˆ๋ฆฌ ๋ฐ์ดํ„ฐ๋กœ ๋ณ€ํ™˜
binary_data = ''.join(format(byte, '08b') for byte in byte_data)
data_index = 0
# ๋ฐ์ดํ„ฐ ์ž„๋ฒ ๋”ฉ
for i in range(image.shape[0]):
for j in range(image.shape[1]):
for k in range(3): # RGB channels
if data_index < len(binary_data):
# LSB ์ˆ˜์ •
image[i, j, k] = (image[i, j, k] & 0xFE) | int(binary_data[data_index])
data_index += 1
# ๊ฒฐ๊ณผ ์ €์žฅ
output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
cv2.imwrite(output_path, image)
return output_path, "Watermark added successfully"
except Exception as e:
return image_path, f"Error in encoding: {str(e)}"
def decode(self, image_path):
"""Decode watermark from image"""
try:
# PNG metadata method first
try:
im = Image.open(image_path)
if "TXT" in im.info:
return im.info["TXT"]
except:
pass
# ์ด๋ฏธ์ง€ ์ฝ๊ธฐ
image = cv2.imread(image_path)
if image is None:
raise ValueError("Could not read image file")
# LSB ์ถ”์ถœ
binary_data = ''
for i in range(image.shape[0]):
for j in range(image.shape[1]):
for k in range(3):
binary_data += str(image[i, j, k] & 1)
# 8๋น„ํŠธ์”ฉ ์ž˜๋ผ์„œ ๋ฐ”์ดํŠธ๋กœ ๋ณ€ํ™˜
bytes_data = bytearray()
for i in range(0, len(binary_data), 8):
if i + 8 <= len(binary_data):
byte = int(binary_data[i:i+8], 2)
bytes_data.append(byte)
# ๊ตฌ๋ถ„์ž ์ฒดํฌ (#####)
if bytes_data[-5:] == b'#####':
bytes_data = bytes_data[:-5] # ๊ตฌ๋ถ„์ž ์ œ๊ฑฐ
break
try:
# UTF-8๋กœ ๋””์ฝ”๋”ฉ
decoded_data = bytes_data.decode('utf-8')
# JSON ํŒŒ์‹ฑ
watermark_data = json.loads(decoded_data)
return json.dumps(watermark_data, ensure_ascii=False, indent=2)
except UnicodeDecodeError:
return "Error: Invalid character encoding"
except json.JSONDecodeError:
return decoded_data
except Exception as e:
return f"Error in decoding: {str(e)}"
def analyze_quality(self, original_path, watermarked_path):
"""Analyze watermark quality"""
try:
original = cv2.imread(original_path)
watermarked = cv2.imread(watermarked_path)
if original is None or watermarked is None:
raise ValueError("Could not read image files")
# Calculate PSNR
mse = np.mean((original - watermarked) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# Calculate histogram similarity
hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
# Count modified pixels
diff = cv2.bitwise_xor(original, watermarked)
modified_pixels = np.count_nonzero(diff)
report = {
'psnr': round(psnr, 2),
'histogram_similarity': round(hist_correlation, 4),
'modified_pixels': modified_pixels,
'image_size': original.shape,
'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
}
return json.dumps(report, indent=2)
except Exception as e:
return f"Error in quality analysis: {str(e)}"