|
""" |
|
Secure path utilities to prevent path injection attacks. |
|
|
|
This module provides secure alternatives to os.path operations that validate |
|
and sanitize file paths to prevent directory traversal and other path-based attacks. |
|
""" |
|
|
|
import logging |
|
import os |
|
import re |
|
from pathlib import Path |
|
from typing import Optional, Union |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def sanitize_filename(filename: str, max_length: int = 255) -> str:
    """
    Sanitize a filename to prevent path injection attacks.

    Only the final path component is kept. Forward slashes and backslashes
    are both treated as separators regardless of the host platform, because
    untrusted names (e.g. uploads from Windows clients) may use either.
    Reserved characters and control characters are replaced with "_", runs
    of dots are collapsed to a single dot, and leading/trailing dots and
    spaces are removed. An empty result falls back to "sanitized_file".

    Args:
        filename: The filename to sanitize
        max_length: Maximum length of the sanitized filename (must be >= 1)

    Returns:
        A sanitized filename safe for use in file operations, never longer
        than max_length

    Raises:
        ValueError: If filename is empty or not a string, or if max_length
            is smaller than 1
    """
    if not filename or not isinstance(filename, str):
        raise ValueError("Filename must be a non-empty string")
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    # os.path.basename does not split on backslashes on POSIX hosts, so
    # normalise them first; "..\\..\\name" must not survive as one component.
    filename = os.path.basename(filename.replace("\\", "/"))

    # Replace characters that are reserved on common filesystems, plus
    # ASCII control characters.
    sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "_", filename)

    # Collapse any run of dots so ".." can never appear in the result.
    sanitized = re.sub(r"\.{2,}", ".", sanitized)

    # Leading/trailing dots and spaces are stripped.
    sanitized = sanitized.strip(". ")

    if not sanitized:
        sanitized = "sanitized_file"

    if len(sanitized) > max_length:
        name, ext = os.path.splitext(sanitized)
        if len(ext) < max_length:
            # Keep the extension and shorten only the stem. Strip the cut
            # point so that "a." + ".txt" cannot re-create a "..".
            stem = name[: max_length - len(ext)].rstrip(". ")
            sanitized = stem + ext
        else:
            # The extension alone would exceed the limit; a negative slice
            # index here would yield an over-long result, so cut the whole
            # name instead.
            sanitized = sanitized[:max_length].rstrip(". ")

    return sanitized
|
|
|
|
|
def secure_path_join(
    base_path: Union[str, Path], *path_parts: Union[str, Path]
) -> Path:
    """
    Safely join paths while preventing directory traversal attacks.

    Empty parts are skipped. A part that contains reserved characters,
    control characters, or a run of two or more dots is replaced by
    sanitize_filename(part) before joining; all other parts are joined
    unchanged. The joined path is resolved and must lie inside the resolved
    base directory.

    Args:
        base_path: The base directory path
        *path_parts: Additional path components to join (strings or
            path-like objects)

    Returns:
        A resolved Path object located inside base_path

    Raises:
        PermissionError: If the resulting path would escape the base directory
    """
    base_path = Path(base_path).resolve()

    result_path = base_path
    for part in path_parts:
        if not part:
            continue

        # Accept path-like objects as well as strings, mirroring base_path.
        part = os.fspath(part)

        if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part):
            part = sanitize_filename(part)

        result_path = result_path / part

    # Resolve symlinks and any remaining relative segments before checking
    # containment, so the check is made on the real target.
    result_path = result_path.resolve()

    try:
        result_path.relative_to(base_path)
    except ValueError as err:
        raise PermissionError(
            f"Path would escape base directory: {result_path}"
        ) from err

    return result_path
