fantos commited on
Commit
e1f7ceb
·
verified ·
1 Parent(s): 839bec8

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +195 -138
utils.py CHANGED
@@ -1,153 +1,210 @@
 
 
1
  import cv2
2
  import numpy as np
3
- from PIL import Image, PngImagePlugin
 
 
 
 
 
4
 
5
- def png_encode(im_name, extra):
6
- """Encode watermark using PNG metadata"""
7
- try:
8
- im = Image.open(im_name)
9
- info = PngImagePlugin.PngInfo()
10
- info.add_text("TXT", extra)
11
- im.save("test.png", pnginfo=info)
12
- return "test.png", "Watermark added successfully"
13
- except Exception as e:
14
- return im_name, f"Error adding watermark: {str(e)}"
15
 
16
- def to_bin(data):
17
- """Convert data to binary format as string"""
18
- if isinstance(data, str):
19
- return ''.join(format(x, '08b') for x in data.encode('utf-8'))
20
- elif isinstance(data, bytes):
21
- return ''.join(format(x, '08b') for x in data)
22
- elif isinstance(data, np.ndarray):
23
- return [format(i, "08b") for i in data]
24
- elif isinstance(data, int) or isinstance(data, np.uint8):
25
- return format(data, "08b")
26
- else:
27
- raise TypeError("Type not supported.")
28
 
29
- def decode(image_name, txt=None):
30
- """Decode watermark from image"""
31
- try:
32
- # First try PNG metadata method
33
  try:
34
- im = Image.open(image_name)
35
- if "TXT" in im.info:
36
- return im.info["TXT"]
37
- except:
38
- pass
39
-
40
- # Steganography method
41
- image = cv2.imread(image_name)
42
- if image is None:
43
- raise ValueError("Could not read image file")
44
-
45
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
46
- binary_data = ""
47
-
48
- # Extract binary data from image
49
- for row in image:
50
- for pixel in row:
51
- r, g, b = to_bin(pixel)
52
- binary_data += r[-1]
53
- binary_data += g[-1]
54
- binary_data += b[-1]
55
 
56
- # Convert binary string to bytes
57
- bytes_data = b''
58
- for i in range(0, len(binary_data), 8):
59
- byte = binary_data[i:i+8]
60
- if len(byte) == 8:
61
- try:
62
- byte_val = int(byte, 2)
63
- bytes_data += bytes([byte_val])
64
- except ValueError:
65
- continue
66
 
67
- # Try to find the end marker in raw bytes
 
68
  try:
69
- end_marker = b'====='
70
- if end_marker in bytes_data:
71
- bytes_data = bytes_data.split(end_marker)[0]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
- # Try to find the delimiter in raw bytes
74
- delimiter = b'#####'
75
- if delimiter in bytes_data:
76
- bytes_data = bytes_data.split(delimiter)[0]
77
 
78
- # Decode the bytes to string
79
- decoded_text = bytes_data.decode('utf-8', errors='ignore')
80
- return decoded_text.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  except Exception as e:
83
- print(f"Decoding error: {e}")
84
- # If regular decoding fails, try character by character
85
- result = ""
86
- current_bytes = bytearray()
87
-
88
- for byte in bytes_data:
89
- current_bytes.append(byte)
90
- try:
91
- char = current_bytes.decode('utf-8')
92
- result += char
93
- current_bytes = bytearray()
94
- except UnicodeDecodeError:
95
- continue
96
-
97
- if result:
98
- return result.strip()
99
- return "Error: Could not decode watermark"
100
 
