seanpedrickcase committed
Commit f957846 · 1 Parent(s): bafcf39

General code changes and reformatting to address code vulnerabilities highlighted by CodeQL scan, with Black/Ruff reapplied to the code. Fixes/optimisation of GitHub Actions.

.dockerignore CHANGED
@@ -26,3 +26,11 @@ input/
 feedback/
 config/
 usage/
+test/config/*
+test/feedback/*
+test/input/*
+test/logs/*
+test/output/*
+test/tmp/*
+test/usage/*
+.ruff_cache/*
.github/README.md CHANGED
@@ -27,7 +27,7 @@ This directory contains GitHub Actions workflows for automated testing of the CL
 
 ### 3. **Multi-OS Testing** (`.github/workflows/multi-os-test.yml`)
 - **Purpose**: Cross-platform testing
-- **OS**: Ubuntu, Windows, macOS
+- **OS**: Ubuntu, macOS (Windows not included currently but may be reintroduced)
 - **Python**: 3.10, 3.11, 3.12
 - **Features**: Tests compatibility across different operating systems
 
.github/scripts/setup_test_data.py CHANGED
@@ -142,14 +142,17 @@ def create_allow_deny_lists():
 def create_ocr_output():
     """Create dummy OCR output CSV."""
     ocr_data = {
-        "file_name": ["test.pdf", "test.pdf", "test.pdf"],
-        "page_number": [1, 2, 3],
+        "page": [1, 2, 3],
         "text": [
             "This is page 1 content with some text",
             "This is page 2 content with different text",
             "This is page 3 content with more text",
         ],
-        "confidence": [0.95, 0.92, 0.88],
+        "left": [0.1, 0.3, 0.5],
+        "top": [0.95, 0.92, 0.88],
+        "width": [0.05, 0.02, 0.02],
+        "height": [0.01, 0.02, 0.02],
+        "line": [1, 2, 3],
     }
     df = pd.DataFrame(ocr_data)
     df.to_csv(
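
The dummy OCR CSV now mirrors a word-level layout schema (page and line indices plus left/top/width/height box fractions) instead of the old file_name/page_number/confidence columns. A minimal sanity check of that schema, assuming pandas and an illustrative output path (the real path is decided inside the script):

import pandas as pd

# Illustrative path — setup_test_data.py controls where the CSV actually lands.
df = pd.read_csv("test/input/test_ocr_output.csv")
expected = ["page", "text", "left", "top", "width", "height", "line"]
assert list(df.columns) == expected, f"Unexpected OCR schema: {list(df.columns)}"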
.github/workflows/ci.yml CHANGED
@@ -2,12 +2,18 @@ name: CI/CD Pipeline
 
 on:
   push:
-    branches: [ main, dev ]
+    branches: [ main ]
   pull_request:
-    branches: [ main, dev ]
-  schedule:
-    # Run tests daily at 2 AM UTC
-    - cron: '0 2 * * *'
+    branches: [ main ]
+  #schedule:
+  # Run tests daily at 2 AM UTC
+  #  - cron: '0 2 * * *'
+
+permissions:
+  contents: read
+  actions: read
+  pull-requests: write
+  issues: write
 
 env:
   PYTHON_VERSION: "3.11"
@@ -38,7 +44,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.10, 3.11, 3.12]
+        python-version: [3.11, 3.12, 3.13]
 
     steps:
       - uses: actions/checkout@v4
@@ -180,9 +186,9 @@ jobs:
           python -m pip install --upgrade pip
           pip install safety bandit
 
-      - name: Run safety check
+      - name: Run safety scan
         run: |
-          safety check -r requirements.txt
+          safety scan -r requirements.txt
 
       - name: Run bandit security check
         run: |
.github/workflows/multi-os-test.yml CHANGED
@@ -2,23 +2,27 @@ name: Multi-OS Test
 
 on:
   push:
-    branches: [ main, dev ]
+    branches: [ main ]
   pull_request:
-    branches: [ main, dev ]
+    branches: [ main ]
+
+permissions:
+  contents: read
+  actions: read
 
 jobs:
   test:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ubuntu-latest, windows-latest, macos-latest]
+        os: [ubuntu-latest, macos-latest] # windows-latest removed for now as I have not been able to install tesseract on Windows using this method
         python-version: ["3.10", "3.11", "3.12"]
         exclude:
           # Exclude some combinations to reduce CI time
-          - os: windows-latest
-            python-version: "3.10"
+          #- os: windows-latest
+          #  python-version: "3.10"
           - os: macos-latest
-            python-version: "3.12"
+            python-version: "3.11"
 
     steps:
       - uses: actions/checkout@v4
.github/workflows/simple-test.yml CHANGED
@@ -6,6 +6,10 @@ on:
   pull_request:
     branches: [ main, dev ]
 
+permissions:
+  contents: read
+  actions: read
+
 jobs:
   test:
     runs-on: ubuntu-latest
.github/workflows/test.yml CHANGED
@@ -6,6 +6,11 @@ on:
   pull_request:
     branches: [ main, dev ]
 
+permissions:
+  contents: read
+  actions: read
+  pull-requests: write
+
 jobs:
   test:
     runs-on: ubuntu-latest
.gitignore CHANGED
@@ -29,3 +29,11 @@ cdk.context.json
 .quarto/*
 /.quarto/
 /_site/
+test/config/*
+test/feedback/*
+test/input/*
+test/logs/*
+test/output/*
+test/tmp/*
+test/usage/*
+.ruff_cache/*
cdk/cdk_functions.py CHANGED
@@ -856,14 +856,14 @@ def check_for_secret(secret_name: str, secret_value: dict = ""):
     try:
         # Try to get the secret. If it doesn't exist, a ResourceNotFoundException will be raised.
        secret_value = secretsmanager_client.get_secret_value(SecretId=secret_name)
-        print(f"Secret '{secret_name}' already exists.")
+        print("Secret already exists.")
        return True, secret_value
    except secretsmanager_client.exceptions.ResourceNotFoundException:
        print("Secret not found")
        return False, {}
    except Exception as e:
        # Handle other potential exceptions during the get operation
-        print(f"Error checking for secret '{secret_name}': {e}")
+        print(f"Error checking for secret: {e}")
        return False, {}
 
 
test/run_tests.py CHANGED
@@ -12,7 +12,7 @@ import sys
 # Add the parent directory to the path so we can import the test module
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-from test.test import run_all_tests
+from test import run_all_tests
 
 if __name__ == "__main__":
     print("Starting CLI Redaction Test Suite...")
test/test.py CHANGED
@@ -1,11 +1,10 @@
 import os
 import shutil
 import subprocess
-import tempfile
-import unittest
 import sys
+import tempfile
 import threading
-import time
+import unittest
 from typing import List, Optional
 
 
@@ -893,35 +892,40 @@ class TestGUIApp(unittest.TestCase):
         cls.app_path = os.path.join(
             os.path.dirname(os.path.dirname(__file__)), "app.py"
         )
-
+
         # Verify app.py exists
         if not os.path.isfile(cls.app_path):
             raise FileNotFoundError(f"App file not found: {cls.app_path}")
-
+
         print(f"GUI test setup complete. App: {cls.app_path}")
 
     def test_app_import_and_initialization(self):
         """Test: Import app.py and check if the Gradio app object is created successfully."""
         print("\n=== Testing GUI app import and initialization ===")
-
+
         try:
             # Add the parent directory to the path so we can import app
             parent_dir = os.path.dirname(os.path.dirname(__file__))
             if parent_dir not in sys.path:
                 sys.path.insert(0, parent_dir)
-
+
             # Import the app module
             import app
-
+
             # Check if the app object exists and is a Gradio Blocks object
-            self.assertTrue(hasattr(app, 'app'), "App object should exist in the module")
-
+            self.assertTrue(
+                hasattr(app, "app"), "App object should exist in the module"
+            )
+
             # Check if it's a Gradio Blocks instance
             import gradio as gr
-            self.assertIsInstance(app.app, gr.Blocks, "App should be a Gradio Blocks instance")
-
+
+            self.assertIsInstance(
+                app.app, gr.Blocks, "App should be a Gradio Blocks instance"
+            )
+
             print("✅ GUI app import and initialization passed")
-
+
         except ImportError as e:
             error_msg = f"Failed to import app module: {e}"
             if "gradio_image_annotation" in str(e):
@@ -935,41 +939,40 @@ class TestGUIApp(unittest.TestCase):
     def test_app_launch_headless(self):
         """Test: Launch the app in headless mode to verify it starts without errors."""
         print("\n=== Testing GUI app launch in headless mode ===")
-
+
         try:
             # Add the parent directory to the path
             parent_dir = os.path.dirname(os.path.dirname(__file__))
             if parent_dir not in sys.path:
                 sys.path.insert(0, parent_dir)
-
+
             # Import the app module
+
             import app
-            import gradio as gr
-
+
             # Set up a flag to track if the app launched successfully
             app_launched = threading.Event()
             launch_error = None
-
+
             def launch_app():
                 try:
                     # Launch the app in headless mode with a short timeout
                     app.app.launch(
                         show_error=True,
                         inbrowser=False,  # Don't open browser
-                        server_port=0,  # Use any available port
-                        quiet=True,  # Suppress output
-                        prevent_thread_lock=True  # Don't block the main thread
+                        server_port=0,  # Use any available port
+                        quiet=True,  # Suppress output
+                        prevent_thread_lock=True,  # Don't block the main thread
                     )
                     app_launched.set()
-                except Exception as e:
-                    launch_error = e
+                except Exception:
                     app_launched.set()
-
+
             # Start the app in a separate thread
             launch_thread = threading.Thread(target=launch_app)
             launch_thread.daemon = True
             launch_thread.start()
-
+
             # Wait for the app to launch (with timeout)
             if app_launched.wait(timeout=10):  # 10 second timeout
                 if launch_error:
@@ -978,7 +981,7 @@ class TestGUIApp(unittest.TestCase):
                 print("✅ GUI app launch in headless mode passed")
             else:
                 self.fail("App launch timed out after 10 seconds")
-
+
         except Exception as e:
             error_msg = f"Unexpected error during app launch test: {e}"
             if "gradio_image_annotation" in str(e):
@@ -990,33 +993,39 @@ class TestGUIApp(unittest.TestCase):
     def test_app_configuration_loading(self):
         """Test: Verify that the app can load its configuration without errors."""
         print("\n=== Testing GUI app configuration loading ===")
-
+
         try:
             # Add the parent directory to the path
             parent_dir = os.path.dirname(os.path.dirname(__file__))
             if parent_dir not in sys.path:
                 sys.path.insert(0, parent_dir)
-
-            # Import the app module
-            import app
-
+
+            # Import the app module (not needed?)
+            # import app
+
             # Check if key configuration variables are accessible
             # These should be imported from tools.config
             from tools.config import (
+                DEFAULT_LANGUAGE,
                 GRADIO_SERVER_PORT,
                 MAX_FILE_SIZE,
-                DEFAULT_LANGUAGE,
-                PII_DETECTION_MODELS
+                PII_DETECTION_MODELS,
             )
-
+
             # Verify these are not None/empty
-            self.assertIsNotNone(GRADIO_SERVER_PORT, "GRADIO_SERVER_PORT should be configured")
+            self.assertIsNotNone(
+                GRADIO_SERVER_PORT, "GRADIO_SERVER_PORT should be configured"
+            )
             self.assertIsNotNone(MAX_FILE_SIZE, "MAX_FILE_SIZE should be configured")
-            self.assertIsNotNone(DEFAULT_LANGUAGE, "DEFAULT_LANGUAGE should be configured")
-            self.assertIsNotNone(PII_DETECTION_MODELS, "PII_DETECTION_MODELS should be configured")
-
+            self.assertIsNotNone(
+                DEFAULT_LANGUAGE, "DEFAULT_LANGUAGE should be configured"
+            )
+            self.assertIsNotNone(
+                PII_DETECTION_MODELS, "PII_DETECTION_MODELS should be configured"
+            )
+
             print("✅ GUI app configuration loading passed")
-
+
         except ImportError as e:
             error_msg = f"Failed to import configuration: {e}"
             if "gradio_image_annotation" in str(e):
@@ -1048,11 +1057,11 @@ def run_all_tests():
     # Create test suite
     loader = unittest.TestLoader()
     suite = unittest.TestSuite()
-
+
     # Add CLI tests
     cli_suite = loader.loadTestsFromTestCase(TestCLIRedactExamples)
     suite.addTests(cli_suite)
-
+
     # Add GUI tests
     gui_suite = loader.loadTestsFromTestCase(TestGUIApp)
     suite.addTests(gui_suite)
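
Worth noting on the headless-launch change: the removed `launch_error = e` never worked as intended. Inside the nested `launch_app` it rebound a local variable rather than the enclosing one (there was no `nonlocal`), so the outer `if launch_error:` check always saw None; presumably this dead store is what the linters flagged, and the commit simply drops it. A minimal sketch of how the error could be propagated if that behaviour were actually wanted (names are illustrative, not from the codebase):

import threading


def run_with_error_capture(target, timeout: float = 10.0):
    """Run `target` in a daemon thread and surface any exception to the caller."""
    done = threading.Event()
    error = None

    def worker():
        nonlocal error  # without nonlocal, `error = e` would rebind a dead local
        try:
            target()
        except Exception as e:
            error = e
        finally:
            done.set()

    threading.Thread(target=worker, daemon=True).start()
    done.wait(timeout=timeout)
    return error


# e.g. run_with_error_capture(lambda: 1 / 0) returns a ZeroDivisionError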
test/test_gui_only.py CHANGED
@@ -8,9 +8,8 @@ Run this script to verify that the Gradio interface can be imported and initiali
 
 import os
 import sys
-import unittest
 import threading
-import time
+import unittest
 
 # Add the parent directory to the path so we can import the app
 parent_dir = os.path.dirname(os.path.dirname(__file__))
@@ -25,30 +24,35 @@ class TestGUIAppOnly(unittest.TestCase):
     def setUpClass(cls):
         """Set up test environment for GUI tests."""
         cls.app_path = os.path.join(parent_dir, "app.py")
-
+
         # Verify app.py exists
         if not os.path.isfile(cls.app_path):
             raise FileNotFoundError(f"App file not found: {cls.app_path}")
-
+
         print(f"GUI test setup complete. App: {cls.app_path}")
 
     def test_app_import_and_initialization(self):
         """Test: Import app.py and check if the Gradio app object is created successfully."""
         print("\n=== Testing GUI app import and initialization ===")
-
+
         try:
             # Import the app module
             import app
-
+
             # Check if the app object exists and is a Gradio Blocks object
-            self.assertTrue(hasattr(app, 'app'), "App object should exist in the module")
-
+            self.assertTrue(
+                hasattr(app, "app"), "App object should exist in the module"
+            )
+
             # Check if it's a Gradio Blocks instance
             import gradio as gr
-            self.assertIsInstance(app.app, gr.Blocks, "App should be a Gradio Blocks instance")
-
+
+            self.assertIsInstance(
+                app.app, gr.Blocks, "App should be a Gradio Blocks instance"
+            )
+
             print("✅ GUI app import and initialization passed")
-
+
         except ImportError as e:
             error_msg = f"Failed to import app module: {e}"
             if "gradio_image_annotation" in str(e):
@@ -62,36 +66,35 @@ class TestGUIAppOnly(unittest.TestCase):
     def test_app_launch_headless(self):
         """Test: Launch the app in headless mode to verify it starts without errors."""
         print("\n=== Testing GUI app launch in headless mode ===")
-
+
         try:
             # Import the app module
+
             import app
-            import gradio as gr
-
+
             # Set up a flag to track if the app launched successfully
             app_launched = threading.Event()
             launch_error = None
-
+
             def launch_app():
                 try:
                     # Launch the app in headless mode with a short timeout
                     app.app.launch(
                         show_error=True,
                         inbrowser=False,  # Don't open browser
-                        server_port=0,  # Use any available port
-                        quiet=True,  # Suppress output
-                        prevent_thread_lock=True  # Don't block the main thread
+                        server_port=0,  # Use any available port
+                        quiet=True,  # Suppress output
+                        prevent_thread_lock=True,  # Don't block the main thread
                    )
                    app_launched.set()
-                except Exception as e:
-                    launch_error = e
+                except Exception:
                    app_launched.set()
-
+
             # Start the app in a separate thread
            launch_thread = threading.Thread(target=launch_app)
            launch_thread.daemon = True
            launch_thread.start()
-
+
             # Wait for the app to launch (with timeout)
            if app_launched.wait(timeout=10):  # 10 second timeout
                if launch_error:
@@ -100,7 +103,7 @@ class TestGUIAppOnly(unittest.TestCase):
                 print("✅ GUI app launch in headless mode passed")
             else:
                 self.fail("App launch timed out after 10 seconds")
-
+
         except Exception as e:
             error_msg = f"Unexpected error during app launch test: {e}"
             if "gradio_image_annotation" in str(e):
@@ -112,28 +115,34 @@ class TestGUIAppOnly(unittest.TestCase):
     def test_app_configuration_loading(self):
         """Test: Verify that the app can load its configuration without errors."""
         print("\n=== Testing GUI app configuration loading ===")
-
+
         try:
-            # Import the app module
-            import app
-
+            # Import the app module (not necessary here?)
+            # import app
+
             # Check if key configuration variables are accessible
             # These should be imported from tools.config
             from tools.config import (
+                DEFAULT_LANGUAGE,
                 GRADIO_SERVER_PORT,
                 MAX_FILE_SIZE,
-                DEFAULT_LANGUAGE,
-                PII_DETECTION_MODELS
+                PII_DETECTION_MODELS,
             )
-
+
             # Verify these are not None/empty
-            self.assertIsNotNone(GRADIO_SERVER_PORT, "GRADIO_SERVER_PORT should be configured")
+            self.assertIsNotNone(
+                GRADIO_SERVER_PORT, "GRADIO_SERVER_PORT should be configured"
+            )
             self.assertIsNotNone(MAX_FILE_SIZE, "MAX_FILE_SIZE should be configured")
-            self.assertIsNotNone(DEFAULT_LANGUAGE, "DEFAULT_LANGUAGE should be configured")
-            self.assertIsNotNone(PII_DETECTION_MODELS, "PII_DETECTION_MODELS should be configured")
-
+            self.assertIsNotNone(
+                DEFAULT_LANGUAGE, "DEFAULT_LANGUAGE should be configured"
+            )
+            self.assertIsNotNone(
+                PII_DETECTION_MODELS, "PII_DETECTION_MODELS should be configured"
+            )
+
             print("✅ GUI app configuration loading passed")
-
+
         except ImportError as e:
             error_msg = f"Failed to import configuration: {e}"
             if "gradio_image_annotation" in str(e):
tools/aws_functions.py CHANGED
@@ -10,6 +10,7 @@ from tools.config import (
     RUN_AWS_FUNCTIONS,
     SAVE_LOGS_TO_CSV,
 )
+from tools.secure_path_utils import secure_join
 
 PandasDataFrame = Type[pd.DataFrame]
 
@@ -90,7 +91,7 @@ def download_folder_from_s3(
     for obj in response.get("Contents", []):
         # Extract object key and construct local file path
         object_key = obj["Key"]
-        local_file_path = os.path.join(
+        local_file_path = secure_join(
             local_folder, os.path.relpath(object_key, s3_folder)
         )
 
@@ -143,8 +144,8 @@ def download_files_from_s3(
     print("Found filenames in AWS folder: ", filenames)
 
     for filename in filenames:
-        object_key = os.path.join(s3_folder, filename)
-        local_file_path = os.path.join(local_folder, filename)
+        object_key = secure_join(s3_folder, filename)
+        local_file_path = secure_join(local_folder, filename)
 
         # Create directories if necessary
         os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
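
tools/secure_path_utils.py itself is not shown in this commit, but the repeated os.path.join → secure_join swap is the standard remediation for CodeQL's path-injection findings on externally supplied names such as S3 object keys. A minimal sketch of what such a helper likely looks like (an assumption, not the project's actual implementation):

import os


def secure_join(base: str, *paths: str) -> str:
    """Join like os.path.join, but refuse results that escape `base`."""
    joined = os.path.normpath(os.path.join(base, *paths))
    base_abs = os.path.abspath(base)
    if os.path.commonpath([base_abs, os.path.abspath(joined)]) != base_abs:
        raise ValueError(f"Path traversal attempt blocked: {joined!r}")
    return joined


# secure_join("output", "report.csv")      -> "output/report.csv"
# secure_join("output", "../etc/passwd")   -> raises ValueError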
tools/aws_textract.py CHANGED
@@ -16,6 +16,7 @@ from tools.config import (
     RUN_AWS_FUNCTIONS,
 )
 from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult
+from tools.secure_path_utils import secure_file_read
 
 
 def extract_textract_metadata(response: object):
@@ -478,8 +479,8 @@ def load_and_convert_textract_json(
         log_files_output_paths.append(textract_json_file_path)
 
     try:
-        with open(textract_json_file_path, "r", encoding="utf-8") as json_file:
-            textract_data = json.load(json_file)
+        json_content = secure_file_read(textract_json_file_path, encoding="utf-8")
+        textract_data = json.loads(json_content)
     except json.JSONDecodeError:
         print("Error: Failed to parse Textract JSON file. Returning empty data.")
         return {}, True, log_files_output_paths  # Indicate failure
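
Similarly, secure_file_read is presumably a thin wrapper that validates the path before reading; a sketch under that assumption (the matching secure_file_write used elsewhere in this commit is sketched alongside):

import os


def secure_file_read(file_path: str, encoding: str = "utf-8") -> str:
    """Resolve symlinks, check it is a regular file, then read it as text."""
    resolved = os.path.realpath(file_path)
    if not os.path.isfile(resolved):
        raise FileNotFoundError(f"Not a regular file: {resolved!r}")
    with open(resolved, "r", encoding=encoding) as f:
        return f.read()


def secure_file_write(file_path: str, content: str, encoding: str = "utf-8") -> None:
    """Resolve symlinks in the target path, then write text to it."""
    resolved = os.path.realpath(file_path)
    with open(resolved, "w", encoding=encoding) as f:
        f.write(content)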
tools/config.py CHANGED
@@ -382,7 +382,7 @@ CHOSEN_LOCAL_OCR_MODEL = get_or_create_env_var(
 )  # Choose between "tesseract", "hybrid", and "paddle". "paddle" will only return whole line text extraction, and so will only work for OCR, not redaction. "hybrid" is a combination of the two - first pass through the redactions will be done with Tesseract, and then a second pass will be done with PaddleOCR on words with low confidence.
 
 PREPROCESS_LOCAL_OCR_IMAGES = get_or_create_env_var(
-    "PREPROCESS_LOCAL_OCR_IMAGES", "True"
+    "PREPROCESS_LOCAL_OCR_IMAGES", "False"
 )  # Whether to try and preprocess images before extracting text. NOTE: I have found in testing that this doesn't necessarily imporove results, and greatly slows down extraction.
 
 # Entities for redaction
tools/custom_csvlogger.py CHANGED
@@ -2,7 +2,6 @@ from __future__ import annotations
 
 import csv
 import os
-import re
 import time
 import uuid
 from collections.abc import Sequence
@@ -105,10 +104,17 @@ class CSVLogger_custom(FlaggingCallback):
             self.dataset_filepath = self.flagging_dir / self.dataset_file_name
         elif dataset_files:
             try:
+                from tools.secure_regex_utils import (
+                    safe_extract_latest_number_from_filename,
+                )
+
                 latest_file = max(
-                    dataset_files, key=lambda f: int(re.findall(r"\d+", f.stem)[0])
+                    dataset_files,
+                    key=lambda f: safe_extract_latest_number_from_filename(f.stem) or 0,
+                )
+                latest_num = (
+                    safe_extract_latest_number_from_filename(latest_file.stem) or 0
                 )
-                latest_num = int(re.findall(r"\d+", latest_file.stem)[0])
 
                 with open(latest_file, newline="", encoding="utf-8") as csvfile:
                     reader = csv.reader(csvfile)
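
The old key, int(re.findall(r"\d+", f.stem)[0]), raised IndexError on any stem without digits. A hedged sketch of what safe_extract_latest_number_from_filename plausibly does — return None instead of raising, with a bounded quantifier for good measure:

import re

# Bounded to 18 digits so the match (and the resulting int) stay well-behaved.
_NUMBER = re.compile(r"\d{1,18}")


def safe_extract_latest_number_from_filename(stem: str):
    """Return the last number found in a filename stem, or None."""
    matches = _NUMBER.findall(stem)
    return int(matches[-1]) if matches else None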
tools/custom_image_analyser_engine.py CHANGED
@@ -524,12 +524,9 @@ class CustomImageAnalyzerEngine:
         # Remove or replace invalid filename characters
         # Windows: < > : " | ? * \ /
         # Unix: / (forward slash)
-        # Also remove control characters and other problematic chars
-        invalid_chars = r'[<>:"|?*\\/\x00-\x1f\x7f-\x9f]'
-        sanitized = re.sub(invalid_chars, "_", text)
+        from tools.secure_regex_utils import safe_sanitize_text
 
-        # Replace multiple consecutive underscores with a single one
-        sanitized = re.sub(r"_+", "_", sanitized)
+        sanitized = safe_sanitize_text(text)
 
         # Remove leading/trailing underscores and spaces
         sanitized = sanitized.strip("_ ")
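
CodeQL commonly flags hand-rolled character-class sanitizers like the removed one as incomplete sanitization. A regex-free sketch of what safe_sanitize_text might do (an assumption; the helper module is not part of this diff):

# Characters invalid in Windows/Unix filenames, plus C0/C1 control characters.
_INVALID = set('<>:"|?*\\/') | {chr(c) for c in range(0x20)} | {
    chr(c) for c in range(0x7F, 0xA0)
}


def safe_sanitize_text(text: str) -> str:
    """Replace invalid filename characters and collapse repeated underscores."""
    sanitized = "".join("_" if ch in _INVALID else ch for ch in text)
    while "__" in sanitized:  # collapse runs without a regex quantifier
        sanitized = sanitized.replace("__", "_")
    return sanitized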
tools/data_anonymise.py CHANGED
@@ -1,6 +1,5 @@
 import base64
 import os
-import re
 import secrets
 import time
 import unicodedata
@@ -20,7 +19,7 @@ from presidio_analyzer import (
     AnalyzerEngine,
     BatchAnalyzerEngine,
     DictAnalyzerResult,
-    RecognizerResult
+    RecognizerResult,
 )
 from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine
 from presidio_anonymizer.entities import OperatorConfig
@@ -57,6 +56,7 @@ from tools.load_spacy_model_custom_recognisers import (
 
 # Use custom version of analyze_dict to be able to track progress
 from tools.presidio_analyzer_custom import analyze_dict
+from tools.secure_path_utils import secure_file_write, secure_join
 
 if DO_INITIAL_TABULAR_DATA_CLEAN == "True":
     DO_INITIAL_TABULAR_DATA_CLEAN = True
@@ -406,22 +406,21 @@ def handle_docx_anonymisation(
     base_name = os.path.basename(file_path)
     file_name_without_ext = os.path.splitext(base_name)[0]
 
-    output_docx_path = os.path.join(
+    output_docx_path = secure_join(
         output_folder, f"{file_name_without_ext}_redacted.docx"
     )
-    log_file_path = os.path.join(
+    log_file_path = secure_join(
         output_folder, f"{file_name_without_ext}_redacted_log.txt"
     )
 
-    output_xlsx_path = os.path.join(
+    output_xlsx_path = secure_join(
         output_folder, f"{file_name_without_ext}_redacted.csv"
     )
 
     anonymised_df.to_csv(output_xlsx_path, encoding="utf-8-sig", index=None)
     doc.save(output_docx_path)
 
-    with open(log_file_path, "w", encoding="utf-8-sig") as f:
-        f.write(decision_log)
+    secure_file_write(log_file_path, decision_log, encoding="utf-8-sig")
 
     return output_docx_path, log_file_path, output_xlsx_path, comprehend_query_number
 
@@ -542,8 +541,6 @@ def anonymise_files_with_open_text(
         print(
             "Connecting to Comprehend using AWS access key and secret keys from textboxes."
         )
-        print("aws_access_key_textbox:", aws_access_key_textbox)
-        print("aws_secret_access_key:", aws_secret_key_textbox)
         comprehend_client = boto3.client(
             "comprehend",
             aws_access_key_id=aws_access_key_textbox,
@@ -801,7 +798,10 @@ def anonymise_files_with_open_text(
         + "\n\nGo to to the Redaction settings tab to see redaction logs. Please give feedback on the results below to help improve this app."
     )
 
-    out_message_out = re.sub(r"^\n+|^\. ", "", out_message_out).strip()
+    from tools.secure_regex_utils import safe_remove_leading_newlines
+
+    out_message_out = safe_remove_leading_newlines(out_message_out)
+    out_message_out = out_message_out.lstrip(". ")
 
     return (
         out_message_out,
@@ -1004,8 +1004,7 @@ def tabular_anonymise_wrapper_func(
             + excel_sheet_name
             + "_decision_process_output.txt"
         )
-        with open(decision_process_log_output_file, "w") as f:
-            f.write(decision_process_output_str)
+        secure_file_write(decision_process_log_output_file, decision_process_output_str)
 
     else:
         anon_export_file_name = (
@@ -1016,8 +1015,7 @@ def tabular_anonymise_wrapper_func(
         decision_process_log_output_file = (
             anon_export_file_name + "_decision_process_output.txt"
         )
-        with open(decision_process_log_output_file, "w") as f:
-            f.write(decision_process_output_str)
+        secure_file_write(decision_process_log_output_file, decision_process_output_str)
 
     out_file_paths.append(anon_export_file_name)
     log_files_output_paths.append(decision_process_log_output_file)
@@ -1296,11 +1294,9 @@ def anonymise_script(
     redact_config = {"DEFAULT": OperatorConfig("redact")}
     hash_config = {"DEFAULT": OperatorConfig("hash")}
     mask_config = {
-        "DEFAULT": OperatorConfig("mask", {
-            "masking_char": "*",
-            "chars_to_mask": 100,
-            "from_end": True
-        })
+        "DEFAULT": OperatorConfig(
+            "mask", {"masking_char": "*", "chars_to_mask": 100, "from_end": True}
+        )
     }
     people_encrypt_config = {
         "PERSON": OperatorConfig("encrypt", {"key": key_string})
@@ -1343,7 +1339,8 @@ def anonymise_script(
     combined_config = {**chosen_mask_config}
 
     anonymizer_results = batch_anonymizer.anonymize_dict(
-        analyzer_results, operators=combined_config)
+        analyzer_results, operators=combined_config
+    )
 
     scrubbed_df = pd.DataFrame(anonymizer_results)

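The reflowed mask_config is pure Black formatting, but it is the one operator whose parameters are easy to get wrong. A small self-contained Presidio example showing what that exact OperatorConfig produces (uses the public presidio-anonymizer API; the entity offsets are hand-computed for the sample string):

from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig, RecognizerResult

engine = AnonymizerEngine()
result = engine.anonymize(
    text="My name is Jane Doe",
    analyzer_results=[
        RecognizerResult(entity_type="PERSON", start=11, end=19, score=0.85)
    ],
    operators={
        "DEFAULT": OperatorConfig(
            "mask", {"masking_char": "*", "chars_to_mask": 100, "from_end": True}
        )
    },
)
print(result.text)  # -> "My name is ********"

The other notable change in this file is not cosmetic: the two print statements that wrote the AWS access key and secret key to the logs are removed, which addresses CodeQL's clear-text logging of sensitive data finding.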
tools/file_conversion.py CHANGED
@@ -34,6 +34,7 @@ from tools.config import (
     TEXTRACT_TEXT_EXTRACT_OPTION,
 )
 from tools.helper_functions import get_file_name_without_type, read_file
+from tools.secure_path_utils import secure_file_read, secure_join
 
 # from tools.aws_textract import load_and_convert_textract_json
 
@@ -143,8 +144,8 @@ def process_single_page_for_image_conversion(
     if create_images is True:
         try:
             # Construct the full output directory path
-            image_output_dir = os.path.join(os.getcwd(), input_folder)
-            out_path = os.path.join(
+            image_output_dir = secure_join(os.getcwd(), input_folder)
+            out_path = secure_join(
                 image_output_dir, f"{os.path.basename(pdf_path)}_{page_num}.png"
             )
             os.makedirs(os.path.dirname(out_path), exist_ok=True)
@@ -914,8 +915,8 @@ def prepare_image_or_pdf(
 
         if (file_extension in [".json"]) & (prepare_for_review is True):
             if isinstance(file_path, str):
-                with open(file_path, "r") as json_file:
-                    all_annotations_object = json.load(json_file)
+                json_content = secure_file_read(file_path)
+                all_annotations_object = json.loads(json_content)
             else:
                 # Assuming file_path is a NamedString or similar
                 all_annotations_object = json.loads(
@@ -936,7 +937,7 @@ def prepare_image_or_pdf(
            else:
                output_textract_json_file_name = file_path_without_ext + ".json"
 
-            out_textract_path = os.path.join(
+            out_textract_path = secure_join(
                output_folder, output_textract_json_file_name
            )
 
@@ -956,7 +957,7 @@ def prepare_image_or_pdf(
            # if not file_path.endswith("_ocr_results_with_words.json"): output_ocr_results_with_words_json_file_name = file_path_without_ext + "_ocr_results_with_words.json"
            # else: output_ocr_results_with_words_json_file_name = file_path_without_ext + ".json"
 
-            out_ocr_results_with_words_path = os.path.join(
+            out_ocr_results_with_words_path = secure_join(
                output_folder, output_ocr_results_with_words_json_file_name
            )
 
@@ -1026,10 +1027,12 @@ def prepare_image_or_pdf(
            if all_annotations_object:
 
                # Get list of page numbers
+                from tools.secure_regex_utils import safe_extract_page_number_from_path
+
                image_file_paths_pages = [
-                    int(re.search(r"_(\d+)\.png$", os.path.basename(s)).group(1))
+                    safe_extract_page_number_from_path(s)
                    for s in image_file_paths
-                    if re.search(r"_(\d+)\.png$", os.path.basename(s))
+                    if safe_extract_page_number_from_path(s) is not None
                ]
                image_file_paths_pages = [int(i) for i in image_file_paths_pages]
 
@@ -1046,15 +1049,19 @@ def prepare_image_or_pdf(
                        try:
                            if not annotation:
                                annotation = {"image": "", "boxes": []}
-                                annotation_page_number = int(
-                                    re.search(r"_(\d+)\.png$", image_file_path).group(1)
+                                annotation_page_number = (
+                                    safe_extract_page_number_from_path(image_file_path)
                                )
+                                if annotation_page_number is None:
+                                    continue
                            else:
-                                annotation_page_number = int(
-                                    re.search(
-                                        r"_(\d+)\.png$", annotation["image"]
-                                    ).group(1)
+                                annotation_page_number = (
+                                    safe_extract_page_number_from_path(
+                                        annotation["image"]
+                                    )
                                )
+                                if annotation_page_number is None:
+                                    continue
                        except Exception as e:
                            print("Extracting page number from image failed due to:", e)
                            annotation_page_number = 0
@@ -1110,7 +1117,7 @@ def prepare_image_or_pdf(
        if file_extension in [".zip"]:
 
            # Assume it's a Textract response object. Copy it to the output folder so it can be used later.
-            out_folder = os.path.join(
+            out_folder = secure_join(
                output_folder, file_path_without_ext + "_textract.json"
            )
 
@@ -1125,7 +1132,7 @@ def prepare_image_or_pdf(
                json_filename = json_files[0]
 
                # Extract the JSON file to the same directory as the ZIP file
-                extracted_path = os.path.join(
+                extracted_path = secure_join(
                    os.path.dirname(file_path), json_filename
                )
                zip_ref.extract(json_filename, os.path.dirname(file_path))
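
All the inline re.search(r"_(\d+)\.png$", ...) calls collapse into one helper here and in other files. A sketch of what safe_extract_page_number_from_path presumably looks like — the same anchored filename convention, but bounded and returning None instead of raising AttributeError on non-matching paths:

import os
import re

_PAGE = re.compile(r"_(\d{1,9})\.png$")  # bound on digits is an assumption


def safe_extract_page_number_from_path(path: str):
    """Return the page number encoded as "_<n>.png" in a filename, or None."""
    match = _PAGE.search(os.path.basename(path))
    return int(match.group(1)) if match else None


# safe_extract_page_number_from_path("doc.pdf_12.png") -> 12
# safe_extract_page_number_from_path("doc.pdf")        -> None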
tools/file_redaction.py CHANGED
@@ -2,7 +2,6 @@ import copy
 import io
 import json
 import os
-import re
 import time
 from collections import defaultdict  # For efficient grouping
 from typing import Any, Dict, List, Optional, Tuple
@@ -94,6 +93,7 @@ from tools.load_spacy_model_custom_recognisers import (
     nlp_analyser,
     score_threshold,
 )
+from tools.secure_path_utils import secure_file_write
 
 ImageFile.LOAD_TRUNCATED_IMAGES = LOAD_TRUNCATED_IMAGES.lower() == "true"
 if not MAX_IMAGE_PIXELS:
@@ -130,11 +130,10 @@ def sum_numbers_before_seconds(string: str):
         The sum of all numbers before 'seconds' in the string.
     """
 
-    # Extract numbers before 'seconds' using regular expression
-    numbers = re.findall(r"(\d+\.\d+)?\s*seconds", string)
+    # Extract numbers before 'seconds' using secure regex
+    from tools.secure_regex_utils import safe_extract_numbers_with_seconds
 
-    # Extract the numbers from the matches
-    numbers = [float(num.split()[0]) for num in numbers]
+    numbers = safe_extract_numbers_with_seconds(string)
 
     # Sum up the extracted numbers
     sum_of_numbers = round(sum(numbers), 1)
@@ -445,7 +444,9 @@ def choose_and_run_redactor(
     elif out_message:
         combined_out_message = combined_out_message + "\n" + out_message
 
-    combined_out_message = re.sub(r"^\n+", "", combined_out_message).strip()
+    from tools.secure_regex_utils import safe_remove_leading_newlines
+
+    combined_out_message = safe_remove_leading_newlines(combined_out_message)
 
     end_message = "\n\nPlease review and modify the suggested redaction outputs on the 'Review redactions' tab of the app (you can find this under the introduction text at the top of the page)."
 
@@ -1304,8 +1305,9 @@ def choose_and_run_redactor(
             output_folder + pdf_file_name_without_ext + "_textract_metadata.txt"
         )
 
-        with open(all_textract_request_metadata_file_path, "w") as f:
-            f.write(all_request_metadata_str)
+        secure_file_write(
+            all_textract_request_metadata_file_path, all_request_metadata_str
+        )
 
         # Add the request metadata to the log outputs if not there already
         if all_textract_request_metadata_file_path not in log_files_output_paths:
@@ -2785,10 +2787,10 @@ def redact_image_pdf(
             if text_extraction_method == TEXTRACT_TEXT_EXTRACT_OPTION:
                 if original_textract_data != textract_data:
                     # Write the updated existing textract data back to the JSON file
-                    with open(textract_json_file_path, "w") as json_file:
-                        json.dump(
-                            textract_data, json_file, separators=(",", ":")
-                        )  # indent=4 makes the JSON file pretty-printed
+                    secure_file_write(
+                        textract_json_file_path,
+                        json.dumps(textract_data, separators=(",", ":")),
+                    )
 
                     if textract_json_file_path not in log_files_output_paths:
                         log_files_output_paths.append(textract_json_file_path)
@@ -2848,10 +2850,10 @@ def redact_image_pdf(
             if text_extraction_method == TEXTRACT_TEXT_EXTRACT_OPTION:
                 # Write the updated existing textract data back to the JSON file
                 if original_textract_data != textract_data:
-                    with open(textract_json_file_path, "w") as json_file:
-                        json.dump(
-                            textract_data, json_file, separators=(",", ":")
-                        )  # indent=4 makes the JSON file pretty-printed
+                    secure_file_write(
+                        textract_json_file_path,
+                        json.dumps(textract_data, separators=(",", ":")),
+                    )
 
                     if textract_json_file_path not in log_files_output_paths:
                         log_files_output_paths.append(textract_json_file_path)
@@ -2907,10 +2909,10 @@ def redact_image_pdf(
                 # Write the updated existing textract data back to the JSON file
 
                 if original_textract_data != textract_data:
-                    with open(textract_json_file_path, "w") as json_file:
-                        json.dump(
-                            textract_data, json_file, separators=(",", ":")
-                        )  # indent=4 makes the JSON file pretty-printed
+                    secure_file_write(
+                        textract_json_file_path,
+                        json.dumps(textract_data, separators=(",", ":")),
+                    )
 
                     if textract_json_file_path not in log_files_output_paths:
                         log_files_output_paths.append(textract_json_file_path)
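
Two more helpers from the unshown tools/secure_regex_utils module appear in this file. Plausible sketches, assuming they mirror the code they replace (the old pattern (\d+\.\d+)?\s*seconds only captured decimals, and its optional group meant findall could yield empty strings that crashed the float() step; the bounds below are an assumption):

import re

_SECONDS = re.compile(r"(\d{1,9}(?:\.\d{1,9})?)\s{0,5}seconds")


def safe_extract_numbers_with_seconds(string: str):
    """Return the numbers that immediately precede 'seconds', as floats."""
    return [float(num) for num in _SECONDS.findall(string)]


def safe_remove_leading_newlines(message: str) -> str:
    """Equivalent of re.sub(r"^\n+", "", message).strip(), without the regex."""
    return message.lstrip("\n").strip()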
tools/find_duplicate_pages.py CHANGED
@@ -521,8 +521,9 @@ def clean_and_stem_text_series(df: pd.DataFrame, column: str):
     """
 
     def _clean_text(raw_text):
-        # Remove HTML tags
-        clean = re.sub(r"<.*?>", "", raw_text)
+        from tools.secure_regex_utils import safe_clean_text
+
+        clean = safe_clean_text(raw_text, remove_html=True)
         clean = " ".join(clean.split())
         # Join the cleaned words back into a string
         return clean
@@ -1271,9 +1272,11 @@ def apply_whole_page_redactions_from_list(
 
     list_whole_pages_to_redact = []
     for annotation in new_annotations_with_bounding_boxes:
-        match = re.search(r"_(\d+)\.png$", annotation["image"])
-        if match:
-            page = int(match.group(1)) + 1
+        from tools.secure_regex_utils import safe_extract_page_number_from_path
+
+        page_num = safe_extract_page_number_from_path(annotation["image"])
+        if page_num is not None:
+            page = page_num + 1
             list_whole_pages_to_redact.append(page)
         else:
             print(
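
A sketch of the assumed safe_clean_text HTML-stripping behaviour — the non-greedy <.*?> it replaces is functionally similar, but a bounded, negated character class cannot scan past a tag boundary:

import re

_HTML_TAG = re.compile(r"<[^<>]{0,500}>")  # bounded, no nested angle brackets


def safe_clean_text(raw_text: str, remove_html: bool = False) -> str:
    """Optionally strip HTML tags, then normalise internal whitespace."""
    clean = _HTML_TAG.sub("", raw_text) if remove_html else raw_text
    return " ".join(clean.split())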
tools/find_duplicate_tabular.py CHANGED
@@ -1,5 +1,4 @@
1
  import os
2
- import re
3
  import time
4
  from pathlib import Path
5
  from typing import Dict, List, Tuple
@@ -19,6 +18,7 @@ from tools.config import (
19
  from tools.data_anonymise import initial_clean
20
  from tools.helper_functions import OUTPUT_FOLDER, read_file
21
  from tools.load_spacy_model_custom_recognisers import nlp
 
22
 
23
  if REMOVE_DUPLICATE_ROWS == "True":
24
  REMOVE_DUPLICATE_ROWS = True
@@ -345,9 +345,12 @@ def save_tabular_duplicate_results(
345
  original_file_extension = os.path.splitext(original_file)[-1]
346
  if original_file_extension in [".xlsx", ".xls"]:
347
 
348
- # Split the string using a regex to handle both .xlsx_ and .xls_ delimiters
349
- # The regex r'\.xlsx_|\.xls_' correctly matches either ".xlsx_" or ".xls_" as a delimiter.
350
- parts = re.split(r"\.xlsx_|\.xls_", os.path.basename(file_name))
 
 
 
351
  # The sheet name is the last part after splitting
352
  file_sheet_name = parts[-1]
353
 
@@ -430,12 +433,12 @@ def save_tabular_duplicate_results(
430
  file_ext = os.path.splitext(file_name)[-1]
431
 
432
  if file_ext in [".parquet"]:
433
- output_path = os.path.join(
434
  output_folder, f"{file_base_name}_deduplicated.parquet"
435
  )
436
  df_cleaned.to_parquet(output_path, index=False)
437
  else:
438
- output_path = os.path.join(
439
  output_folder, f"{file_base_name}_deduplicated.csv"
440
  )
441
  df_cleaned.to_csv(
@@ -451,7 +454,7 @@ def save_tabular_duplicate_results(
451
  # Create output filename
452
  file_base_name = os.path.splitext(os.path.basename(file_path))[0]
453
  file_ext = os.path.splitext(file_path)[-1]
454
- output_path = os.path.join(
455
  output_folder, f"{file_base_name}_deduplicated{file_ext}"
456
  )
457
 
@@ -513,7 +516,7 @@ def remove_duplicate_rows_from_tabular_data(
513
  file_stem = os.path.splitext(file_name)[0]
514
  file_ext = os.path.splitext(file_name)[-1]
515
 
516
- output_path = os.path.join(output_folder, f"{file_stem}_deduplicated{file_ext}")
517
 
518
  if file_ext in [".xlsx", ".xls"]:
519
  df_cleaned.to_excel(
 
1
  import os
 
2
  import time
3
  from pathlib import Path
4
  from typing import Dict, List, Tuple
 
18
  from tools.data_anonymise import initial_clean
19
  from tools.helper_functions import OUTPUT_FOLDER, read_file
20
  from tools.load_spacy_model_custom_recognisers import nlp
21
+ from tools.secure_path_utils import secure_join
22
 
23
  if REMOVE_DUPLICATE_ROWS == "True":
24
  REMOVE_DUPLICATE_ROWS = True
 
345
  original_file_extension = os.path.splitext(original_file)[-1]
346
  if original_file_extension in [".xlsx", ".xls"]:
347
 
348
+ # Split the string using secure regex to handle both .xlsx_ and .xls_ delimiters
349
+ from tools.secure_regex_utils import safe_split_filename
350
+
351
+ parts = safe_split_filename(
352
+ os.path.basename(file_name), [".xlsx_", ".xls_"]
353
+ )
354
  # The sheet name is the last part after splitting
355
  file_sheet_name = parts[-1]
356
 
 
433
  file_ext = os.path.splitext(file_name)[-1]
434
 
435
  if file_ext in [".parquet"]:
436
+ output_path = secure_join(
437
  output_folder, f"{file_base_name}_deduplicated.parquet"
438
  )
439
  df_cleaned.to_parquet(output_path, index=False)
440
  else:
441
+ output_path = secure_join(
442
  output_folder, f"{file_base_name}_deduplicated.csv"
443
  )
444
  df_cleaned.to_csv(
 
454
  # Create output filename
455
  file_base_name = os.path.splitext(os.path.basename(file_path))[0]
456
  file_ext = os.path.splitext(file_path)[-1]
457
+ output_path = secure_join(
458
  output_folder, f"{file_base_name}_deduplicated{file_ext}"
459
  )
460
 
 
516
  file_stem = os.path.splitext(file_name)[0]
517
  file_ext = os.path.splitext(file_name)[-1]
518
 
519
+ output_path = secure_join(output_folder, f"{file_stem}_deduplicated{file_ext}")
520
 
521
  if file_ext in [".xlsx", ".xls"]:
522
  df_cleaned.to_excel(
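The `safe_split_filename` call introduced above replaces the inline `re.split(r"\.xlsx_|\.xls_", ...)`. A sketch of the behaviour, assuming a hypothetical "<workbook>.xlsx_<sheet>" name produced upstream:

```python
from tools.secure_regex_utils import safe_split_filename

file_name = "quarterly_report.xlsx_Sheet1"  # hypothetical combined name
parts = safe_split_filename(file_name, [".xlsx_", ".xls_"])
file_sheet_name = parts[-1]  # the sheet name is the last part after splitting
print(file_sheet_name)  # Sheet1
```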
tools/helper_functions.py CHANGED
@@ -1,5 +1,4 @@
1
  import os
2
- import re
3
  import unicodedata
4
  from math import ceil
5
  from typing import List
@@ -33,6 +32,7 @@ from tools.config import (
33
  aws_comprehend_language_choices,
34
  textract_language_choices,
35
  )
 
36
 
37
 
38
  def _get_env_list(env_var_name: str) -> List[str]:
@@ -348,7 +348,7 @@ def put_columns_in_df(in_file: List[str]):
348
  def check_for_existing_textract_file(
349
  doc_file_name_no_extension_textbox: str, output_folder: str = OUTPUT_FOLDER
350
  ):
351
- textract_output_path = os.path.join(
352
  output_folder, doc_file_name_no_extension_textbox + "_textract.json"
353
  )
354
 
@@ -377,7 +377,7 @@ def check_for_relevant_ocr_output_with_words(
377
 
378
  doc_file_with_ending = doc_file_name_no_extension_textbox + file_ending
379
 
380
- local_ocr_output_path = os.path.join(output_folder, doc_file_with_ending)
381
 
382
  if os.path.exists(local_ocr_output_path):
383
  print("Existing OCR with words analysis output file found.")
@@ -591,7 +591,9 @@ def clean_unicode_text(text: str):
591
  # Step 3: Optionally remove non-ASCII characters if needed
592
  # This regex removes any remaining non-ASCII characters, if desired.
593
  # Comment this line if you want to keep all Unicode characters.
594
- cleaned_text = re.sub(r"[^\x00-\x7F]+", "", normalized_text)
 
 
595
 
596
  return cleaned_text
597
 
@@ -603,7 +605,7 @@ def load_all_output_files(folder_path: str = OUTPUT_FOLDER) -> List[str]:
603
  # List all files in the specified folder
604
  for filename in os.listdir(folder_path):
605
  # Construct full file path
606
- full_path = os.path.join(folder_path, filename)
607
  # Check if it's a file (not a directory)
608
  if os.path.isfile(full_path):
609
  file_paths.append(full_path)
 
1
  import os
 
2
  import unicodedata
3
  from math import ceil
4
  from typing import List
 
32
  aws_comprehend_language_choices,
33
  textract_language_choices,
34
  )
35
+ from tools.secure_path_utils import secure_join
36
 
37
 
38
  def _get_env_list(env_var_name: str) -> List[str]:
 
348
  def check_for_existing_textract_file(
349
  doc_file_name_no_extension_textbox: str, output_folder: str = OUTPUT_FOLDER
350
  ):
351
+ textract_output_path = secure_join(
352
  output_folder, doc_file_name_no_extension_textbox + "_textract.json"
353
  )
354
 
 
377
 
378
  doc_file_with_ending = doc_file_name_no_extension_textbox + file_ending
379
 
380
+ local_ocr_output_path = secure_join(output_folder, doc_file_with_ending)
381
 
382
  if os.path.exists(local_ocr_output_path):
383
  print("Existing OCR with words analysis output file found.")
 
591
  # Step 3: Optionally remove non-ASCII characters if needed
592
  # This regex removes any remaining non-ASCII characters, if desired.
593
  # Comment this line if you want to keep all Unicode characters.
594
+ from tools.secure_regex_utils import safe_remove_non_ascii
595
+
596
+ cleaned_text = safe_remove_non_ascii(normalized_text)
597
 
598
  return cleaned_text
599
 
 
605
  # List all files in the specified folder
606
  for filename in os.listdir(folder_path):
607
  # Construct full file path
608
+ full_path = secure_join(folder_path, filename)
609
  # Check if it's a file (not a directory)
610
  if os.path.isfile(full_path):
611
  file_paths.append(full_path)
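A short sketch of the two helpers swapped in above; the behaviour follows their definitions added later in this commit in tools/secure_regex_utils.py and tools/secure_path_utils.py:

```python
from tools.secure_path_utils import secure_join
from tools.secure_regex_utils import safe_remove_non_ascii

# Non-ASCII characters are stripped outright (here the accented "e" is dropped)
print(safe_remove_non_ascii("Café menu"))  # Caf menu

# For plain components, secure_join behaves like os.path.join
print(secure_join("output", "doc_textract.json"))  # output/doc_textract.json on POSIX
```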
tools/redaction_review.py CHANGED
@@ -1,6 +1,5 @@
1
  import os
2
  import random
3
- import re
4
  import string
5
  import uuid
6
  from datetime import datetime, timedelta, timezone
@@ -37,6 +36,9 @@ from tools.file_conversion import (
37
  )
38
  from tools.file_redaction import redact_page_with_pymupdf
39
  from tools.helper_functions import detect_file_type, get_file_name_without_type
40
 
41
  if not MAX_IMAGE_PIXELS:
42
  Image.MAX_IMAGE_PIXELS = None
@@ -535,10 +537,14 @@ def update_annotator_page_from_review_df(
535
  for i, page_state_entry in enumerate(out_image_annotations_state):
536
  # Assuming page_state_entry has a 'page' key (1-based)
537
 
538
- match = re.search(r"(\d+)\.png$", page_state_entry["image"])
539
- if match:
540
- page_no = int(match.group(1))
541
- else:
542
  page_no = 0
543
 
544
  if (
@@ -834,15 +840,11 @@ def create_annotation_objects_from_filtered_ocr_results_with_words(
834
  valid = False
835
  if isinstance(colour_label, str):
836
  label_str = colour_label.strip()
837
- match = re.match(
838
- r"^\(\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,?\s*\)$", label_str
839
- )
840
- if match:
841
- r_val, g_val, b_val = (
842
- int(match.group(1)),
843
- int(match.group(2)),
844
- int(match.group(3)),
845
- )
846
  if 0 <= r_val <= 255 and 0 <= g_val <= 255 and 0 <= b_val <= 255:
847
  valid = True
848
  elif isinstance(colour_label, (tuple, list)) and len(colour_label) == 3:
@@ -2568,9 +2570,9 @@ def create_xfdf(
2568
  pymupdf_page = pymupdf_doc.load_page(page_python_format)
2569
 
2570
  if document_cropboxes and page_python_format < len(document_cropboxes):
2571
- match = re.findall(
2572
- r"[-+]?\d*\.\d+|\d+", document_cropboxes[page_python_format]
2573
- )
2574
  if match and len(match) == 4:
2575
  rect_values = list(map(float, match))
2576
  pymupdf_page.set_cropbox(Rect(*rect_values))
@@ -2722,8 +2724,7 @@ def convert_df_to_xfdf(
2722
 
2723
  output_path = output_folder + file_path_name + "_adobe.xfdf"
2724
 
2725
- with open(output_path, "w", encoding="utf-8") as f:
2726
- f.write(xfdf_content)
2727
 
2728
  output_paths.append(output_path)
2729
 
 
1
  import os
2
  import random
 
3
  import string
4
  import uuid
5
  from datetime import datetime, timedelta, timezone
 
36
  )
37
  from tools.file_redaction import redact_page_with_pymupdf
38
  from tools.helper_functions import detect_file_type, get_file_name_without_type
39
+ from tools.secure_path_utils import (
40
+ secure_file_write,
41
+ )
42
 
43
  if not MAX_IMAGE_PIXELS:
44
  Image.MAX_IMAGE_PIXELS = None
 
537
  for i, page_state_entry in enumerate(out_image_annotations_state):
538
  # Assuming page_state_entry has a 'page' key (1-based)
539
 
540
+ from tools.secure_regex_utils import (
541
+ safe_extract_page_number_from_filename,
542
+ )
543
+
544
+ page_no = safe_extract_page_number_from_filename(
545
+ page_state_entry["image"]
546
+ )
547
+ if page_no is None:
548
  page_no = 0
549
 
550
  if (
 
840
  valid = False
841
  if isinstance(colour_label, str):
842
  label_str = colour_label.strip()
843
+ from tools.secure_regex_utils import safe_extract_rgb_values
844
+
845
+ rgb_values = safe_extract_rgb_values(label_str)
846
+ if rgb_values:
847
+ r_val, g_val, b_val = rgb_values
848
  if 0 <= r_val <= 255 and 0 <= g_val <= 255 and 0 <= b_val <= 255:
849
  valid = True
850
  elif isinstance(colour_label, (tuple, list)) and len(colour_label) == 3:
 
2570
  pymupdf_page = pymupdf_doc.load_page(page_python_format)
2571
 
2572
  if document_cropboxes and page_python_format < len(document_cropboxes):
2573
+ from tools.secure_regex_utils import safe_extract_numbers
2574
+
2575
+ match = safe_extract_numbers(document_cropboxes[page_python_format])
2576
  if match and len(match) == 4:
2577
  rect_values = list(map(float, match))
2578
  pymupdf_page.set_cropbox(Rect(*rect_values))
 
2724
 
2725
  output_path = output_folder + file_path_name + "_adobe.xfdf"
2726
 
2727
+ secure_file_write(output_path, xfdf_content, encoding="utf-8")
 
2728
 
2729
  output_paths.append(output_path)
2730
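A sketch of the two replacements in this file, assuming the `tools` package is importable; the output path is hypothetical:

```python
from tools.secure_path_utils import secure_file_write
from tools.secure_regex_utils import safe_extract_rgb_values

rgb = safe_extract_rgb_values("(255, 0, 128)")
print(rgb)  # (255, 0, 128); malformed or out-of-range strings return None

# Creates parent directories if needed, then writes the content
secure_file_write("output/example_adobe.xfdf", "<xfdf/>", encoding="utf-8")
```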
 
tools/secure_path_utils.py ADDED
@@ -0,0 +1,267 @@
1
+ """
2
+ Secure path utilities to prevent path injection attacks.
3
+
4
+ This module provides secure alternatives to os.path operations that validate
5
+ and sanitize file paths to prevent directory traversal and other path-based attacks.
6
+ """
7
+
8
+ import logging
9
+ import os
10
+ import re
11
+ from pathlib import Path
12
+ from typing import Optional, Union
13
+
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
+ def sanitize_filename(filename: str, max_length: int = 255) -> str:
18
+ """
19
+ Sanitize a filename to prevent path injection attacks.
20
+
21
+ Args:
22
+ filename: The filename to sanitize
23
+ max_length: Maximum length of the sanitized filename
24
+
25
+ Returns:
26
+ A sanitized filename safe for use in file operations
27
+
28
+ Raises:
29
+ ValueError: If the filename cannot be sanitized safely
30
+ """
31
+ if not filename or not isinstance(filename, str):
32
+ raise ValueError("Filename must be a non-empty string")
33
+
34
+ # Remove any path separators and normalize
35
+ filename = os.path.basename(filename)
36
+
37
+ # Remove or replace dangerous characters
38
+ # Keep alphanumeric, dots, hyphens, underscores, spaces, parentheses, brackets, and other safe chars
39
+ # Only remove truly dangerous characters like path separators and control chars
40
+ sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "_", filename)
41
+
42
+ # Collapse runs of consecutive dots into a single dot (defuses ".." sequences)
43
+ sanitized = re.sub(r"\.{2,}", ".", sanitized)
44
+
45
+ # Remove leading/trailing dots and spaces
46
+ sanitized = sanitized.strip(". ")
47
+
48
+ # Ensure it's not empty after sanitization
49
+ if not sanitized:
50
+ sanitized = "sanitized_file"
51
+
52
+ # Truncate if too long, preserving extension
53
+ if len(sanitized) > max_length:
54
+ name, ext = os.path.splitext(sanitized)
55
+ max_name_length = max_length - len(ext)
56
+ sanitized = name[:max_name_length] + ext
57
+
58
+ return sanitized
59
+
60
+
61
+ def secure_path_join(base_path: Union[str, Path], *path_parts: str) -> Path:
62
+ """
63
+ Safely join paths while preventing directory traversal attacks.
64
+
65
+ Args:
66
+ base_path: The base directory path
67
+ *path_parts: Additional path components to join
68
+
69
+ Returns:
70
+ A Path object representing the safe joined path
71
+
72
+ Raises:
73
+ ValueError: If any path component contains dangerous characters
74
+ PermissionError: If the resulting path would escape the base directory
75
+ """
76
+ base_path = Path(base_path).resolve()
77
+
78
+ # Sanitize each path part - only sanitize if it contains dangerous patterns
79
+ sanitized_parts = []
80
+ for part in path_parts:
81
+ if not part:
82
+ continue
83
+ # Only sanitize if the part contains dangerous patterns
84
+ if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part):
85
+ sanitized_part = sanitize_filename(part)
86
+ else:
87
+ sanitized_part = part
88
+ sanitized_parts.append(sanitized_part)
89
+
90
+ # Join the paths
91
+ result_path = base_path
92
+ for part in sanitized_parts:
93
+ result_path = result_path / part
94
+
95
+ # Resolve the final path
96
+ result_path = result_path.resolve()
97
+
98
+ # Security check: ensure the result is within the base directory
99
+ try:
100
+ result_path.relative_to(base_path)
101
+ except ValueError:
102
+ raise PermissionError(f"Path would escape base directory: {result_path}")
103
+
104
+ return result_path
105
+
106
+
107
+ def secure_file_write(
108
+ file_path: Union[str, Path],
109
+ content: str,
110
+ mode: str = "w",
111
+ encoding: Optional[str] = None,
112
+ **kwargs,
113
+ ) -> None:
114
+ """
115
+ Safely write content to a file with path validation.
116
+
117
+ Args:
118
+ file_path: The file path to write to
119
+ content: The content to write
120
+ mode: File open mode (default: 'w')
121
+ encoding: Text encoding (default: None, letting open() use the platform default)
122
+ **kwargs: Additional arguments for open()
123
+ """
124
+ file_path = Path(file_path)
125
+
126
+ # Ensure the parent directory exists
127
+ file_path.parent.mkdir(parents=True, exist_ok=True)
128
+
129
+ # Validate the path is safe
130
+ if not file_path.is_absolute():
131
+ file_path = file_path.resolve()
132
+
133
+ # Write the file
134
+ open_kwargs = {"mode": mode}
135
+ if encoding:
136
+ open_kwargs["encoding"] = encoding
137
+ open_kwargs.update(kwargs)
138
+
139
+ with open(file_path, **open_kwargs) as f:
140
+ f.write(content)
141
+
142
+
143
+ def secure_file_read(
144
+ file_path: Union[str, Path],
145
+ mode: str = "r",
146
+ encoding: Optional[str] = None,
147
+ **kwargs,
148
+ ) -> str:
149
+ """
150
+ Safely read content from a file with path validation.
151
+
152
+ Args:
153
+ file_path: The file path to read from
154
+ mode: File open mode (default: 'r')
155
+ encoding: Text encoding (default: None, letting open() use the platform default)
156
+ **kwargs: Additional arguments for open()
157
+
158
+ Returns:
159
+ The file content
160
+ """
161
+ file_path = Path(file_path)
162
+
163
+ # Validate the path exists and is a file
164
+ if not file_path.exists():
165
+ raise FileNotFoundError(f"File not found: {file_path}")
166
+
167
+ if not file_path.is_file():
168
+ raise ValueError(f"Path is not a file: {file_path}")
169
+
170
+ # Read the file
171
+ open_kwargs = {"mode": mode}
172
+ if encoding:
173
+ open_kwargs["encoding"] = encoding
174
+ open_kwargs.update(kwargs)
175
+
176
+ with open(file_path, **open_kwargs) as f:
177
+ return f.read()
178
+
179
+
180
+ def validate_path_safety(
181
+ path: Union[str, Path], base_path: Optional[Union[str, Path]] = None
182
+ ) -> bool:
183
+ """
184
+ Validate that a path is safe and doesn't contain dangerous patterns.
185
+
186
+ Args:
187
+ path: The path to validate
188
+ base_path: Optional base path to check against
189
+
190
+ Returns:
191
+ True if the path is safe, False otherwise
192
+ """
193
+ try:
194
+ path = Path(path)
195
+
196
+ # Check for dangerous patterns
197
+ path_str = str(path)
198
+
199
+ # Check for directory traversal patterns
200
+ dangerous_patterns = [
201
+ "..", # Parent directory
202
+ "//", # Double slashes
203
+ "\\", # Backslashes (on Unix systems)
204
+ ]
205
+
206
+ for pattern in dangerous_patterns:
207
+ if pattern in path_str:
208
+ return False
209
+
210
+ # If base path is provided, ensure the path is within it
211
+ if base_path:
212
+ base_path = Path(base_path).resolve()
213
+ path = path.resolve()
214
+ try:
215
+ path.relative_to(base_path)
216
+ except ValueError:
217
+ return False
218
+
219
+ return True
220
+
221
+ except Exception:
222
+ return False
223
+
224
+
225
+ # Backward compatibility functions that maintain the same interface as os.path
226
+ def secure_join(*paths: str) -> str:
227
+ """
228
+ Secure alternative to os.path.join that prevents path injection.
229
+
230
+ Args:
231
+ *paths: Path components to join
232
+
233
+ Returns:
234
+ A safe joined path string
235
+ """
236
+ if not paths:
237
+ return ""
238
+
239
+ # Use the first path as base, others as components
240
+ base_path = Path(paths[0])
241
+ path_parts = paths[1:]
242
+
243
+ # Only use secure_path_join if there are potentially dangerous patterns
244
+ if any(re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part) for part in path_parts):
245
+ result_path = secure_path_join(base_path, *path_parts)
246
+ return str(result_path)
247
+ else:
248
+ # Use normal path joining for safe paths
249
+ return str(Path(*paths))
250
+
251
+
252
+ def secure_basename(path: str) -> str:
253
+ """
254
+ Secure alternative to os.path.basename that sanitizes the result.
255
+
256
+ Args:
257
+ path: The path to get the basename from
258
+
259
+ Returns:
260
+ A sanitized basename
261
+ """
262
+ basename = os.path.basename(path)
263
+ # Only sanitize if the basename contains dangerous patterns
264
+ if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', basename):
265
+ return sanitize_filename(basename)
266
+ else:
267
+ return basename
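A behavioural sketch of the module above, on a POSIX system: components containing traversal patterns are sanitised down to a safe basename, while absolute components that would escape the base directory raise PermissionError:

```python
from tools.secure_path_utils import secure_path_join

# Normal components join as expected (resolved under the base directory)
print(secure_path_join("/tmp/outputs", "job1", "result.json"))

# "../" components match the dangerous-pattern check and are reduced to a basename
print(secure_path_join("/tmp/outputs", "../secret.txt"))  # /tmp/outputs/secret.txt

# An absolute component passes the pattern check but fails the containment check
try:
    secure_path_join("/tmp/outputs", "/etc/passwd")
except PermissionError as exc:
    print(exc)  # Path would escape base directory: /etc/passwd
```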
tools/secure_regex_utils.py ADDED
@@ -0,0 +1,292 @@
1
+ """
2
+ Secure regex utilities to prevent ReDoS (Regular Expression Denial of Service) attacks.
3
+
4
+ This module provides safe alternatives to common regex patterns that can cause
5
+ catastrophic backtracking and performance issues.
6
+ """
7
+
8
+ import re
9
+ from typing import List, Optional
10
+
11
+
12
+ def safe_extract_numbers_with_seconds(text: str) -> List[float]:
13
+ """
14
+ Safely extract numbers before 'seconds' from text without ReDoS vulnerability.
15
+
16
+ Args:
17
+ text: The text to search for numbers followed by 'seconds'
18
+
19
+ Returns:
20
+ List of float numbers found before 'seconds'
21
+ """
22
+ if not text or not isinstance(text, str):
23
+ return []
24
+
25
+ # Use a more specific pattern that avoids catastrophic backtracking
26
+ # Look for digits, optional decimal part, optional whitespace, then 'seconds'
27
+ pattern = r"\b(\d+(?:\.\d+)?)\s*seconds\b"
28
+
29
+ matches = re.findall(pattern, text)
30
+ try:
31
+ return [float(match) for match in matches]
32
+ except (ValueError, TypeError):
33
+ return []
34
+
35
+
36
+ def safe_extract_numbers(text: str) -> List[float]:
37
+ """
38
+ Safely extract all numbers from text without ReDoS vulnerability.
39
+
40
+ Args:
41
+ text: The text to extract numbers from
42
+
43
+ Returns:
44
+ List of float numbers found in the text
45
+ """
46
+ if not text or not isinstance(text, str):
47
+ return []
48
+
49
+ # Use a simple, safe pattern that doesn't cause backtracking
50
+ # Match digits, optional decimal point and more digits
51
+ pattern = r"\b\d+(?:\.\d+)?\b"
52
+
53
+ matches = re.findall(pattern, text)
54
+ try:
55
+ return [float(match) for match in matches]
56
+ except (ValueError, TypeError):
57
+ return []
58
+
59
+
60
+ def safe_extract_page_number_from_filename(filename: str) -> Optional[int]:
61
+ """
62
+ Safely extract page number from filename ending with .png.
63
+
64
+ Args:
65
+ filename: The filename to extract page number from
66
+
67
+ Returns:
68
+ Page number if found, None otherwise
69
+ """
70
+ if not filename or not isinstance(filename, str):
71
+ return None
72
+
73
+ # Use a simple, safe pattern
74
+ pattern = r"(\d+)\.png$"
75
+ match = re.search(pattern, filename)
76
+
77
+ if match:
78
+ try:
79
+ return int(match.group(1))
80
+ except (ValueError, TypeError):
81
+ return None
82
+
83
+ return None
84
+
85
+
86
+ def safe_extract_page_number_from_path(path: str) -> Optional[int]:
87
+ """
88
+ Safely extract the page number from a path ending in a "_<digits>.png" suffix.
89
+
90
+ Args:
91
+ path: The path to extract page number from
92
+
93
+ Returns:
94
+ Page number if found, None otherwise
95
+ """
96
+ if not path or not isinstance(path, str):
97
+ return None
98
+
99
+ # Use a simple, safe pattern
100
+ pattern = r"_(\d+)\.png$"
101
+ match = re.search(pattern, path)
102
+
103
+ if match:
104
+ try:
105
+ return int(match.group(1))
106
+ except (ValueError, TypeError):
107
+ return None
108
+
109
+ return None
110
+
111
+
112
+ def safe_clean_text(text: str, remove_html: bool = True) -> str:
113
+ """
114
+ Safely clean text without ReDoS vulnerability.
115
+
116
+ Args:
117
+ text: The text to clean
118
+ remove_html: Whether to remove HTML tags
119
+
120
+ Returns:
121
+ Cleaned text
122
+ """
123
+ if not text or not isinstance(text, str):
124
+ return ""
125
+
126
+ cleaned = text
127
+
128
+ if remove_html:
129
+ # Use a simple pattern that doesn't cause backtracking
130
+ cleaned = re.sub(r"<[^>]*>", "", cleaned)
131
+
132
+ # Clean up whitespace
133
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
134
+
135
+ return cleaned
136
+
137
+
138
+ def safe_extract_rgb_values(text: str) -> Optional[tuple]:
139
+ """
140
+ Safely extract RGB values from text like "(255, 255, 255)".
141
+
142
+ Args:
143
+ text: The text to extract RGB values from
144
+
145
+ Returns:
146
+ Tuple of (r, g, b) values if found, None otherwise
147
+ """
148
+ if not text or not isinstance(text, str):
149
+ return None
150
+
151
+ # Use a simple, safe pattern
152
+ pattern = r"\(\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})\s*\)"
153
+ match = re.match(pattern, text.strip())
154
+
155
+ if match:
156
+ try:
157
+ r = int(match.group(1))
158
+ g = int(match.group(2))
159
+ b = int(match.group(3))
160
+
161
+ # Validate RGB values
162
+ if 0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255:
163
+ return (r, g, b)
164
+ except (ValueError, TypeError):
165
+ pass
166
+
167
+ return None
168
+
169
+
170
+ def safe_split_filename(filename: str, delimiters: List[str]) -> List[str]:
171
+ """
172
+ Safely split filename by delimiters without ReDoS vulnerability.
173
+
174
+ Args:
175
+ filename: The filename to split
176
+ delimiters: List of delimiter patterns to split on
177
+
178
+ Returns:
179
+ List of filename parts
180
+ """
181
+ if not filename or not isinstance(filename, str):
182
+ return []
183
+
184
+ if not delimiters:
185
+ return [filename]
186
+
187
+ # Escape special regex characters in delimiters
188
+ escaped_delimiters = [re.escape(delim) for delim in delimiters]
189
+
190
+ # Create a safe pattern
191
+ pattern = "|".join(escaped_delimiters)
192
+
193
+ try:
194
+ return re.split(pattern, filename)
195
+ except re.error:
196
+ # Fallback to simple string operations if regex fails
197
+ result = [filename]
198
+ for delim in delimiters:
199
+ new_result = []
200
+ for part in result:
201
+ new_result.extend(part.split(delim))
202
+ result = new_result
203
+ return result
204
+
205
+
206
+ def safe_remove_leading_newlines(text: str) -> str:
207
+ """
208
+ Safely remove leading newlines without ReDoS vulnerability.
209
+
210
+ Args:
211
+ text: The text to clean
212
+
213
+ Returns:
214
+ Text with leading newlines removed
215
+ """
216
+ if not text or not isinstance(text, str):
217
+ return ""
218
+
219
+ # Use a simple pattern
220
+ return re.sub(r"^\n+", "", text).strip()
221
+
222
+
223
+ def safe_remove_non_ascii(text: str) -> str:
224
+ """
225
+ Safely remove non-ASCII characters without ReDoS vulnerability.
226
+
227
+ Args:
228
+ text: The text to clean
229
+
230
+ Returns:
231
+ Text with non-ASCII characters removed
232
+ """
233
+ if not text or not isinstance(text, str):
234
+ return ""
235
+
236
+ # Use a simple pattern
237
+ return re.sub(r"[^\x00-\x7F]", "", text)
238
+
239
+
240
+ def safe_extract_latest_number_from_filename(filename: str) -> Optional[int]:
241
+ """
242
+ Safely extract the latest/largest number from filename without ReDoS vulnerability.
243
+
244
+ Args:
245
+ filename: The filename to extract number from
246
+
247
+ Returns:
248
+ The largest number found, or None if no numbers found
249
+ """
250
+ if not filename or not isinstance(filename, str):
251
+ return None
252
+
253
+ # Use a simple pattern to find all numbers
254
+ pattern = r"\d+"
255
+ matches = re.findall(pattern, filename)
256
+
257
+ if not matches:
258
+ return None
259
+
260
+ try:
261
+ # Convert to integers and return the maximum
262
+ numbers = [int(match) for match in matches]
263
+ return max(numbers)
264
+ except (ValueError, TypeError):
265
+ return None
266
+
267
+
268
+ def safe_sanitize_text(text: str, replacement: str = "_") -> str:
269
+ """
270
+ Safely sanitize text by removing dangerous characters without ReDoS vulnerability.
271
+
272
+ Args:
273
+ text: The text to sanitize
274
+ replacement: Character to replace dangerous characters with
275
+
276
+ Returns:
277
+ Sanitized text
278
+ """
279
+ if not text or not isinstance(text, str):
280
+ return ""
281
+
282
+ # Use a simple pattern for dangerous characters
283
+ dangerous_chars = r'[<>:"|?*\\/\x00-\x1f\x7f-\x9f]'
284
+ sanitized = re.sub(dangerous_chars, replacement, text)
285
+
286
+ # Remove multiple consecutive replacements
287
+ sanitized = re.sub(f"{re.escape(replacement)}+", replacement, sanitized)
288
+
289
+ # Remove leading/trailing replacements
290
+ sanitized = sanitized.strip(replacement)
291
+
292
+ return sanitized
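A quick sketch exercising two of the helpers defined above, assuming the module is importable:

```python
from tools.secure_regex_utils import (
    safe_extract_numbers_with_seconds,
    safe_sanitize_text,
)

print(safe_extract_numbers_with_seconds("Redaction took 12.5 seconds"))  # [12.5]

# Dangerous filename characters are replaced with "_" and runs are collapsed
print(safe_sanitize_text("report:v2|final?.csv"))  # report_v2_final_.csv
```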
tools/textract_batch_call.py CHANGED
@@ -32,6 +32,11 @@ from tools.config import (
32
  )
33
  from tools.file_conversion import get_input_file_names
34
  from tools.helper_functions import get_file_name_without_type
35
 
36
  DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS = int(DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS)
37
 
@@ -115,8 +120,8 @@ def analyse_document_with_textract_api(
115
  textract_client = session.client("textract")
116
 
117
  # --- 1. Upload PDF to S3 ---
118
- pdf_filename = os.path.basename(local_pdf_path)
119
- s3_input_key = os.path.join(s3_input_prefix, pdf_filename).replace(
120
  "\\", "/"
121
  ) # Ensure forward slashes for S3
122
 
@@ -262,14 +267,13 @@ def analyse_document_with_textract_api(
262
  )
263
 
264
  # File path
265
- log_file_path = os.path.join(local_output_dir, "textract_document_jobs.csv")
266
- log_file_path_job_id = os.path.join(
267
  local_output_dir, pdf_filename + "_textract_document_jobs_job_id.txt"
268
  )
269
 
270
  # Write latest job ID to local text file
271
- with open(log_file_path_job_id, "w") as f:
272
- f.write(job_id)
273
 
274
  # Check if file exists
275
  file_exists = os.path.exists(log_file_path)
@@ -447,10 +451,9 @@ def download_textract_job_files(
447
  output_filename_base = os.path.basename(pdf_filename)
448
  output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]
449
  local_output_filename = f"{output_filename_base_no_ext}_textract.json"
450
- local_output_path = os.path.join(local_output_dir, local_output_filename)
451
 
452
- with open(local_output_path, "w") as f:
453
- json.dump(combined_output, f)
454
 
455
  print(f"Combined Textract output written to {local_output_path}")
456
 
@@ -484,12 +487,12 @@ def load_pdf_job_file_from_s3(
484
  pdf_file_location = ""
485
  doc_file_name_no_extension_textbox = ""
486
 
487
- s3_input_key_prefix = os.path.join(
488
- load_s3_jobs_input_loc, pdf_filename
489
- ).replace("\\", "/")
490
  s3_input_key_prefix = s3_input_key_prefix + ".pdf"
491
 
492
- local_input_file_path = os.path.join(local_output_dir, pdf_filename)
493
  local_input_file_path = local_input_file_path + ".pdf"
494
 
495
  download_file_from_s3(
@@ -705,7 +708,7 @@ def poll_whole_document_textract_analysis_progress_and_download(
705
  # For robust handling, list objects and find the JSON(s).
706
 
707
  s3_output_key_prefix = (
708
- os.path.join(s3_output_prefix, job_id).replace("\\", "/") + "/"
709
  )
710
  logging.info(
711
  f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}"
@@ -848,7 +851,7 @@ def download_textract_output(
848
 
849
  # Find output ZIP file in S3
850
  output_file_key = f"{output_prefix}/{job_id}.zip"
851
- local_file_path = os.path.join(local_folder, f"{job_id}.zip")
852
 
853
  # Download file
854
  try:
 
32
  )
33
  from tools.file_conversion import get_input_file_names
34
  from tools.helper_functions import get_file_name_without_type
35
+ from tools.secure_path_utils import (
36
+ secure_basename,
37
+ secure_file_write,
38
+ secure_join,
39
+ )
40
 
41
  DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS = int(DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS)
42
 
 
120
  textract_client = session.client("textract")
121
 
122
  # --- 1. Upload PDF to S3 ---
123
+ pdf_filename = secure_basename(local_pdf_path)
124
+ s3_input_key = secure_join(s3_input_prefix, pdf_filename).replace(
125
  "\\", "/"
126
  ) # Ensure forward slashes for S3
127
 
 
267
  )
268
 
269
  # File path
270
+ log_file_path = secure_join(local_output_dir, "textract_document_jobs.csv")
271
+ log_file_path_job_id = secure_join(
272
  local_output_dir, pdf_filename + "_textract_document_jobs_job_id.txt"
273
  )
274
 
275
  # Write latest job ID to local text file
276
+ secure_file_write(log_file_path_job_id, job_id)
 
277
 
278
  # Check if file exists
279
  file_exists = os.path.exists(log_file_path)
 
451
  output_filename_base = os.path.basename(pdf_filename)
452
  output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]
453
  local_output_filename = f"{output_filename_base_no_ext}_textract.json"
454
+ local_output_path = secure_join(local_output_dir, local_output_filename)
455
 
456
+ secure_file_write(local_output_path, json.dumps(combined_output))
 
457
 
458
  print(f"Combined Textract output written to {local_output_path}")
459
 
 
487
  pdf_file_location = ""
488
  doc_file_name_no_extension_textbox = ""
489
 
490
+ s3_input_key_prefix = secure_join(load_s3_jobs_input_loc, pdf_filename).replace(
491
+ "\\", "/"
492
+ )
493
  s3_input_key_prefix = s3_input_key_prefix + ".pdf"
494
 
495
+ local_input_file_path = secure_join(local_output_dir, pdf_filename)
496
  local_input_file_path = local_input_file_path + ".pdf"
497
 
498
  download_file_from_s3(
 
708
  # For robust handling, list objects and find the JSON(s).
709
 
710
  s3_output_key_prefix = (
711
+ secure_join(s3_output_prefix, job_id).replace("\\", "/") + "/"
712
  )
713
  logging.info(
714
  f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}"
 
851
 
852
  # Find output ZIP file in S3
853
  output_file_key = f"{output_prefix}/{job_id}.zip"
854
+ local_file_path = secure_join(local_folder, f"{job_id}.zip")
855
 
856
  # Download file
857
  try:
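Finally, a sketch of the S3-key construction pattern used in this file, assuming POSIX-style paths and a hypothetical input file:

```python
from tools.secure_path_utils import secure_basename, secure_join

local_pdf_path = "input/example scan.pdf"  # hypothetical upload
pdf_filename = secure_basename(local_pdf_path)

# Forward slashes are enforced afterwards so the result is a valid S3 key
s3_input_key = secure_join("textract/input", pdf_filename).replace("\\", "/")
print(s3_input_key)  # textract/input/example scan.pdf
```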