|
|
|
|
|
def secure_file_write(
    base_path: Union[str, Path],
    filename: str,
    content: str,
    mode: str = "w",
    encoding: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Write content to a file located under a base directory.

    The destination is computed with secure_path_join(base_path, filename),
    and any missing parent directories are created before the file is opened.

    Args:
        base_path: The base directory under which to write the file
        filename: The target file name or relative path (untrusted)
        content: The content to write
        mode: File open mode (default: 'w')
        encoding: Text encoding; forwarded to open() only when truthy
        **kwargs: Additional arguments for open()
    """
    file_path = secure_path_join(base_path, filename)
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # Only pass an encoding when one was supplied, so binary modes keep working.
    encoding_option = {"encoding": encoding} if encoding else {}

    with open(file_path, mode=mode, **encoding_option, **kwargs) as handle:
        handle.write(content)
|
|
|
|
|
def secure_file_read(
    base_path: Union[str, Path],
    filename: str,
    mode: str = "r",
    encoding: Optional[str] = None,
    **kwargs,
) -> str:
    """
    Read and return the content of a file located under a base directory.

    The source path is computed with secure_path_join(base_path, filename)
    and must refer to an existing regular file.

    Args:
        base_path: The base directory under which to read the file
        filename: The target file name or relative path (untrusted)
        mode: File open mode (default: 'r')
        encoding: Text encoding; forwarded to open() only when truthy
        **kwargs: Additional arguments for open()

    Returns:
        The file content as returned by read()

    Raises:
        FileNotFoundError: If the resolved path does not exist
        ValueError: If the resolved path exists but is not a file
    """
    file_path = secure_path_join(base_path, filename)

    # Guard clauses: report a missing path and a non-file path separately.
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    if not file_path.is_file():
        raise ValueError(f"Path is not a file: {file_path}")

    options = {"mode": mode, **kwargs}
    if encoding:
        options["encoding"] = encoding

    with open(file_path, **options) as handle:
        data = handle.read()
    return data
|
|
|
|
|
def validate_path_safety(
    path: Union[str, Path], base_path: Optional[Union[str, Path]] = None
) -> bool:
    """
    Validate that a path is safe and doesn't contain dangerous patterns.

    The path is rejected when its text contains "..", "//" or a backslash.
    When base_path is given, the resolved path must also lie inside the
    resolved base_path. A relative path is resolved against the current
    working directory, not against base_path.

    Args:
        path: The path to validate
        base_path: Optional base path to check against

    Returns:
        True if the path is safe, False otherwise (including when the path
        cannot be processed at all)
    """
    try:
        # Inspect the caller's text rather than str(Path(path)): Path()
        # collapses repeated slashes (hiding "//") and on Windows renders
        # every separator as a backslash (rejecting every nested path).
        # For Path objects, as_posix() gives forward-slash separators.
        if isinstance(path, Path):
            path_text = path.as_posix()
        else:
            path_text = os.fspath(path)

        dangerous_patterns = ("..", "//", "\\")
        if any(pattern in path_text for pattern in dangerous_patterns):
            return False

        if base_path:
            resolved_base = Path(base_path).resolve()
            resolved_path = Path(path).resolve()
            try:
                resolved_path.relative_to(resolved_base)
            except ValueError:
                return False

        return True

    except Exception:
        # Fail closed: anything that cannot be analysed is treated as unsafe.
        return False
|
|
|
|
|
|
|
def secure_join(*paths: str) -> str:
    """
    Secure alternative to os.path.join that prevents path injection.

    The first argument is the base; the remaining arguments are joined onto
    it. If any later component contains reserved characters, control
    characters, a run of two or more dots, or is rooted/absolute, the join
    is delegated to secure_path_join, which sanitizes components and
    verifies that the result stays inside the base. In that case the
    returned path is resolved. Otherwise the components are joined as given.

    Args:
        *paths: Path components to join

    Returns:
        A safe joined path string ("" when called without arguments)

    Raises:
        PermissionError: Propagated from secure_path_join when the joined
            path would escape the base
    """
    if not paths:
        return ""

    base_path = Path(paths[0])
    path_parts = paths[1:]

    # A rooted component (non-empty anchor, e.g. "/etc/passwd") would make
    # Path(*paths) discard the base entirely, so it must go through the
    # containment check just like components with suspicious characters.
    needs_containment_check = any(
        re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part) or Path(part).anchor
        for part in path_parts
    )

    if needs_containment_check:
        return str(secure_path_join(base_path, *path_parts))

    return str(Path(*paths))
|
|
|
|
|
def secure_basename(path: str) -> str:
    """
    Secure alternative to os.path.basename that sanitizes the result.

    Forward slashes and backslashes are both treated as separators
    regardless of the host platform, so Windows-style input is reduced to
    its final component on POSIX hosts too. If that component contains
    reserved characters, control characters, or a run of two or more dots,
    it is passed through sanitize_filename; otherwise it is returned as is.

    Args:
        path: The path to get the basename from

    Returns:
        A sanitized basename (may be "" when the path ends with a separator)
    """
    # os.fspath keeps path-like inputs working, as os.path.basename did.
    normalized = os.fspath(path).replace("\\", "/")
    basename = os.path.basename(normalized)

    if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', basename):
        return sanitize_filename(basename)

    return basename
|
|