101
- except Exception as e:
102
- return f"Error detecting watermark: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
- def encode(image_name, secret_data, txt=None):
105
- """Encode watermark using steganography"""
106
- try:
107
- image = cv2.imread(image_name)
108
- if image is None:
109
- raise ValueError("Could not read image file")
110
-
111
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
112
-
113
- # Calculate maximum bytes that can be encoded
114
- n_bytes = image.shape[0] * image.shape[1] * 3 // 8
115
-
116
- # Prepare the data with delimiters
117
- secret_data = str(secret_data)
118
- complete_data = secret_data + "#####" + "====="
119
-
120
- # Convert to binary
121
- binary_secret_data = to_bin(complete_data)
122
-
123
- # Check if the data can fit in the image
124
- if len(binary_secret_data) > n_bytes * 8:
125
- return image_name, "Watermark is too large for Image Size"
126
-
127
- data_index = 0
128
- binary_len = len(binary_secret_data)
129
-
130
- # Embed the data
131
- for i in range(image.shape[0]):
132
- for j in range(image.shape[1]):
133
- if data_index < binary_len:
134
- pixel = image[i, j]
135
- for color_channel in range(3):
136
- if data_index < binary_len:
137
- # Get the binary value of the pixel
138
- binary_value = format(pixel[color_channel], '08b')
139
- # Replace the least significant bit
140
- binary_value = binary_value[:-1] + binary_secret_data[data_index]
141
- # Update the pixel value
142
- image[i, j, color_channel] = int(binary_value, 2)
143
- data_index += 1
144
- else:
145
- break
146
-
147
- # Save the result
148
- output_path = "watermarked_image.png"
149
- cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
150
- return output_path, "Watermark embedded successfully"
151
-
152
- except Exception as e:
153
- return image_name, f"Error encoding watermark: {str(e)}"
 
1
+ # utils.py
2
+
3
  import cv2
4
  import numpy as np
5
+ from PIL import Image, PngImagePlugin, ImageDraw
6
+ import json
7
+ from datetime import datetime
8
+ from cryptography.fernet import Fernet
9
+ import base64
10
+ import hashlib
11
 
12
+ class WatermarkProcessor:
13
+ def __init__(self, encryption_key=None):
14
+ """Initialize with optional encryption key"""
15
+ if encryption_key:
16
+ self.fernet = Fernet(encryption_key)
17
+ else:
18
+ key = Fernet.generate_key()
19
+ self.fernet = Fernet(key)
 
 
20
 
21
+ def to_bin(self, data):
22
+ """Convert data to binary format as string"""
23
+ if isinstance(data, str):
24
+ return ''.join(format(x, '08b') for x in data.encode('utf-8'))
25
+ elif isinstance(data, bytes):
26
+ return ''.join(format(x, '08b') for x in data)
27
+ elif isinstance(data, np.ndarray):
28
+ return [format(i, "08b") for i in data]
29
+ elif isinstance(data, int) or isinstance(data, np.uint8):
30
+ return format(data, "08b")
31
+ else:
32
+ raise TypeError("Type not supported.")
33
 
34
+ def create_preview(self, image_path, watermark_text, opacity=0.3):
35
+ """Create a preview of watermark on image"""
 
 
36
  try:
37
+ image = Image.open(image_path)
38
+ txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
+ draw = ImageDraw.Draw(txt_layer)
40
+
41
+ # Calculate text position
42
+ text_width = draw.textlength(watermark_text)
43
+ text_x = (image.width - text_width) // 2
44
+ text_y = image.height // 2
45
+
46
+ # Add watermark text
47
+ draw.text((text_x, text_y), watermark_text,
48
+ fill=(255, 255, 255, int(255 * opacity)))
49
+
50
+ # Combine layers
51
+ preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
+ return preview
53
+ except Exception as e:
54
+ return None
 
 
 
55
 
56
+ def png_encode(self, im_name, extra):
57
+ """Encode watermark using PNG metadata"""
58
+ try:
59
+ im = Image.open(im_name)
60
+ info = PngImagePlugin.PngInfo()
61
+ info.add_text("TXT", extra)
62
+ im.save("test.png", pnginfo=info)
63
+ return "test.png", "Watermark added successfully"
64
+ except Exception as e:
65
+ return im_name, f"Error adding watermark: {str(e)}"
66
 
67
+ def encode(self, image_path, watermark_text, metadata=None):
68
+ """Encode watermark using steganography with encryption"""
69
  try:
70
+ image = cv2.imread(image_path)
71
+ if image is None:
72
+ raise ValueError("Could not read image file")
73
+
74
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
75
+
76
+ # Prepare watermark data
77
+ watermark_data = {
78
+ 'text': watermark_text,
79
+ 'timestamp': datetime.now().isoformat(),
80
+ 'metadata': metadata or {}
81
+ }
82
+
83
+ # Add image hash
84
+ image_copy = image.copy() & 0xFE # Clear LSB
85
+ watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest()
86
+
87
+ # Encrypt data
88
+ secret_data = json.dumps(watermark_data)
89
+ secret_data = f"{secret_data}#####=====" # Add delimiters
90
+ binary_secret_data = self.to_bin(secret_data)
91
 
