fantos commited on
Commit
4263379
·
verified ·
1 Parent(s): d5eb308

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +189 -191
utils.py CHANGED
@@ -1,5 +1,3 @@
1
- # utils.py
2
-
3
  import cv2
4
  import numpy as np
5
  from PIL import Image, PngImagePlugin, ImageDraw
@@ -10,193 +8,193 @@ import base64
10
  import hashlib
11
 
12
  class WatermarkProcessor:
13
- def __init__(self, encryption_key=None):
14
- """Initialize with optional encryption key"""
15
- if encryption_key:
16
- self.fernet = Fernet(encryption_key)
17
- else:
18
- key = Fernet.generate_key()
19
- self.fernet = Fernet(key)
20
-
21
- def to_bin(self, data):
22
- """Convert data to binary format as string"""
23
- if isinstance(data, str):
24
- return ''.join(format(ord(char), '08b') for char in data)
25
- elif isinstance(data, bytes):
26
- return ''.join(format(x, '08b') for x in data)
27
- elif isinstance(data, np.ndarray):
28
- return [format(i, "08b") for i in data]
29
- elif isinstance(data, int) or isinstance(data, np.uint8):
30
- return format(data, "08b")
31
- else:
32
- raise TypeError("Type not supported.")
33
-
34
- def create_preview(self, image_path, watermark_text, opacity=0.3):
35
- """Create a preview of watermark on image"""
36
- try:
37
- image = Image.open(image_path)
38
- txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
- draw = ImageDraw.Draw(txt_layer)
40
-
41
- # Calculate text position
42
- text_width = draw.textlength(watermark_text)
43
- text_x = (image.width - text_width) // 2
44
- text_y = image.height // 2
45
-
46
- # Add watermark text
47
- draw.text((text_x, text_y), watermark_text,
48
- fill=(255, 255, 255, int(255 * opacity)))
49
-
50
- # Combine layers
51
- preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
- return preview
53
- except Exception as e:
54
- return None
55
-
56
- def png_encode(self, im_name, extra):
57
- """Encode watermark using PNG metadata"""
58
- try:
59
- im = Image.open(im_name)
60
- info = PngImagePlugin.PngInfo()
61
- info.add_text("TXT", extra)
62
- im.save("test.png", pnginfo=info)
63
- return "test.png", "Watermark added successfully"
64
- except Exception as e:
65
- return im_name, f"Error adding watermark: {str(e)}"
66
-
67
- def encode(self, image_path, watermark_text, metadata=None):
68
- """Encode watermark using simple LSB steganography"""
69
- try:
70
- image = cv2.imread(image_path)
71
- if image is None:
72
- raise ValueError("Could not read image file")
73
-
74
- # Prepare watermark data
75
- data = {
76
- 'text': watermark_text,
77
- 'timestamp': datetime.now().isoformat(),
78
- 'metadata': metadata or {}
79
- }
80
-
81
- # Convert data to string with end marker
82
- json_str = json.dumps(data, ensure_ascii=False)
83
- secret_data = json_str + "#####"
84
-
85
- # Convert string to binary (using utf-8 encoding)
86
- binary_data = ''.join(format(ord(char), '08b') for char in secret_data)
87
-
88
- # Check capacity
89
- if len(binary_data) > image.shape[0] * image.shape[1] * 3:
90
- return image_path, "Error: Image too small for watermark data"
91
-
92
- # Embed data
93
- data_index = 0
94
- for i in range(image.shape[0]):
95
- for j in range(image.shape[1]):
96
- for k in range(3):
97
- if data_index < len(binary_data):
98
- pixel = int(image[i, j, k])
99
- # Clear the LSB
100
- pixel = pixel & 0xFE # Clear last bit
101
- # Set the LSB according to our data
102
- pixel = pixel | (int(binary_data[data_index]) & 1)
103
- image[i, j, k] = np.uint8(pixel)
104
- data_index += 1
105
- else:
106
- break
107
- if data_index >= len(binary_data):
108
- break
109
  if data_index >= len(binary_data):
