fantos commited on
Commit
310b952
·
verified ·
1 Parent(s): 3dc58d8

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +57 -94
utils.py CHANGED
@@ -64,61 +64,60 @@ class WatermarkProcessor:
64
  except Exception as e:
65
  return im_name, f"Error adding watermark: {str(e)}"
66
 
 
 
67
  def encode(self, image_path, watermark_text, metadata=None):
68
- """Encode watermark using steganography with encryption"""
69
  try:
 
70
  image = cv2.imread(image_path)
71
  if image is None:
72
  raise ValueError("Could not read image file")
73
-
74
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
75
 
76
- # Prepare watermark data
77
- watermark_data = {
78
  'text': watermark_text,
79
  'timestamp': datetime.now().isoformat(),
80
  'metadata': metadata or {}
81
  }
82
 
83
- # Add image hash
84
- image_copy = image.copy() & 0xFE # Clear LSB
85
- watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest()
86
-
87
- # Encrypt data
88
- secret_data = json.dumps(watermark_data)
89
- secret_data = f"{secret_data}#####=====" # Add delimiters
90
- binary_secret_data = self.to_bin(secret_data)
91
 
92
- # Calculate capacity
93
- n_bytes = image.shape[0] * image.shape[1] * 3 // 8
94
- if len(binary_secret_data) > n_bytes * 8:
95
- return image_path, "Watermark is too large for Image Size"
96
 
97
- # Embed data
 
 
 
 
 
 
98
  data_index = 0
 
 
99
  for i in range(image.shape[0]):
100
  for j in range(image.shape[1]):
101
- if data_index < len(binary_secret_data):
102
- pixel = image[i, j]
103
- for k in range(3):
104
- if data_index < len(binary_secret_data):
105
- binary_value = format(pixel[k], '08b')
106
- binary_value = binary_value[:-1] + binary_secret_data[data_index]
107
- image[i, j, k] = int(binary_value, 2)
108
- data_index += 1
109
-
110
- # Save result
111
- output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png"
112
- cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
113
  return output_path, "Watermark added successfully"
114
-
115
  except Exception as e:
116
  return image_path, f"Error in encoding: {str(e)}"
117
 
118
  def decode(self, image_path):
119
- """Decode watermark with simplified robust error handling"""
120
  try:
121
- # Try PNG metadata method first
122
  try:
123
  im = Image.open(image_path)
124
  if "TXT" in im.info:
@@ -126,78 +125,42 @@ class WatermarkProcessor:
126
  except:
127
  pass
128
 
129
- # Steganography method
130
  image = cv2.imread(image_path)
131
  if image is None:
132
  raise ValueError("Could not read image file")
133
-
134
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
135
-
136
- # Extract binary data
137
- binary_data = ""
138
- for row in image:
139
- for pixel in row:
140
- for value in pixel:
141
- binary_data += format(value, '08b')[-1]
142
-
143
- # Convert binary string to bytes
144
  bytes_data = bytearray()
145
  for i in range(0, len(binary_data), 8):
146
  if i + 8 <= len(binary_data):
147
- try:
148
- byte = int(binary_data[i:i+8], 2)
149
- bytes_data.append(byte)
150
- except ValueError:
151
- continue
152
-
153
- # Find end marker in raw bytes
154
- end_sequence = b'====='
155
- if end_sequence in bytes_data:
156
- bytes_data = bytes_data[:bytes_data.index(end_sequence)]
157
-
158
- # Find delimiter in raw bytes
159
- delimiter = b'#####'
160
- if delimiter in bytes_data:
161
- bytes_data = bytes_data[:bytes_data.index(delimiter)]
162
 
163
  try:
164
- # Try direct UTF-8 decoding first
165
  decoded_data = bytes_data.decode('utf-8')
166
 
167
- try:
168
- # Try to parse as JSON
169
- watermark_data = json.loads(decoded_data)
170
- # Verify image hash
171
- image_copy = image.copy() & 0xFE
172
- current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest()
173
-
174
- if current_hash != watermark_data.get('image_hash'):
175
- return "Warning: Image has been modified after watermarking"
176
-
177
- # Return properly formatted JSON
178
- return json.dumps(watermark_data, indent=2, ensure_ascii=False)
179
- except json.JSONDecodeError:
180
- # If not JSON, return decoded text
181
- return decoded_data.strip()
182
-
183
  except UnicodeDecodeError:
