Spaces:
Sleeping
Sleeping
# Copyright (C) 2008, 2009 Michael Trier ([email protected]) and contributors | |
# | |
# This module is part of GitPython and is released under the | |
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ | |
"""Parser for reading and writing configuration files.""" | |
__all__ = ["GitConfigParser", "SectionConstraint"] | |
import abc | |
import configparser as cp | |
import fnmatch | |
from functools import wraps | |
import inspect | |
from io import BufferedReader, IOBase | |
import logging | |
import os | |
import os.path as osp | |
import re | |
import sys | |
from git.compat import defenc, force_text | |
from git.util import LockFile | |
# typing------------------------------------------------------- | |
from typing import ( | |
Any, | |
Callable, | |
Generic, | |
IO, | |
List, | |
Dict, | |
Sequence, | |
TYPE_CHECKING, | |
Tuple, | |
TypeVar, | |
Union, | |
cast, | |
) | |
from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T | |
if TYPE_CHECKING: | |
from io import BytesIO | |
from git.repo.base import Repo | |
# Type variables used by the annotations in this module.
T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser")
T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool)

if sys.version_info[:3] >= (3, 7, 2):
    from typing import OrderedDict

    OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]]  # type: ignore[assignment, misc]
else:
    # typing.OrderedDict was not added until Python 3.7.2, so fall back to the
    # plain (non-subscripted) collections class there.
    from collections import OrderedDict

    OrderedDict_OMD = OrderedDict

# -------------------------------------------------------------

_logger = logging.getLogger(__name__)

CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository")
"""The configuration level of a configuration file."""

CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"")
"""Section pattern to detect conditional includes.

See: https://git-scm.com/docs/git-config#_conditional_includes
"""
class MetaParserBuilder(abc.ABCMeta):  # noqa: B024
    """Utility class wrapping base-class methods into decorators that assure read-only
    properties."""

    def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder":
        """Equip all base-class methods with a needs_values decorator, and all non-const
        methods with a :func:`set_dirty_and_flush_changes` decorator in addition to
        that.

        Wrapping only takes place if the class body defines ``_mutating_methods_``.
        Public routines of the bases that the class body does not define itself are
        wrapped and stored in `clsdict`.

        :param name:
            Name of the class being created.

        :param bases:
            Base classes whose public routines are wrapped.

        :param clsdict:
            Namespace of the class being created. Wrapped methods are added to it.

        :return:
            The newly created class.
        """
        kmm = "_mutating_methods_"
        if kmm in clsdict:
            mutating_methods = clsdict[kmm]
            for base in bases:
                methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_"))
                # Use a dedicated loop variable: rebinding ``name`` here would change
                # the class name handed to ``super().__new__`` below.
                for method_name, method in methods:
                    if method_name in clsdict:
                        continue
                    method_with_values = needs_values(method)
                    if method_name in mutating_methods:
                        method_with_values = set_dirty_and_flush_changes(method_with_values)
                    # END mutating methods handling

                    clsdict[method_name] = method_with_values
                # END for each name/method pair
            # END for each base
        # END if mutating methods configuration is set

        new_type = super().__new__(cls, name, bases, clsdict)
        return new_type
def needs_values(func: Callable[..., _T]) -> Callable[..., _T]:
    """Return a method for ensuring we read values (on demand) before we try to access
    them.

    :param func:
        The method to wrap. It is called with the same arguments as the wrapper.

    :return:
        A wrapper that first calls ``self.read()`` and then returns the result of
        `func`. The wrapper keeps the name and docstring of `func`.
    """

    @wraps(func)
    def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        self.read()
        return func(self, *args, **kwargs)

    # END wrapper method
    return assure_data_present
def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]:
    """Wrap a mutating method so that every call marks the instance dirty and writes.

    :param non_const_func:
        The non-constant method to wrap.

    :return:
        A wrapper carrying the name of `non_const_func`. It calls the wrapped method,
        then sets ``self._dirty`` and calls ``self.write()``, and finally returns the
        wrapped method's result.
    """

    def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        # The wrapped call runs first; the instance is only flagged and flushed
        # once it has returned.
        result = non_const_func(self, *args, **kwargs)
        self._dirty = True
        self.write()
        return result

    # END wrapper method
    flush_changes.__name__ = non_const_func.__name__
    return flush_changes
