fantos commited on
Commit
a733f5b
·
verified ·
1 Parent(s): 2a90940

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +138 -196
utils.py CHANGED
@@ -1,211 +1,153 @@
1
- # utils.py
2
-
3
  import cv2
4
  import numpy as np
5
- from PIL import Image, PngImagePlugin, ImageDraw
6
- import json
7
- from datetime import datetime
8
- from cryptography.fernet import Fernet
9
- import base64
10
- import hashlib
11
 
12
- class WatermarkProcessor:
13
- def __init__(self, encryption_key=None):
14
- """Initialize with optional encryption key"""
15
- if encryption_key:
16
- self.fernet = Fernet(encryption_key)
17
- else:
18
- key = Fernet.generate_key()
19
- self.fernet = Fernet(key)
 
 
20
 
21
- def to_bin(self, data):
22
- """Convert data to binary format as string"""
23
- if isinstance(data, str):
24
- return ''.join(format(x, '08b') for x in data.encode('utf-8'))
25
- elif isinstance(data, bytes):
26
- return ''.join(format(x, '08b') for x in data)
27
- elif isinstance(data, np.ndarray):
28
- return [format(i, "08b") for i in data]
29
- elif isinstance(data, int) or isinstance(data, np.uint8):
30
- return format(data, "08b")
31
- else:
32
- raise TypeError("Type not supported.")
33
 
34
- def create_preview(self, image_path, watermark_text, opacity=0.3):
35
- """Create a preview of watermark on image"""
 
 
36
  try:
37
- image = Image.open(image_path)
38
- txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
- draw = ImageDraw.Draw(txt_layer)
40
-
41
- # Calculate text position
42
- text_width = draw.textlength(watermark_text)
43
- text_x = (image.width - text_width) // 2
44
- text_y = image.height // 2
45
-
46
- # Add watermark text
47
- draw.text((text_x, text_y), watermark_text,
48
- fill=(255, 255, 255, int(255 * opacity)))
49
-
50
- # Combine layers
51
- preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
- return preview
53
- except Exception as e:
54
- return None
55
 
56
- def png_encode(self, im_name, extra):
57
- """Encode watermark using PNG metadata"""
58
- try:
59
- im = Image.open(im_name)
60
- info = PngImagePlugin.PngInfo()
61
- info.add_text("TXT", extra)
62
- im.save("test.png", pnginfo=info)
63
- return "test.png", "Watermark added successfully"
64
- except Exception as e:
65
- return im_name, f"Error adding watermark: {str(e)}"
 
 
 
 
 
66
 
67
- def encode(self, image_path, watermark_text, metadata=None):
68
- """Encode watermark using steganography with encryption"""
69
- try:
70
- image = cv2.imread(image_path)
71
- if image is None:
72
- raise ValueError("Could not read image file")
73
-
74
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
75
-
76
- # Prepare watermark data
77
- watermark_data = {
78
- 'text': watermark_text,
79
- 'timestamp': datetime.now().isoformat(),
80
- 'metadata': metadata or {}
81
- }
82
-
83
- # Add image hash
84
- image_copy = image.copy() & 0xFE # Clear LSB
85
- watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest()
86
-
87
- # Encrypt data
88
- secret_data = json.dumps(watermark_data)
89
- secret_data = f"{secret_data}#####=====" # Add delimiters
90
- binary_secret_data = self.to_bin(secret_data)
91
-
92
- # Calculate capacity
93
- n_bytes = image.shape[0] * image.shape[1] * 3 // 8
94
- if len(binary_secret_data) > n_bytes * 8:
95
- return image_path, "Watermark is too large for Image Size"
96
-
97
- # Embed data
98
- data_index = 0
99
- for i in range(image.shape[0]):
100
- for j in range(image.shape[1]):
101
- if data_index < len(binary_secret_data):
102
- pixel = image[i, j]
103
- for k in range(3):
104
- if data_index < len(binary_secret_data):
105
- binary_value = format(pixel[k], '08b')
106
- binary_value = binary_value[:-1] + binary_secret_data[data_index]
107
- image[i, j, k] = int(binary_value, 2)
108
- data_index += 1
109
-
110
- # Save result
111
- output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png"
112
- cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
113
- return output_path, "Watermark added successfully"
114
-
115
- except Exception as e:
116
- return image_path, f"Error in encoding: {str(e)}"
117
 
