fantos commited on
Commit
6eb0ffe
·
verified ·
1 Parent(s): 310b952

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +174 -191
utils.py CHANGED
@@ -10,194 +10,177 @@ import base64
10
  import hashlib
11
 
12
  class WatermarkProcessor:
13
- def __init__(self, encryption_key=None):
14
- """Initialize with optional encryption key"""
15
- if encryption_key:
16
- self.fernet = Fernet(encryption_key)
17
- else:
18
- key = Fernet.generate_key()
19
- self.fernet = Fernet(key)
20
-
21
- def to_bin(self, data):
22
- """Convert data to binary format as string"""
23
- if isinstance(data, str):
24
- return ''.join(format(x, '08b') for x in data.encode('utf-8'))
25
- elif isinstance(data, bytes):
26
- return ''.join(format(x, '08b') for x in data)
27
- elif isinstance(data, np.ndarray):
28
- return [format(i, "08b") for i in data]
29
- elif isinstance(data, int) or isinstance(data, np.uint8):
30
- return format(data, "08b")
31
- else:
32
- raise TypeError("Type not supported.")
33
-
34
- def create_preview(self, image_path, watermark_text, opacity=0.3):
35
- """Create a preview of watermark on image"""
36
- try:
37
- image = Image.open(image_path)
38
- txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
- draw = ImageDraw.Draw(txt_layer)
40
-
41
- # Calculate text position
42
- text_width = draw.textlength(watermark_text)
43
- text_x = (image.width - text_width) // 2
44
- text_y = image.height // 2
45
-
46
- # Add watermark text
47
- draw.text((text_x, text_y), watermark_text,
48
- fill=(255, 255, 255, int(255 * opacity)))
49
-
50
- # Combine layers
51
- preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
- return preview
53
- except Exception as e:
54
- return None
55
-
56
- def png_encode(self, im_name, extra):
57
- """Encode watermark using PNG metadata"""
58
- try:
59
- im = Image.open(im_name)
60
- info = PngImagePlugin.PngInfo()
61
- info.add_text("TXT", extra)
62
- im.save("test.png", pnginfo=info)
63
- return "test.png", "Watermark added successfully"
64
- except Exception as e:
65
- return im_name, f"Error adding watermark: {str(e)}"
66
-
67
- # utils.py의 encode decode 메서드만 수정
68
-
69
- def encode(self, image_path, watermark_text, metadata=None):
70
- """Encode watermark using steganography"""
71
- try:
72
- # 이미지 읽기
73
- image = cv2.imread(image_path)
74
- if image is None:
75
- raise ValueError("Could not read image file")
76
-
77
- # 워터마크 데이터 준비
78
- data = {
79
- 'text': watermark_text,
80
- 'timestamp': datetime.now().isoformat(),
81
- 'metadata': metadata or {}
82
- }
83
-
84
- # JSON으로 변환
85
- json_data = json.dumps(data, ensure_ascii=False)
86
- secret_data = json_data + "#####" # 구분자 추가
87
-
88
- # 바이트로 변환
89
- byte_data = secret_data.encode('utf-8')
90
-
91
- # 이미지 용량 확인
92
- max_bytes = (image.shape[0] * image.shape[1] * 3) // 8
93
- if len(byte_data) > max_bytes:
94
- raise ValueError("Watermark data is too large for this image")
95
-
96
- # 바이너리 데이터로 변환
97
- binary_data = ''.join(format(byte, '08b') for byte in byte_data)
98
- data_index = 0
99
-
100
- # 데이터 임베딩
101
- for i in range(image.shape[0]):
102
- for j in range(image.shape[1]):
103
- for k in range(3): # RGB channels
104
- if data_index < len(binary_data):
105
- # LSB 수정
106
- image[i, j, k] = (image[i, j, k] & 0xFE) | int(binary_data[data_index])
107
- data_index += 1
108
-
109
- # 결과 저장
110
- output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
111
- cv2.imwrite(output_path, image)
112
- return output_path, "Watermark added successfully"
113
-
114
- except Exception as e:
115
- return image_path, f"Error in encoding: {str(e)}"
116
-
117
- def decode(self, image_path):
118
- """Decode watermark from image"""
119
- try:
120
- # PNG metadata method first
121
- try:
122
- im = Image.open(image_path)
123
- if "TXT" in im.info:
124
- return im.info["TXT"]
125
- except:
126
- pass
127
-
128
- # 이미지 읽기
129
- image = cv2.imread(image_path)
130
- if image is None:
131
- raise ValueError("Could not read image file")
132
-
133
- # LSB 추출
134
- binary_data = ''
135
- for i in range(image.shape[0]):
136
- for j in range(image.shape[1]):
137
- for k in range(3):
138
- binary_data += str(image[i, j, k] & 1)
139
-
140
- # 8비트씩 잘라서 바이트로 변환
141
- bytes_data = bytearray()
142
- for i in range(0, len(binary_data), 8):
143
- if i + 8 <= len(binary_data):
144
- byte = int(binary_data[i:i+8], 2)
145
- bytes_data.append(byte)
146
-
147
- # 구분자 체크 (#####)
148
- if bytes_data[-5:] == b'#####':
149
- bytes_data = bytes_data[:-5] # 구분자 제거
150
- break
151
-
152
- try:
153
- # UTF-8로 디코딩
154
- decoded_data = bytes_data.decode('utf-8')
155
-
156
- # JSON 파싱
157
- watermark_data = json.loads(decoded_data)
158
- return json.dumps(watermark_data, ensure_ascii=False, indent=2)
159
- except UnicodeDecodeError:
160
- return "Error: Invalid character encoding"
161
- except json.JSONDecodeError:
162
- return decoded_data
163
-
164
- except Exception as e:
165
- return f"Error in decoding: {str(e)}"
166
-
167
- def analyze_quality(self, original_path, watermarked_path):
168
- """Analyze watermark quality"""
169
- try:
170
- original = cv2.imread(original_path)
171
- watermarked = cv2.imread(watermarked_path)
172
-
173
- if original is None or watermarked is None:
174
- raise ValueError("Could not read image files")
175
-
176
- # Calculate PSNR
177
- mse = np.mean((original - watermarked) ** 2)
178
- if mse == 0:
179
- psnr = float('inf')
180
- else:
181
- psnr = 20 * np.log10(255.0 / np.sqrt(mse))
182
-
183
- # Calculate histogram similarity
184
- hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
185
- hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
186
- hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
187
-
188
- # Count modified pixels
189
- diff = cv2.bitwise_xor(original, watermarked)
190
- modified_pixels = np.count_nonzero(diff)
191
-
192
- report = {
193
- 'psnr': round(psnr, 2),
194
- 'histogram_similarity': round(hist_correlation, 4),
195
- 'modified_pixels': modified_pixels,
196
- 'image_size': original.shape,
197
- 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
198
- }
199
-
200
- return json.dumps(report, indent=2)
201
-
202
- except Exception as e:
203
- return f"Error in quality analysis: {str(e)}"
 