class SectionConstraint(Generic[T_ConfigParser]):
    """Restricts a ConfigParser to option-level commands on one fixed section.

    Every method named in ``_valid_attrs_`` is forwarded to the wrapped parser with
    the section name we were initialized with inserted as the first argument.

    :note:
        If used as a context manager, will release the wrapped ConfigParser.
    """

    __slots__ = ("_config", "_section_name")

    _valid_attrs_ = (
        "get_value",
        "set_value",
        "get",
        "set",
        "getint",
        "getfloat",
        "getboolean",
        "has_option",
        "remove_section",
        "remove_option",
        "options",
    )

    def __init__(self, config: T_ConfigParser, section: str) -> None:
        self._config = config
        self._section_name = section

    def __del__(self) -> None:
        # Yes, for some reason, we have to call it explicitly for it to work in PY3 !
        # Apparently __del__ doesn't get called anymore if refcount becomes 0.
        # Ridiculous ... .
        self._config.release()

    def __getattr__(self, attr: str) -> Any:
        # Anything that is not a forwarded parser method goes through the regular
        # attribute lookup.
        if attr not in self._valid_attrs_:
            return super().__getattribute__(attr)
        return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)

    def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke `method` on the wrapped parser, passing our section name as its
        first argument."""
        bound_method = getattr(self._config, method)
        return bound_method(self._section_name, *args, **kwargs)

    def config(self) -> T_ConfigParser:
        """return: ConfigParser instance we constrain"""
        return self._config

    def release(self) -> None:
        """Equivalent to :meth:`GitConfigParser.release`, which is called on our
        underlying parser instance."""
        return self._config.release()

    def __enter__(self) -> "SectionConstraint[T_ConfigParser]":
        self._config.__enter__()
        return self

    def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
        self._config.__exit__(exception_type, exception_value, traceback)
class _OMD(OrderedDict_OMD):
    """Ordered multi-dict.

    Each key maps to a list of values. Plain item access reads the last value of that
    list, while the ``*all`` methods work on the whole list.
    """

    def __setitem__(self, key: str, value: _T) -> None:
        # Plain assignment replaces every stored value with a single one.
        super().__setitem__(key, [value])

    def add(self, key: str, value: Any) -> None:
        """Append `value` to the values of `key`, creating the key if needed."""
        if key in self:
            super().__getitem__(key).append(value)
        else:
            super().__setitem__(key, [value])

    def setall(self, key: str, values: List[_T]) -> None:
        """Store `values` as the complete value list of `key`."""
        super().__setitem__(key, values)

    def __getitem__(self, key: str) -> Any:
        all_values = super().__getitem__(key)
        return all_values[-1]

    def getlast(self, key: str) -> Any:
        """Return the last value stored for `key`."""
        all_values = super().__getitem__(key)
        return all_values[-1]

    def setlast(self, key: str, value: Any) -> None:
        """Replace the last value of `key`, creating the key if needed."""
        if key in self:
            super().__getitem__(key)[-1] = value
        else:
            super().__setitem__(key, [value])

    def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]:
        """Return the last value of `key`, or `default` if the key is missing."""
        all_values = super().get(key, [default])
        return all_values[-1]

    def getall(self, key: str) -> List[_T]:
        """Return the list of all values stored for `key`."""
        return super().__getitem__(key)

    def items(self) -> List[Tuple[str, _T]]:  # type: ignore[override]
        """List of (key, last value for key)."""
        pairs = []
        for k in self:
            pairs.append((k, self[k]))
        return pairs

    def items_all(self) -> List[Tuple[str, List[_T]]]:
        """List of (key, list of values for key)."""
        pairs = []
        for k in self:
            pairs.append((k, self.getall(k)))
        return pairs
def get_config_path(config_level: Lit_config_levels) -> str:
    """Return the default configuration file path for `config_level`.

    :param config_level:
        One of ``"system"``, ``"user"`` or ``"global"``.

    :return:
        The normalized, user-expanded path for ``"user"`` and ``"global"``, and
        ``/etc/gitconfig`` for ``"system"``.

    :raise ValueError:
        If `config_level` is ``"repository"``, as there is no repository to take the
        path from.
    """
    # We do not support an absolute path of the gitconfig on Windows.
    # Use the global config instead.
    on_windows = sys.platform == "win32"
    if on_windows and config_level == "system":
        config_level = "global"

    if config_level == "system":
        return "/etc/gitconfig"

    if config_level == "user":
        # An unset or empty XDG_CONFIG_HOME falls back to $HOME/.config.
        config_home = os.environ.get("XDG_CONFIG_HOME")
        if not config_home:
            config_home = osp.join(os.environ.get("HOME", "~"), ".config")
        user_config = osp.join(config_home, "git", "config")
        return osp.normpath(osp.expanduser(user_config))

    if config_level == "global":
        return osp.normpath(osp.expanduser("~/.gitconfig"))

    if config_level == "repository":
        raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")

    # Should not reach here. Will raise ValueError if does. Static typing will warn
    # about missing branches.
    assert_never(  # type: ignore[unreachable]
        config_level,
        ValueError(f"Invalid configuration level: {config_level!r}"),
    )
class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
    """Implements specifics required to read git style configuration files.

    This variation behaves much like the :manpage:`git-config(1)` command, such that the
    configuration will be read on demand based on the filepath given during
    initialization.

    The changes will automatically be written once the instance goes out of scope, but
    can be triggered manually as well.

    The configuration file will be locked if you intend to change values preventing
    other instances to write concurrently.

    :note:
        The config is case-sensitive even when queried, hence section and option names
        must match perfectly.

    :note:
        If used as a context manager, this will release the locked file.
    """

    # { Configuration
    t_lock = LockFile
    """The lock type determines the type of lock to use in new configuration readers.

    They must be compatible to the :class:`~git.util.LockFile` interface.
    A suitable alternative would be the :class:`~git.util.BlockingLockFile`.
    """

    re_comment = re.compile(r"^\s*[#;]")
    # } END configuration

    optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)"

    OPTVALUEONLY = re.compile(optvalueonly_source)

    OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$")

    del optvalueonly_source

    _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
    """Names of :class:`~configparser.RawConfigParser` methods able to change the
    instance."""

    def __init__(
        self,
        file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None,
        read_only: bool = True,
        merge_includes: bool = True,
        config_level: Union[Lit_config_levels, None] = None,
        repo: Union["Repo", None] = None,
    ) -> None:
        """Initialize a configuration reader to read the given `file_or_files` and to
        possibly allow changes to it by setting `read_only` False.

        :param file_or_files:
            A file path or file object, or a sequence of possibly more than one of them.

        :param read_only:
            If ``True``, the ConfigParser may only read the data, but not change it.
            If ``False``, only a single file path or file object may be given. We will
            write back the changes when they happen, or when the ConfigParser is
            released. This will not happen if other configuration files have been
            included.

        :param merge_includes:
            If ``True``, we will read files mentioned in ``[include]`` sections and
            merge their contents into ours. This makes it impossible to write back an
            individual configuration file. Thus, if you want to modify a single
            configuration file, turn this off to leave the original dataset unaltered
            when reading it.

        :param config_level:
            Only used if `file_or_files` is ``None``: the level whose default path, as
            returned by :func:`get_config_path`, is read. If it is ``None`` as well,
            the default paths of all levels but ``"repository"`` are read, which is
            only allowed for read-only instances.

        :param repo:
            Reference to repository to use if ``[includeIf]`` sections are found in
            configuration files.

        :raise ValueError:
            If neither `file_or_files` nor `config_level` is given for an instance that
            is not read-only.
        """
        cp.RawConfigParser.__init__(self, dict_type=_OMD)
        self._dict: Callable[..., _OMD]
        self._defaults: _OMD
        self._sections: _OMD

        # Set up plain state first, so that release() (which __del__ calls as well)
        # finds these attributes even if resolving the files below raises.
        self._read_only = read_only
        self._dirty = False
        self._is_initialized = False
        self._merge_includes = merge_includes
        self._repo = repo
        self._lock: Union["LockFile", None] = None

        # Used in Python 3. Needs to stay in sync with sections for underlying
        # implementation to work.
        if not hasattr(self, "_proxies"):
            self._proxies = self._dict()

        if file_or_files is not None:
            self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files
        else:
            if config_level is None:
                if read_only:
                    self._file_or_files = [
                        get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository"
                    ]
                else:
                    raise ValueError("No configuration level or configuration files specified")
            else:
                self._file_or_files = [get_config_path(config_level)]

        self._acquire_lock()

    def _acquire_lock(self) -> None:
        """Create the lock for our file if needed and obtain it.

        Does nothing for read-only instances.

        :raise ValueError:
            If no lock exists yet and more than one file was given.
        """
        if not self._read_only:
            if not self._lock:
                if isinstance(self._file_or_files, (str, os.PathLike)):
                    file_or_files = self._file_or_files
                elif isinstance(self._file_or_files, (tuple, list, Sequence)):
                    raise ValueError(
                        "Write-ConfigParsers can operate on a single file only, multiple files have been passed"
                    )
                else:
                    file_or_files = self._file_or_files.name
                # END get filename from handle/stream

                # Initialize lock base - we want to write.
                self._lock = self.t_lock(file_or_files)
            # END lock check

            self._lock._obtain_lock()
        # END read-only check

    def __del__(self) -> None:
        """Write pending changes if required and release locks."""
        # NOTE: Only consistent in Python 2.
        self.release()

    def __enter__(self) -> "GitConfigParser":
        self._acquire_lock()
        return self

    def __exit__(self, *args: Any) -> None:
        self.release()

    def release(self) -> None:
        """Flush changes and release the configuration write lock. This instance must
        not be used anymore afterwards.

        In Python 3, it's required to explicitly release locks and flush changes, as
        ``__del__`` is not called deterministically anymore.
        """
        # Checking for the lock here makes sure we do not raise during write()
        # in case an invalid parser was created who could not get a lock.
        if self.read_only or (self._lock and not self._lock._has_lock()):
            return

        try:
            self.write()
        except IOError:
            _logger.error("Exception during destruction of GitConfigParser", exc_info=True)
        except ReferenceError:
            # This happens in Python 3... and usually means that some state cannot be
            # written as the sections dict cannot be iterated. This usually happens when
            # the interpreter is shutting down. Can it be fixed?
            pass
        finally:
            if self._lock is not None:
                self._lock._release_lock()

    def optionxform(self, optionstr: str) -> str:
        """Do not transform options in any way when writing."""
        return optionstr

    def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None:
        """Originally a direct copy of the Python 2.4 version of
        :meth:`RawConfigParser._read <configparser.RawConfigParser._read>`, to ensure it
        uses ordered dicts.

        The ordering bug was fixed in Python 2.4, and dict itself keeps ordering since
        Python 3.7. This has some other changes, especially that it ignores initial
        whitespace, since git uses tabs. (Big comments are removed to be more compact.)

        :param fp:
            Binary file object to read lines from.

        :param fpname:
            Name of the file, used in error reports.

        :raise configparser.MissingSectionHeaderError:
            If an option line appears before any section header.

        :raise configparser.ParsingError:
            Raised after the whole file was read if lines could not be parsed.
        """
        cursect = None  # None, or a dictionary.
        optname = None
        lineno = 0
        is_multi_line = False
        e = None  # None, or an exception.

        def string_decode(v: str) -> str:
            if v and v.endswith("\\"):
                v = v[:-1]
            # END cut trailing escapes to prevent decode error

            return v.encode(defenc).decode("unicode_escape")

        # END string_decode

        while True:
            # We assume to read binary!
            line = fp.readline().decode(defenc)
            if not line:
                break
            lineno = lineno + 1
            # Comment or blank line?
            if line.strip() == "" or self.re_comment.match(line):
                continue
            if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR":
                # No leading whitespace.
                continue

            # Is it a section header?
            mo = self.SECTCRE.match(line.strip())
            if not is_multi_line and mo:
                sectname: str = mo.group("header").strip()
                if sectname in self._sections:
                    cursect = self._sections[sectname]
                elif sectname == cp.DEFAULTSECT:
                    cursect = self._defaults
                else:
                    cursect = self._dict((("__name__", sectname),))
                    self._sections[sectname] = cursect
                    self._proxies[sectname] = None
                # So sections can't start with a continuation line.
                optname = None
            # No section header in the file?
            elif cursect is None:
                raise cp.MissingSectionHeaderError(fpname, lineno, line)
            # An option line?
            elif not is_multi_line:
                mo = self.OPTCRE.match(line)
                if mo:
                    # We might just have handled the last line, which could contain a quotation we want to remove.
                    optname, vi, optval = mo.group("option", "vi", "value")
                    if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'):
                        pos = optval.find(";")
                        if pos != -1 and optval[pos - 1].isspace():
                            optval = optval[:pos]
                    optval = optval.strip()
                    if optval == '""':
                        optval = ""
                    # END handle empty string
                    optname = self.optionxform(optname.rstrip())
                    if len(optval) > 1 and optval[0] == '"' and optval[-1] != '"':
                        is_multi_line = True
                        optval = string_decode(optval[1:])
                    # END handle multi-line
                    # Preserves multiple values for duplicate optnames.
                    cursect.add(optname, optval)
                else:
                    # Check if it's an option with no value - it's just ignored by git.
                    if not self.OPTVALUEONLY.match(line):
                        if not e:
                            e = cp.ParsingError(fpname)
                        e.append(lineno, repr(line))
                    continue
            else:
                line = line.rstrip()
                if line.endswith('"'):
                    is_multi_line = False
                    line = line[:-1]
                # END handle quotations
                optval = cursect.getlast(optname)
                cursect.setlast(optname, optval + string_decode(line))
            # END parse section or option
        # END while reading

        # If any parsing errors occurred, raise an exception.
        if e:
            raise e

    def _has_includes(self) -> Union[bool, int]:
        """:return: A truthy value if includes are to be merged and at least one
        include path is configured"""
        return self._merge_includes and len(self._included_paths())

    def _included_paths(self) -> List[Tuple[str, str]]:
        """List all paths that must be included to configuration.

        :return:
            The list of paths, where each path is a tuple of (option, value).
        """
        paths = []

        for section in self.sections():
            if section == "include":
                paths += self.items(section)

            match = CONDITIONAL_INCLUDE_REGEXP.search(section)
            if match is None or self._repo is None:
                continue

            keyword = match.group(1)
            value = match.group(2).strip()

            if keyword in ["gitdir", "gitdir/i"]:
                value = osp.expanduser(value)

                if not any(value.startswith(s) for s in ["./", "/"]):
                    value = "**/" + value
                if value.endswith("/"):
                    value += "**"

                # Ensure that glob is always case insensitive if required.
                if keyword.endswith("/i"):
                    value = re.sub(
                        r"[a-zA-Z]",
                        lambda m: "[{}{}]".format(m.group().lower(), m.group().upper()),
                        value,
                    )
                if self._repo.git_dir:
                    if fnmatch.fnmatchcase(str(self._repo.git_dir), value):
                        paths += self.items(section)

            elif keyword == "onbranch":
                try:
                    branch_name = self._repo.active_branch.name
                except TypeError:
                    # Ignore section if active branch cannot be retrieved.
                    continue

                if fnmatch.fnmatchcase(branch_name, value):
                    paths += self.items(section)

        return paths

    def read(self) -> None:  # type: ignore[override]
        """Read the data stored in the files we have been initialized with.

        This will ignore files that cannot be read, possibly leaving an empty
        configuration.

        Only the first call reads anything, later calls return immediately.

        :raise IOError:
            If a file cannot be handled.
        """
        if self._is_initialized:
            return
        self._is_initialized = True

        files_to_read: List[Union[PathLike, IO]] = [""]
        if isinstance(self._file_or_files, (str, os.PathLike)):
            # For str or Path, as str is a type of Sequence.
            files_to_read = [self._file_or_files]
        elif not isinstance(self._file_or_files, (tuple, list, Sequence)):
            # Could merge with above isinstance once runtime type known.
            files_to_read = [self._file_or_files]
        else:  # For lists or tuples.
            files_to_read = list(self._file_or_files)
        # END ensure we have a copy of the paths to handle

        seen = set(files_to_read)
        num_read_include_files = 0
        while files_to_read:
            file_path = files_to_read.pop(0)
            file_ok = False

            if hasattr(file_path, "seek"):
                # Must be a file-object.
                # TODO: Replace cast with assert to narrow type, once sure.
                file_path = cast(IO[bytes], file_path)
                self._read(file_path, file_path.name)
            else:
                # Assume a path if it is not a file-object.
                file_path = cast(PathLike, file_path)
                try:
                    with open(file_path, "rb") as fp:
                        file_ok = True
                        self._read(fp, fp.name)
                except IOError:
                    continue

            # Read includes and append those that we didn't handle yet. We expect all
            # paths to be normalized and absolute (and will ensure that is the case).
            if self._has_includes():
                for _, include_path in self._included_paths():
                    if include_path.startswith("~"):
                        include_path = osp.expanduser(include_path)
                    if not osp.isabs(include_path):
                        if not file_ok:
                            continue
                        # END ignore relative paths if we don't know the configuration file path
                        file_path = cast(PathLike, file_path)
                        assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work"
                        include_path = osp.join(osp.dirname(file_path), include_path)
                    # END make include path absolute
                    include_path = osp.normpath(include_path)
                    if include_path in seen or not os.access(include_path, os.R_OK):
                        continue
                    seen.add(include_path)
                    # Insert included file to the top to be considered first.
                    files_to_read.insert(0, include_path)
                    num_read_include_files += 1
                # END each include path in configuration file
            # END handle includes
        # END for each file object to read

        # If there was no file included, we can safely write back (potentially) the
        # configuration file without altering its meaning.
        if num_read_include_files == 0:
            self._merge_includes = False

    def _write(self, fp: IO) -> None:
        """Write an .ini-format representation of the configuration state in
        git compatible format."""

        def write_section(name: str, section_dict: _OMD) -> None:
            fp.write(("[%s]\n" % name).encode(defenc))

            values: Sequence[str]  # Runtime only gets str in tests, but should be whatever _OMD stores.
            v: str
            for key, values in section_dict.items_all():
                if key == "__name__":
                    continue

                for v in values:
                    fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc))
                # END if key is not __name__

        # END section writing

        if self._defaults:
            write_section(cp.DEFAULTSECT, self._defaults)
        value: _OMD

        for name, value in self._sections.items():
            write_section(name, value)

    def items(self, section_name: str) -> List[Tuple[str, str]]:  # type: ignore[override]
        """:return: list((option, value), ...) pairs of all items in the given section"""
        return [(k, v) for k, v in super().items(section_name) if k != "__name__"]

    def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]:
        """:return: list((option, [values...]), ...) pairs of all items in the given section"""
        rv = _OMD(self._defaults)

        for k, vs in self._sections[section_name].items_all():
            if k == "__name__":
                continue

            if k in rv and rv.getall(k) == vs:
                continue

            for v in vs:
                rv.add(k, v)

        return rv.items_all()

    def write(self) -> None:
        """Write changes to our file, if there are changes at all.

        :raise IOError:
            If this is a read-only writer instance or if we could not obtain a file
            lock.
        """
        self._assure_writable("write")
        if not self._dirty:
            return

        if isinstance(self._file_or_files, (list, tuple)):
            raise AssertionError(
                "Cannot write back if there is not exactly a single file to write to, have %i files"
                % len(self._file_or_files)
            )
        # END assert multiple files

        if self._has_includes():
            _logger.debug(
                "Skipping write-back of configuration file as include files were merged in. "
                "Set merge_includes=False to prevent this."
            )
            return
        # END stop if we have include files

        fp = self._file_or_files

        # We have a physical file on disk, so get a lock.
        is_file_lock = isinstance(fp, (str, os.PathLike, IOBase))  # TODO: Use PathLike (having dropped 3.5).
        if is_file_lock and self._lock is not None:  # Else raise error?
            self._lock._obtain_lock()

        if not hasattr(fp, "seek"):
            fp = cast(PathLike, fp)
            with open(fp, "wb") as fp_open:
                self._write(fp_open)
        else:
            fp = cast("BytesIO", fp)
            fp.seek(0)
            # Make sure we do not overwrite into an existing file.
            if hasattr(fp, "truncate"):
                fp.truncate()
            self._write(fp)

    def _assure_writable(self, method_name: str) -> None:
        """Raise if this instance is read-only.

        :param method_name:
            Name of the calling method, used in the error message.

        :raise IOError:
            If this instance is read-only.
        """
        if self.read_only:
            raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))

    def add_section(self, section: str) -> None:
        """Assures added options will stay in order."""
        return super().add_section(section)

    @property
    def read_only(self) -> bool:
        """:return: ``True`` if this instance may not change the configuration file"""
        # Must be a property: release() and _assure_writable() read it as a plain
        # attribute, and a bound method would always be truthy there.
        return self._read_only

    # FIXME: Figure out if default or return type can really include bool.
    def get_value(
        self,
        section: str,
        option: str,
        default: Union[int, float, str, bool, None] = None,
    ) -> Union[int, float, str, bool]:
        """Get an option's value.

        If multiple values are specified for this option in the section, the last one
        specified is returned.

        :param default:
            If not ``None``, the given default value will be returned in case the option
            did not exist.

        :return:
            A properly typed value, either int, float or string

        :raise TypeError:
            In case the value could not be understood.
            Otherwise the exceptions known to the ConfigParser will be raised.
        """
        try:
            valuestr = self.get(section, option)
        except Exception:
            if default is not None:
                return default
            raise

        return self._string_to_value(valuestr)

    def get_values(
        self,
        section: str,
        option: str,
        default: Union[int, float, str, bool, None] = None,
    ) -> List[Union[int, float, str, bool]]:
        """Get an option's values.

        If multiple values are specified for this option in the section, all are
        returned.

        :param default:
            If not ``None``, a list containing the given default value will be returned
            in case the option did not exist.

        :return:
            A list of properly typed values, either int, float or string

        :raise TypeError:
            In case the value could not be understood.
            Otherwise the exceptions known to the ConfigParser will be raised.
        """
        try:
            self.sections()
            lst = self._sections[section].getall(option)
        except Exception:
            if default is not None:
                return [default]
            raise

        return [self._string_to_value(valuestr) for valuestr in lst]

    def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]:
        """Convert a configuration string to an int, float, bool or str.

        Numbers are tried first, then ``true`` and ``false`` in any letter case. Any
        other string is returned unchanged.

        :raise TypeError:
            If `valuestr` is not numeric and not a string.
        """
        types = (int, float)
        for numtype in types:
            try:
                val = numtype(valuestr)
                # truncated value ?
                if val != float(valuestr):
                    continue
                return val
            except (ValueError, TypeError):
                continue
        # END for each numeric type

        # Reject non-strings before calling string methods on them, so that callers
        # get the documented TypeError.
        if not isinstance(valuestr, str):
            raise TypeError(
                "Invalid value type: only int, long, float and str are allowed",
                valuestr,
            )

        # Try boolean values as git uses them.
        vl = valuestr.lower()
        if vl == "false":
            return False
        if vl == "true":
            return True

        return valuestr

    def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str:
        """:return: `value` as text, using ``str()`` for numbers and booleans"""
        if isinstance(value, (int, float, bool)):
            return str(value)
        return force_text(value)

    def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
        """Set the given option in section to the given value.

        This will create the section if required, and will not throw as opposed to the
        default ConfigParser ``set`` method.

        :param section:
            Name of the section in which the option resides or should reside.

        :param option:
            Name of the options whose value to set.

        :param value:
            Value to set the option to. It must be a string or convertible to a string.

        :return:
            This instance
        """
        if not self.has_section(section):
            self.add_section(section)
        self.set(section, option, self._value_to_string(value))
        return self

    @needs_values
    @set_dirty_and_flush_changes
    def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
        """Add a value for the given option in section.

        This will create the section if required, and will not throw as opposed to the
        default ConfigParser ``set`` method. The value becomes the new value of the
        option as returned by :meth:`get_value`, and appends to the list of values
        returned by :meth:`get_values`.

        As the value is stored in the section directly, this method is decorated to
        mark the instance dirty and to write the change.

        :param section:
            Name of the section in which the option resides or should reside.

        :param option:
            Name of the option.

        :param value:
            Value to add to option. It must be a string or convertible to a string.

        :return:
            This instance
        """
        if not self.has_section(section):
            self.add_section(section)
        self._sections[section].add(option, self._value_to_string(value))
        return self

    def rename_section(self, section: str, new_name: str) -> "GitConfigParser":
        """Rename the given section to `new_name`.

        :raise ValueError:
            If:

            * `section` doesn't exist.
            * A section with `new_name` does already exist.

        :return:
            This instance
        """
        if not self.has_section(section):
            raise ValueError("Source section '%s' doesn't exist" % section)
        if self.has_section(new_name):
            raise ValueError("Destination section '%s' already exists" % new_name)

        super().add_section(new_name)
        new_section = self._sections[new_name]
        for k, vs in self.items_all(section):
            new_section.setall(k, vs)
        # END for each value to copy

        # This call writes back the changes, which is why we don't have the respective
        # decorator.
        self.remove_section(section)
        return self