92
+ # Calculate capacity
93
+ n_bytes = image.shape[0] * image.shape[1] * 3 // 8
94
+ if len(binary_secret_data) > n_bytes * 8:
95
+ return image_path, "Watermark is too large for Image Size"
96
 
97
+ # Embed data
98
+ data_index = 0
99
+ for i in range(image.shape[0]):
100
+ for j in range(image.shape[1]):
101
+ if data_index < len(binary_secret_data):
102
+ pixel = image[i, j]
103
+ for k in range(3):
104
+ if data_index < len(binary_secret_data):
105
+ binary_value = format(pixel[k], '08b')
106
+ binary_value = binary_value[:-1] + binary_secret_data[data_index]
107
+ image[i, j, k] = int(binary_value, 2)
108
+ data_index += 1
109
+
110
+ # Save result
111
+ output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png"
112
+ cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
113
+ return output_path, "Watermark added successfully"
114
 
115
  except Exception as e:
116
+ return image_path, f"Error in encoding: {str(e)}"
117
+
118
+ def decode(self, image_path):
119
+ """Decode watermark with decryption"""
120
+ try:
121
+ # Try PNG metadata method first
122
+ try:
123
+ im = Image.open(image_path)
124
+ if "TXT" in im.info:
125
+ return im.info["TXT"]
126
+ except:
127
+ pass
 
 
 
 
 
128
 
129
+ # Steganography method
130
+ image = cv2.imread(image_path)
131
+ if image is None:
132
+ raise ValueError("Could not read image file")
133
+
134
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
135
+
136
+ # Extract binary data
137
+ binary_data = ""
138
+ for row in image:
139
+ for pixel in row:
140
+ for value in pixel:
141
+ binary_data += format(value, '08b')[-1]
142
+
143
+ # Convert to bytes
144
+ bytes_data = bytearray()
145
+ for i in range(0, len(binary_data), 8):
146
+ if i + 8 <= len(binary_data):
147
+ byte = int(binary_data[i:i+8], 2)
148
+ bytes_data.append(byte)
149
+
150
+ # Process the data
151
+ decoded_data = bytes(bytes_data).decode('utf-8')
152
+ if "=====" in decoded_data:
153
+ decoded_data = decoded_data.split("=====")[0]
154
+ if "#####" in decoded_data:
155
+ decoded_data = decoded_data.split("#####")[0]
156
+
157
+ try:
158
+ # Parse JSON data
159
+ watermark_data = json.loads(decoded_data)
160
+ # Verify image hash
161
+ image_copy = image.copy() & 0xFE
162
+ current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest()
163
+
164
+ if current_hash != watermark_data.get('image_hash'):
165
+ return "Warning: Image has been modified after watermarking"
166
+
167
+ return json.dumps(watermark_data, indent=2, ensure_ascii=False)
168
+ except:
169
+ return decoded_data
170
+
171
+ except Exception as e:
172
+ return f"Error in decoding: {str(e)}"
173
 
174
+ def analyze_quality(self, original_path, watermarked_path):
175
+ """Analyze watermark quality"""
176
+ try:
177
+ original = cv2.imread(original_path)
178
+ watermarked = cv2.imread(watermarked_path)
179
+
180
+ if original is None or watermarked is None:
181
+ raise ValueError("Could not read image files")
182
+
183
+ # Calculate PSNR
184
+ mse = np.mean((original - watermarked) ** 2)
185
+ if mse == 0:
186
+ psnr = float('inf')
187
+ else:
188
+ psnr = 20 * np.log10(255.0 / np.sqrt(mse))
189
+
190
+ # Calculate histogram similarity
191
+ hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
192
+ hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
193
+ hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
194
+
195
+ # Count modified pixels
196
+ diff = cv2.bitwise_xor(original, watermarked)
197
+ modified_pixels = np.count_nonzero(diff)
198
+
199
+ report = {
200
+ 'psnr': round(psnr, 2),
201
+ 'histogram_similarity': round(hist_correlation, 4),
202
+ 'modified_pixels': modified_pixels,
203
+ 'image_size': original.shape,
204
+ 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
205
+ }
206
+
207
+ return json.dumps(report, indent=2)
208
+
209
+ except Exception as e:
210
+ return f"Error in quality analysis: {str(e)}"