118
- def decode(self, image_path):
119
- """Decode watermark with decryption"""
120
  try:
121
- # Try PNG metadata method first
122
- try:
123
- im = Image.open(image_path)
124
- if "TXT" in im.info:
125
- return im.info["TXT"]
126
- except:
127
- pass
128
-
129
- # Steganography method
130
- image = cv2.imread(image_path)
131
- if image is None:
132
- raise ValueError("Could not read image file")
133
-
134
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
135
-
136
- # Extract binary data
137
- binary_data = ""
138
- for row in image:
139
- for pixel in row:
140
- for value in pixel:
141
- binary_data += format(value, '08b')[-1]
142
 
143
- # Convert to bytes
144
- bytes_data = bytearray()
145
- for i in range(0, len(binary_data), 8):
146
- if i + 8 <= len(binary_data):
147
- byte = int(binary_data[i:i+8], 2)
148
- bytes_data.append(byte)
149
 
150
- # Process the data
151
- decoded_data = bytes(bytes_data).decode('utf-8')
152
- if "=====" in decoded_data:
153
- decoded_data = decoded_data.split("=====")[0]
154
- if "#####" in decoded_data:
155
- decoded_data = decoded_data.split("#####")[0]
156
-
157
- try:
158
- # Parse JSON data
159
- watermark_data = json.loads(decoded_data)
160
- # Verify image hash
161
- image_copy = image.copy() & 0xFE
162
- current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest()
163
-
164
- if current_hash != watermark_data.get('image_hash'):
165
- return "Warning: Image has been modified after watermarking"
166
-
167
- return json.dumps(watermark_data, indent=2, ensure_ascii=False)
168
- except:
169
- return decoded_data
170
 
171
  except Exception as e:
172
- return f"Error in decoding: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
173
 
174
- def analyze_quality(self, original_path, watermarked_path):
175
- """Analyze watermark quality"""
176
- try:
177
- original = cv2.imread(original_path)
178
- watermarked = cv2.imread(watermarked_path)
179
-
180
- if original is None or watermarked is None:
181
- raise ValueError("Could not read image files")
182
-
183
- # Calculate PSNR
184
- mse = np.mean((original - watermarked) ** 2)
185
- if mse == 0:
186
- psnr = float('inf')
187
- else:
188
- psnr = 20 * np.log10(255.0 / np.sqrt(mse))
189
-
190
- # Calculate histogram similarity
191
- hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
192
- hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
193
- hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
194
-
195
- # Count modified pixels
196
- diff = cv2.bitwise_xor(original, watermarked)
197
- modified_pixels = np.count_nonzero(diff)
198
-
199
- report = {
200
- 'psnr': round(psnr, 2),
201
- 'histogram_similarity': round(hist_correlation, 4),
202
- 'modified_pixels': modified_pixels,
203
- 'image_size': original.shape,
204
- 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
205
- }
206
-
207
- return json.dumps(report, indent=2)
208
-
209
- except Exception as e:
210
- return f"Error in quality analysis: {str(e)}"
211
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import cv2
2
  import numpy as np
3
+ from PIL import Image, PngImagePlugin
 
 
 
 
 
4
 
5
+ def png_encode(im_name, extra):
6
+ """Encode watermark using PNG metadata"""
7
+ try:
8
+ im = Image.open(im_name)
9
+ info = PngImagePlugin.PngInfo()
10
+ info.add_text("TXT", extra)
11
+ im.save("test.png", pnginfo=info)
12
+ return "test.png", "Watermark added successfully"
13
+ except Exception as e:
14
+ return im_name, f"Error adding watermark: {str(e)}"
15
 
16
+ def to_bin(data):
17
+ """Convert data to binary format as string"""
18
+ if isinstance(data, str):
19
+ return ''.join(format(x, '08b') for x in data.encode('utf-8'))
20
+ elif isinstance(data, bytes):
21
+ return ''.join(format(x, '08b') for x in data)
22
+ elif isinstance(data, np.ndarray):
23
+ return [format(i, "08b") for i in data]
24
+ elif isinstance(data, int) or isinstance(data, np.uint8):
25
+ return format(data, "08b")
26
+ else:
27
+ raise TypeError("Type not supported.")
28
 