110
- break
111
-
112
- # Save result
113
- output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
114
- cv2.imwrite(output_path, image)
115
- return output_path, "Watermark added successfully"
116
-
117
- except Exception as e:
118
- return image_path, f"Error in encoding: {str(e)}"
119
-
120
- def decode(self, image_path):
121
- """Decode watermark using simple LSB steganography"""
122
- try:
123
- # Try PNG metadata first
124
- try:
125
- im = Image.open(image_path)
126
- if "TXT" in im.info:
127
- return im.info["TXT"]
128
- except:
129
- pass
130
-
131
- image = cv2.imread(image_path)
132
- if image is None:
133
- raise ValueError("Could not read image file")
134
-
135
- # Extract binary data
136
- binary_data = ''
137
- for i in range(image.shape[0]):
138
- for j in range(image.shape[1]):
139
- for k in range(3):
140
- binary_data += str(image[i, j, k] & 1)
141
-
142
- # Convert binary to text
143
- text = ''
144
- for i in range(0, len(binary_data), 8):
145
- if i + 8 <= len(binary_data):
146
- byte = binary_data[i:i+8]
147
- text += chr(int(byte, 2))
148
-
149
- # Check for end marker
150
- if "#####" in text:
151
- # Extract content before marker
152
- text = text.split("#####")[0]
153
- try:
154
- # Parse JSON
155
- data = json.loads(text)
156
- return json.dumps(data, ensure_ascii=False, indent=2)
157
- except json.JSONDecodeError:
158
- return text
159
- break
160
-
161
- return "Error: No valid watermark found"
162
-
163
- except Exception as e:
164
- return f"Error in decoding: {str(e)}"
165
-
166
- def analyze_quality(self, original_path, watermarked_path):
167
- """Analyze watermark quality"""
168
- try:
169
- original = cv2.imread(original_path)
170
- watermarked = cv2.imread(watermarked_path)
171
-
172
- if original is None or watermarked is None:
173
- raise ValueError("Could not read image files")
174
-
175
- # Calculate PSNR
176
- mse = np.mean((original - watermarked) ** 2)
177
- if mse == 0:
178
- psnr = float('inf')
179
- else:
180
- psnr = 20 * np.log10(255.0 / np.sqrt(mse))
181
-
182
- # Calculate histogram similarity
183
- hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
184
- hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
185
- hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
186
-
187
- # Count modified pixels
188
- diff = cv2.bitwise_xor(original, watermarked)
189
- modified_pixels = np.count_nonzero(diff)
190
-
191
- report = {
192
- 'psnr': round(psnr, 2),
193
- 'histogram_similarity': round(hist_correlation, 4),
194
- 'modified_pixels': modified_pixels,
195
- 'image_size': original.shape,
196
- 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
197
- }
198
-
199
- return json.dumps(report, indent=2)
200
-
201
- except Exception as e:
202
- return f"Error in quality analysis: {str(e)}"
 
 
 
 
 
1
  import cv2
2
  import numpy as np
3
  from PIL import Image, PngImagePlugin, ImageDraw
 
8
  import hashlib
9
 
10
  class WatermarkProcessor:
11
+ def __init__(self, encryption_key=None):
12
+ """Initialize with optional encryption key"""
13
+ if encryption_key:
14
+ self.fernet = Fernet(encryption_key)
15
+ else:
16
+ key = Fernet.generate_key()
17
+ self.fernet = Fernet(key)
18
+
19
+ def to_bin(self, data):
20
+ """Convert data to binary format as string"""
21
+ if isinstance(data, str):
22
+ return ''.join(format(ord(char), '08b') for char in data)
23
+ elif isinstance(data, bytes):
24
+ return ''.join(format(x, '08b') for x in data)
25
+ elif isinstance(data, np.ndarray):
26
+ return [format(i, "08b") for i in data]
27
+ elif isinstance(data, int) or isinstance(data, np.uint8):
28
+ return format(data, "08b")
29
+ else:
30
+ raise TypeError("Type not supported.")
31
+
32
+ def create_preview(self, image_path, watermark_text, opacity=0.3):
33
+ """Create a preview of watermark on image"""
34
+ try:
35
+ image = Image.open(image_path)
36
+ txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
37
+ draw = ImageDraw.Draw(txt_layer)
38
+
39
+ # Calculate text position
40
+ text_width = draw.textlength(watermark_text)
41
+ text_x = (image.width - text_width) // 2
42
+ text_y = image.height // 2
43
+
44
+ # Add watermark text
45
+ draw.text((text_x, text_y), watermark_text,
46
+ fill=(255, 255, 255, int(255 * opacity)))
47
+
48
+ # Combine layers
49
+ preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
50
+ return preview
51
+ except Exception as e:
52
+ return None
53
+
54
+ def png_encode(self, im_name, extra):
55
+ """Encode watermark using PNG metadata"""
56
+ try:
57
+ im = Image.open(im_name)
58
+ info = PngImagePlugin.PngInfo()
59
+ info.add_text("TXT", extra)
60
+ im.save("test.png", pnginfo=info)
61
+ return "test.png", "Watermark added successfully"
62
+ except Exception as e:
63
+ return im_name, f"Error adding watermark: {str(e)}"
64
+
65
+ def encode(self, image_path, watermark_text, metadata=None):
66
+ """Encode watermark using simple LSB steganography"""
67
+ try:
68
+ image = cv2.imread(image_path)
69
+ if image is None:
70
+ raise ValueError("Could not read image file")
71
+
72
+ # Prepare watermark data
73
+ data = {
74
+ 'text': watermark_text,
75
+ 'timestamp': datetime.now().isoformat(),
76
+ 'metadata': metadata or {}
77
+ }
78
+
79
+ # Convert data to string with end marker
80
+ json_str = json.dumps(data, ensure_ascii=False)
81
+ secret_data = json_str + "#####"
82
+
83
+ # Convert string to binary (using utf-8 encoding)
84
+ binary_data = ''.join(format(ord(char), '08b') for char in secret_data)
85
+
86
+ # Check capacity
87
+ if len(binary_data) > image.shape[0] * image.shape[1] * 3:
88
+ return image_path, "Error: Image too small for watermark data"
89
+
90
+ # Embed data
91
+ data_index = 0
92
+ for i in range(image.shape[0]):
93
+ for j in range(image.shape[1]):
94
+ for k in range(3):
95
+ if data_index < len(binary_data):
96
+ pixel = int(image[i, j, k])
97
+ # Clear the LSB
98
+ pixel = pixel & 0xFE # Clear last bit
99
+ # Set the LSB according to our data
100
+ pixel = pixel | (int(binary_data[data_index]) & 1)
101
+ image[i, j, k] = np.uint8(pixel)
102
+ data_index += 1
103
+ else:
104
+ break
 
 
105
  if data_index >= len(binary_data):