184
- # If UTF-8 decoding fails, try character by character
185
- result = []
186
- i = 0
187
- while i < len(bytes_data):
188
- for j in range(min(4, len(bytes_data) - i), 0, -1):
189
- try:
190
- char = bytes_data[i:i+j].decode('utf-8')
191
- result.append(char)
192
- i += j
193
- break
194
- except UnicodeDecodeError:
195
- if j == 1: # If even single byte fails
196
- i += 1
197
- continue
198
-
199
- return ''.join(result).strip()
200
-
201
  except Exception as e:
202
  return f"Error in decoding: {str(e)}"
203
 
 
64
  except Exception as e:
65
  return im_name, f"Error adding watermark: {str(e)}"
66
 
67
+ # utils.py의 encode와 decode 메서드만 수정
68
+
69
  def encode(self, image_path, watermark_text, metadata=None):
70
+ """Encode watermark using steganography"""
71
  try:
72
+ # 이미지 읽기
73
  image = cv2.imread(image_path)
74
  if image is None:
75
  raise ValueError("Could not read image file")
 
 
76
 
77
+ # 워터마크 데이터 준비
78
+ data = {
79
  'text': watermark_text,
80
  'timestamp': datetime.now().isoformat(),
81
  'metadata': metadata or {}
82
  }
83
 
84
+ # JSON으로 변환
85
+ json_data = json.dumps(data, ensure_ascii=False)
86
+ secret_data = json_data + "#####" # 구분자 추가
 
 
 
 
 
87
 
88
+ # 바이트로 변환
89
+ byte_data = secret_data.encode('utf-8')
 
 
90
 
91
+ # 이미지 용량 확인
92
+ max_bytes = (image.shape[0] * image.shape[1] * 3) // 8
93
+ if len(byte_data) > max_bytes:
94
+ raise ValueError("Watermark data is too large for this image")
95
+
96
+ # 바이너리 데이터로 변환
97
+ binary_data = ''.join(format(byte, '08b') for byte in byte_data)
98
  data_index = 0
99
+
100
+ # 데이터 임베딩
101
  for i in range(image.shape[0]):
102
  for j in range(image.shape[1]):
103
+ for k in range(3): # RGB channels
104
+ if data_index < len(binary_data):
105
+ # LSB 수정
106
+ image[i, j, k] = (image[i, j, k] & 0xFE) | int(binary_data[data_index])
107
+ data_index += 1
108
+
109
+ # 결과 저장
110
+ output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
111
+ cv2.imwrite(output_path, image)
 
 
 
112
  return output_path, "Watermark added successfully"
113
+
114
  except Exception as e:
115
  return image_path, f"Error in encoding: {str(e)}"
116
 
117
  def decode(self, image_path):
118
+ """Decode watermark from image"""
119
  try:
120
+ # PNG metadata method first
121
  try:
122
  im = Image.open(image_path)
123
  if "TXT" in im.info:
 
125
  except:
126
  pass
127
 
128
+ # 이미지 읽기
129
  image = cv2.imread(image_path)
130
  if image is None:
131
  raise ValueError("Could not read image file")
132
+
133
+ # LSB 추출
134
+ binary_data = ''
135
+ for i in range(image.shape[0]):
136
+ for j in range(image.shape[1]):
137
+ for k in range(3):
138
+ binary_data += str(image[i, j, k] & 1)
139
+
140
+ # 8비트씩 잘라서 ��이트로 변환
 
 
141
  bytes_data = bytearray()
142
  for i in range(0, len(binary_data), 8):
143
  if i + 8 <= len(binary_data):
144
+ byte = int(binary_data[i:i+8], 2)
145
+ bytes_data.append(byte)
146
+
147
+ # 구분자 체크 (#####)
148
+ if bytes_data[-5:] == b'#####':
149
+ bytes_data = bytes_data[:-5] # 구분자 제거
150
+ break
 
 
 
 
 
 
 
 
151
 
152
  try:
153
+ # UTF-8 디코딩
154
  decoded_data = bytes_data.decode('utf-8')
155
 
156
+ # JSON 파싱
157
+ watermark_data = json.loads(decoded_data)
158
+ return json.dumps(watermark_data, ensure_ascii=False, indent=2)
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  except UnicodeDecodeError:
160
+ return "Error: Invalid character encoding"
161
+ except json.JSONDecodeError:
162
+ return decoded_data
163
+
 
 
 
 
 
 
 
 
 
 
 
 
 
164
  except Exception as e:
165
  return f"Error in decoding: {str(e)}"
166