29
+ def decode(image_name, txt=None):
30
+ """Decode watermark from image"""
31
+ try:
32
+ # First try PNG metadata method
33
  try:
34
+ im = Image.open(image_name)
35
+ if "TXT" in im.info:
36
+ return im.info["TXT"]
37
+ except:
38
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
+ # Steganography method
41
+ image = cv2.imread(image_name)
42
+ if image is None:
43
+ raise ValueError("Could not read image file")
44
+
45
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
46
+ binary_data = ""
47
+
48
+ # Extract binary data from image
49
+ for row in image:
50
+ for pixel in row:
51
+ r, g, b = to_bin(pixel)
52
+ binary_data += r[-1]
53
+ binary_data += g[-1]
54
+ binary_data += b[-1]
55
 
56
+ # Convert binary string to bytes
57
+ bytes_data = b''
58
+ for i in range(0, len(binary_data), 8):
59
+ byte = binary_data[i:i+8]
60
+ if len(byte) == 8:
61
+ try:
62
+ byte_val = int(byte, 2)
63
+ bytes_data += bytes([byte_val])
64
+ except ValueError:
65
+ continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
 
67
+ # Try to find the end marker in raw bytes
 
68
  try:
69
+ end_marker = b'====='
70
+ if end_marker in bytes_data:
71
+ bytes_data = bytes_data.split(end_marker)[0]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
+ # Try to find the delimiter in raw bytes
74
+ delimiter = b'#####'
75
+ if delimiter in bytes_data:
76
+ bytes_data = bytes_data.split(delimiter)[0]
 
 
77
 
78
+ # Decode the bytes to string
79
+ decoded_text = bytes_data.decode('utf-8', errors='ignore')
80
+ return decoded_text.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  except Exception as e:
83
+ print(f"Decoding error: {e}")
84
+ # If regular decoding fails, try character by character
85
+ result = ""
86
+ current_bytes = bytearray()
87
+
88
+ for byte in bytes_data:
89
+ current_bytes.append(byte)
90
+ try:
91
+ char = current_bytes.decode('utf-8')
92
+ result += char
93
+ current_bytes = bytearray()
94
+ except UnicodeDecodeError:
95
+ continue
96
+
97
+ if result:
98
+ return result.strip()
99
+ return "Error: Could not decode watermark"
100
 
101
+ except Exception as e:
102
+ return f"Error detecting watermark: {str(e)}"
103
+
104
+ def encode(image_name, secret_data, txt=None):
105
+ """Encode watermark using steganography"""
106
+ try:
107
+ image = cv2.imread(image_name)
108
+ if image is None:
109
+ raise ValueError("Could not read image file")
110
+
111
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
112
+
113
+ # Calculate maximum bytes that can be encoded
114
+ n_bytes = image.shape[0] * image.shape[1] * 3 // 8
115
+
116
+ # Prepare the data with delimiters
117
+ secret_data = str(secret_data)
118
+ complete_data = secret_data + "#####" + "====="
119
+
120
+ # Convert to binary
121
+ binary_secret_data = to_bin(complete_data)
122
+
123
+ # Check if the data can fit in the image
124
+ if len(binary_secret_data) > n_bytes * 8:
125
+ return image_name, "Watermark is too large for Image Size"
126
+
127
+ data_index = 0
128
+ binary_len = len(binary_secret_data)
129
+
130
+ # Embed the data
131
+ for i in range(image.shape[0]):
132
+ for j in range(image.shape[1]):
133
+ if data_index < binary_len:
134
+ pixel = image[i, j]
135
+ for color_channel in range(3):
136
+ if data_index < binary_len:
137
+ # Get the binary value of the pixel
138
+ binary_value = format(pixel[color_channel], '08b')
139
+ # Replace the least significant bit
140
+ binary_value = binary_value[:-1] + binary_secret_data[data_index]
141
+ # Update the pixel value
142
+ image[i, j, color_channel] = int(binary_value, 2)
143
+ data_index += 1
144
+ else:
145
+ break
146
+
147
+ # Save the result
148
+ output_path = "watermarked_image.png"
149
+ cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
150
+ return output_path, "Watermark embedded successfully"
151
+
152
+ except Exception as e:
153
+ return image_name, f"Error encoding watermark: {str(e)}"