106
+ break
107
+ if data_index >= len(binary_data):
108
+ break
109
+
110
+ # Save result
111
+ output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
112
+ cv2.imwrite(output_path, image)
113
+ return output_path, "Watermark added successfully"
114
+
115
+ except Exception as e:
116
+ return image_path, f"Error in encoding: {str(e)}"
117
+
118
+ def decode(self, image_path):
119
+ """Decode watermark using simple LSB steganography"""
120
+ try:
121
+ # Try PNG metadata first
122
+ try:
123
+ im = Image.open(image_path)
124
+ if "TXT" in im.info:
125
+ return im.info["TXT"]
126
+ except:
127
+ pass
128
+
129
+ image = cv2.imread(image_path)
130
+ if image is None:
131
+ raise ValueError("Could not read image file")
132
+
133
+ # Extract binary data
134
+ binary_data = ''
135
+ for i in range(image.shape[0]):
136
+ for j in range(image.shape[1]):
137
+ for k in range(3):
138
+ binary_data += str(image[i, j, k] & 1)
139
+
140
+ # Convert binary to text
141
+ text = ''
142
+ for i in range(0, len(binary_data), 8):
143
+ if i + 8 <= len(binary_data):
144
+ byte = binary_data[i:i+8]
145
+ text += chr(int(byte, 2))
146
+
147
+ # Check for end marker
148
+ if "#####" in text:
149
+ # Extract content before marker
150
+ text = text.split("#####")[0]
151
+ try:
152
+ # Parse JSON
153
+ data = json.loads(text)
154
+ return json.dumps(data, ensure_ascii=False, indent=2)
155
+ except json.JSONDecodeError:
156
+ return text
157
+ break
158
+
159
+ return "Error: No valid watermark found"
160
+
161
+ except Exception as e:
162
+ return f"Error in decoding: {str(e)}"
163
+
164
+ def analyze_quality(self, original_path, watermarked_path):
165
+ """Analyze watermark quality"""
166
+ try:
167
+ original = cv2.imread(original_path)
168
+ watermarked = cv2.imread(watermarked_path)
169
+
170
+ if original is None or watermarked is None:
171
+ raise ValueError("Could not read image files")
172
+
173
+ # Calculate PSNR
174
+ mse = np.mean((original - watermarked) ** 2)
175
+ if mse == 0:
176
+ psnr = float('inf')
177
+ else:
178
+ psnr = 20 * np.log10(255.0 / np.sqrt(mse))
179
+
180
+ # Calculate histogram similarity
181
+ hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
182
+ hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
183
+ hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
184
+
185
+ # Count modified pixels
186
+ diff = cv2.bitwise_xor(original, watermarked)
187
+ modified_pixels = np.count_nonzero(diff)
188
+
189
+ report = {
190
+ 'psnr': round(psnr, 2),
191
+ 'histogram_similarity': round(hist_correlation, 4),
192
+ 'modified_pixels': modified_pixels,
193
+ 'image_size': original.shape,
194
+ 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
195
+ }
196
+
197
+ return json.dumps(report, indent=2)
198
+
199
+ except Exception as e:
200
+ return f"Error in quality analysis: {str(e)}"