10
  import hashlib
11
 
12
  class WatermarkProcessor:
13
+ def __init__(self, encryption_key=None):
14
+ """Initialize with optional encryption key"""
15
+ if encryption_key:
16
+ self.fernet = Fernet(encryption_key)
17
+ else:
18
+ key = Fernet.generate_key()
19
+ self.fernet = Fernet(key)
20
+
21
+ def to_bin(self, data):
22
+ """Convert data to binary format as string"""
23
+ if isinstance(data, str):
24
+ return ''.join(format(ord(char), '08b') for char in data)
25
+ elif isinstance(data, bytes):
26
+ return ''.join(format(x, '08b') for x in data)
27
+ elif isinstance(data, np.ndarray):
28
+ return [format(i, "08b") for i in data]
29
+ elif isinstance(data, int) or isinstance(data, np.uint8):
30
+ return format(data, "08b")
31
+ else:
32
+ raise TypeError("Type not supported.")
33
+
34
+ def create_preview(self, image_path, watermark_text, opacity=0.3):
35
+ """Create a preview of watermark on image"""
36
+ try:
37
+ image = Image.open(image_path)
38
+ txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
+ draw = ImageDraw.Draw(txt_layer)
40
+
41
+ # Calculate text position
42
+ text_width = draw.textlength(watermark_text)
43
+ text_x = (image.width - text_width) // 2
44
+ text_y = image.height // 2
45
+
46
+ # Add watermark text
47
+ draw.text((text_x, text_y), watermark_text,
48
+ fill=(255, 255, 255, int(255 * opacity)))
49
+
50
+ # Combine layers
51
+ preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
+ return preview
53
+ except Exception as e:
54
+ return None
55
+
56
+ def png_encode(self, im_name, extra):
57
+ """Encode watermark using PNG metadata"""
58
+ try:
59
+ im = Image.open(im_name)
60
+ info = PngImagePlugin.PngInfo()
61
+ info.add_text("TXT", extra)
62
+ im.save("test.png", pnginfo=info)
63
+ return "test.png", "Watermark added successfully"
64
+ except Exception as e:
65
+ return im_name, f"Error adding watermark: {str(e)}"
66
+
67
+ def encode(self, image_path, watermark_text, metadata=None):
68
+ """Encode watermark using simple LSB steganography"""
69
+ try:
70
+ image = cv2.imread(image_path)
71
+ if image is None:
72
+ raise ValueError("Could not read image file")
73
+
74
+ # Prepare watermark data
75
+ data = {
76
+ 'text': watermark_text,
77
+ 'timestamp': datetime.now().isoformat(),
78
+ 'metadata': metadata or {}
79
+ }
80
+
81
+ # Convert to string and add delimiter
82
+ secret_data = json.dumps(data, ensure_ascii=False) + "###END###"
83
+
84
+ # Convert to binary string
85
+ binary_secret = ''.join(format(ord(char), '08b') for char in secret_data)
86
+
87
+ # Check capacity
88
+ if len(binary_secret) > image.shape[0] * image.shape[1] * 3:
89
+ return image_path, "Error: Image too small for watermark data"
90
+
91
+ # Embed data
92
+ data_index = 0
93
+ for i in range(image.shape[0]):
94
+ for j in range(image.shape[1]):
95
+ for k in range(3):
96
+ if data_index < len(binary_secret):
97
+ # Clear LSB and add data bit
98
+ image[i, j, k] = (image[i, j, k] & ~1) | int(binary_secret[data_index])
99
+ data_index += 1
100
+ else:
101
+ break
102
+ if data_index >= len(binary_secret):
103
+ break
104
+ if data_index >= len(binary_secret):
105
+ break
106
+
107
+ # Save result
108
+ output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
109
+ cv2.imwrite(output_path, image)
110
+ return output_path, "Watermark added successfully"
111
+
112
+ except Exception as e:
113
+ return image_path, f"Error in encoding: {str(e)}"
114
+
115
+ def decode(self, image_path):
116
+ """Decode watermark using simple LSB steganography"""
117
+ try:
118
+ image = cv2.imread(image_path)
119
+ if image is None:
120
+ raise ValueError("Could not read image file")
121
+
122
+ # Extract binary string
123
+ binary_data = ''
124
+ for i in range(image.shape[0]):
125
+ for j in range(image.shape[1]):
126
+ for k in range(3):
127
+ binary_data += str(image[i, j, k] & 1)
128
+
129
+ # Convert binary to string
130
+ decoded_chars = []
131
+ for i in range(0, len(binary_data), 8):
132
+ byte = binary_data[i:i+8]
133
+ if len(byte) == 8:
134
+ decoded_chars.append(chr(int(byte, 2)))
135
+ # Check for delimiter
136
+ if ''.join(decoded_chars[-7:]) == "###END###":
137
+ decoded_text = ''.join(decoded_chars[:-7])
138
+ try:
139
+ # Parse JSON data
140
+ watermark_data = json.loads(decoded_text)
141
+ return json.dumps(watermark_data, ensure_ascii=False, indent=2)
142
+ except json.JSONDecodeError:
143
+ return decoded_text
144
+
145
+ return "Error: No valid watermark found"
146
+
147
+ except Exception as e:
148
+ return f"Error in decoding: {str(e)}"
149
+
150
+ def analyze_quality(self, original_path, watermarked_path):
151
+ """Analyze watermark quality"""
152
+ try:
153
+ original = cv2.imread(original_path)
154
+ watermarked = cv2.imread(watermarked_path)
155
+
156
+ if original is None or watermarked is None:
157
+ raise ValueError("Could not read image files")
158
+
159
+ # Calculate PSNR
160
+ mse = np.mean((original - watermarked) ** 2)
161
+ if mse == 0:
162
+ psnr = float('inf')
163
+ else:
164
+ psnr = 20 * np.log10(255.0 / np.sqrt(mse))
165
+
166
+ # Calculate histogram similarity
167
+ hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
168
+ hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
169
+ hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
170
+
171
+ # Count modified pixels
172
+ diff = cv2.bitwise_xor(original, watermarked)
173
+ modified_pixels = np.count_nonzero(diff)
174
+
175
+ report = {
176
+ 'psnr': round(psnr, 2),
177
+ 'histogram_similarity': round(hist_correlation, 4),
178
+ 'modified_pixels': modified_pixels,
179
+ 'image_size': original.shape,
180
+ 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
181
+ }
182
+
183
+ return json.dumps(report, indent=2)
184
+
185
+ except Exception as e:
186
+ return f"Error in quality analysis: {str(e)}"