Spaces:
Runtime error
Runtime error
import datetime | |
import io | |
import logging | |
import os | |
import os.path as osp | |
import posixpath | |
import re | |
import shutil | |
import stat | |
import tempfile | |
from fsspec import AbstractFileSystem | |
from fsspec.compression import compr | |
from fsspec.core import get_compression | |
from fsspec.utils import isfilelike, stringify_path | |
logger = logging.getLogger("fsspec.local") | |
class LocalFileSystem(AbstractFileSystem): | |
"""Interface to files on local storage | |
Parameters | |
---------- | |
auto_mkdir: bool | |
Whether, when opening a file, the directory containing it should | |
be created (if it doesn't already exist). This is assumed by pyarrow | |
code. | |
""" | |
root_marker = "/" | |
protocol = "file" | |
local_file = True | |
def __init__(self, auto_mkdir=False, **kwargs): | |
super().__init__(**kwargs) | |
self.auto_mkdir = auto_mkdir | |
def fsid(self): | |
return "local" | |
def mkdir(self, path, create_parents=True, **kwargs): | |
path = self._strip_protocol(path) | |
if self.exists(path): | |
raise FileExistsError(path) | |
if create_parents: | |
self.makedirs(path, exist_ok=True) | |
else: | |
os.mkdir(path, **kwargs) | |
def makedirs(self, path, exist_ok=False): | |
path = self._strip_protocol(path) | |
os.makedirs(path, exist_ok=exist_ok) | |
def rmdir(self, path): | |
path = self._strip_protocol(path) | |
os.rmdir(path) | |
def ls(self, path, detail=False, **kwargs): | |
path = self._strip_protocol(path) | |
if detail: | |
with os.scandir(path) as it: | |
return [self.info(f) for f in it] | |
else: | |
return [posixpath.join(path, f) for f in os.listdir(path)] | |
def glob(self, path, **kwargs): | |
path = self._strip_protocol(path) | |
return super().glob(path, **kwargs) | |
def info(self, path, **kwargs): | |
if isinstance(path, os.DirEntry): | |
# scandir DirEntry | |
out = path.stat(follow_symlinks=False) | |
link = path.is_symlink() | |
if path.is_dir(follow_symlinks=False): | |
t = "directory" | |
elif path.is_file(follow_symlinks=False): | |
t = "file" | |
else: | |
t = "other" | |
path = self._strip_protocol(path.path) | |
else: | |
# str or path-like | |
path = self._strip_protocol(path) | |
out = os.stat(path, follow_symlinks=False) | |
link = stat.S_ISLNK(out.st_mode) | |
if link: | |
out = os.stat(path, follow_symlinks=True) | |
if stat.S_ISDIR(out.st_mode): | |
t = "directory" | |
elif stat.S_ISREG(out.st_mode): | |
t = "file" | |
else: | |
t = "other" | |
result = { | |
"name": path, | |
"size": out.st_size, | |
"type": t, | |
"created": out.st_ctime, | |
"islink": link, | |
} | |
for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]: | |
result[field] = getattr(out, "st_" + field) | |
if result["islink"]: | |
result["destination"] = os.readlink(path) | |
try: | |
out2 = os.stat(path, follow_symlinks=True) | |
result["size"] = out2.st_size | |
except OSError: | |
result["size"] = 0 | |
return result | |
def lexists(self, path, **kwargs): | |
return osp.lexists(path) | |
def cp_file(self, path1, path2, **kwargs): | |
path1 = self._strip_protocol(path1).rstrip("/") | |
path2 = self._strip_protocol(path2).rstrip("/") | |
if self.auto_mkdir: | |
self.makedirs(self._parent(path2), exist_ok=True) | |
if self.isfile(path1): | |
shutil.copyfile(path1, path2) | |
elif self.isdir(path1): | |
self.mkdirs(path2, exist_ok=True) | |
else: | |
raise FileNotFoundError(path1) | |
def get_file(self, path1, path2, callback=None, **kwargs): | |
if isfilelike(path2): | |
with open(path1, "rb") as f: | |
shutil.copyfileobj(f, path2) | |
else: | |
return self.cp_file(path1, path2, **kwargs) | |
def put_file(self, path1, path2, callback=None, **kwargs): | |
return self.cp_file(path1, path2, **kwargs) | |
def mv_file(self, path1, path2, **kwargs): | |
path1 = self._strip_protocol(path1).rstrip("/") | |
path2 = self._strip_protocol(path2).rstrip("/") | |
shutil.move(path1, path2) | |
def link(self, src, dst, **kwargs): | |
src = self._strip_protocol(src) | |
dst = self._strip_protocol(dst) | |
os.link(src, dst, **kwargs) | |
def symlink(self, src, dst, **kwargs): | |
src = self._strip_protocol(src) | |
dst = self._strip_protocol(dst) | |
os.symlink(src, dst, **kwargs) | |
def islink(self, path) -> bool: | |
return os.path.islink(self._strip_protocol(path)) | |
def rm_file(self, path): | |
os.remove(self._strip_protocol(path)) | |
def rm(self, path, recursive=False, maxdepth=None): | |
if not isinstance(path, list): | |
path = [path] | |
for p in path: | |
p = self._strip_protocol(p).rstrip("/") | |
if self.isdir(p): | |
if not recursive: | |
raise ValueError("Cannot delete directory, set recursive=True") | |
if osp.abspath(p) == os.getcwd(): | |
raise ValueError("Cannot delete current working directory") | |
shutil.rmtree(p) | |
else: | |
os.remove(p) | |
def unstrip_protocol(self, name): | |
name = self._strip_protocol(name) # normalise for local/win/... | |
return f"file://{name}" | |
def _open(self, path, mode="rb", block_size=None, **kwargs): | |
path = self._strip_protocol(path) | |
if self.auto_mkdir and "w" in mode: | |
self.makedirs(self._parent(path), exist_ok=True) | |
return LocalFileOpener(path, mode, fs=self, **kwargs) | |
def touch(self, path, truncate=True, **kwargs): | |
path = self._strip_protocol(path) | |
if self.auto_mkdir: | |
self.makedirs(self._parent(path), exist_ok=True) | |
if self.exists(path): | |
os.utime(path, None) | |
else: | |
open(path, "a").close() | |
if truncate: | |
os.truncate(path, 0) | |
def created(self, path): | |
info = self.info(path=path) | |
return datetime.datetime.utcfromtimestamp(info["created"]) | |
def modified(self, path): | |
info = self.info(path=path) | |
return datetime.datetime.utcfromtimestamp(info["mtime"]) | |
def _parent(cls, path): | |
path = cls._strip_protocol(path).rstrip("/") | |
if "/" in path: | |
return path.rsplit("/", 1)[0] | |
else: | |
return cls.root_marker | |
def _strip_protocol(cls, path): | |
path = stringify_path(path) | |
if path.startswith("file://"): | |
path = path[7:] | |
elif path.startswith("file:"): | |
path = path[5:] | |
return make_path_posix(path).rstrip("/") or cls.root_marker | |
def _isfilestore(self): | |
# Inheriting from DaskFileSystem makes this False (S3, etc. were) | |
# the original motivation. But we are a posix-like file system. | |
# See https://github.com/dask/dask/issues/5526 | |
return True | |
def chmod(self, path, mode): | |
path = stringify_path(path) | |
return os.chmod(path, mode) | |
def make_path_posix(path, sep=os.sep): | |
"""Make path generic""" | |
if isinstance(path, (list, set, tuple)): | |
return type(path)(make_path_posix(p) for p in path) | |
if "~" in path: | |
path = osp.expanduser(path) | |
if sep == "/": | |
# most common fast case for posix | |
if path.startswith("/"): | |
return path | |
if path.startswith("./"): | |
path = path[2:] | |
return os.getcwd() + "/" + path | |
if ( | |
(sep not in path and "/" not in path) | |
or (sep == "/" and not path.startswith("/")) | |
or (sep == "\\" and ":" not in path and not path.startswith("\\\\")) | |
): | |
# relative path like "path" or "rel\\path" (win) or rel/path" | |
if os.sep == "\\": | |
# abspath made some more '\\' separators | |
return make_path_posix(osp.abspath(path)) | |
else: | |
return os.getcwd() + "/" + path | |
if path.startswith("file://"): | |
path = path[7:] | |
if re.match("/[A-Za-z]:", path): | |
# for windows file URI like "file:///C:/folder/file" | |
# or "file:///C:\\dir\\file" | |
path = path[1:].replace("\\", "/").replace("//", "/") | |
if path.startswith("\\\\"): | |
# special case for windows UNC/DFS-style paths, do nothing, | |
# just flip the slashes around (case below does not work!) | |
return path.replace("\\", "/") | |
if re.match("[A-Za-z]:", path): | |
# windows full path like "C:\\local\\path" | |
return path.lstrip("\\").replace("\\", "/").replace("//", "/") | |
if path.startswith("\\"): | |
# windows network path like "\\server\\path" | |
return "/" + path.lstrip("\\").replace("\\", "/").replace("//", "/") | |
return path | |
def trailing_sep(path): | |
"""Return True if the path ends with a path separator. | |
A forward slash is always considered a path separator, even on Operating | |
Systems that normally use a backslash. | |
""" | |
# TODO: if all incoming paths were posix-compliant then separator would | |
# always be a forward slash, simplifying this function. | |
# See https://github.com/fsspec/filesystem_spec/pull/1250 | |
return path.endswith(os.sep) or (os.altsep is not None and path.endswith(os.altsep)) | |
def trailing_sep_maybe_asterisk(path): | |
"""Return True if the path ends with a path separator and optionally an | |
asterisk. | |
A forward slash is always considered a path separator, even on Operating | |
Systems that normally use a backslash. | |
""" | |
# TODO: if all incoming paths were posix-compliant then separator would | |
# always be a forward slash, simplifying this function. | |
# See https://github.com/fsspec/filesystem_spec/pull/1250 | |
return path.endswith((os.sep, os.sep + "*")) or ( | |
os.altsep is not None and path.endswith((os.altsep, os.altsep + "*")) | |
) | |
class LocalFileOpener(io.IOBase): | |
def __init__( | |
self, path, mode, autocommit=True, fs=None, compression=None, **kwargs | |
): | |
logger.debug("open file: %s", path) | |
self.path = path | |
self.mode = mode | |
self.fs = fs | |
self.f = None | |
self.autocommit = autocommit | |
self.compression = get_compression(path, compression) | |
self.blocksize = io.DEFAULT_BUFFER_SIZE | |
self._open() | |
def _open(self): | |
if self.f is None or self.f.closed: | |
if self.autocommit or "w" not in self.mode: | |
self.f = open(self.path, mode=self.mode) | |
if self.compression: | |
compress = compr[self.compression] | |
self.f = compress(self.f, mode=self.mode) | |
else: | |
# TODO: check if path is writable? | |
i, name = tempfile.mkstemp() | |
os.close(i) # we want normal open and normal buffered file | |
self.temp = name | |
self.f = open(name, mode=self.mode) | |
if "w" not in self.mode: | |
self.size = self.f.seek(0, 2) | |
self.f.seek(0) | |
self.f.size = self.size | |
def _fetch_range(self, start, end): | |
# probably only used by cached FS | |
if "r" not in self.mode: | |
raise ValueError | |
self._open() | |
self.f.seek(start) | |
return self.f.read(end - start) | |
def __setstate__(self, state): | |
self.f = None | |
loc = state.pop("loc", None) | |
self.__dict__.update(state) | |
if "r" in state["mode"]: | |
self.f = None | |
self._open() | |
self.f.seek(loc) | |
def __getstate__(self): | |
d = self.__dict__.copy() | |
d.pop("f") | |
if "r" in self.mode: | |
d["loc"] = self.f.tell() | |
else: | |
if not self.f.closed: | |
raise ValueError("Cannot serialise open write-mode local file") | |
return d | |
def commit(self): | |
if self.autocommit: | |
raise RuntimeError("Can only commit if not already set to autocommit") | |
shutil.move(self.temp, self.path) | |
def discard(self): | |
if self.autocommit: | |
raise RuntimeError("Cannot discard if set to autocommit") | |
os.remove(self.temp) | |
def readable(self) -> bool: | |
return True | |
def writable(self) -> bool: | |
return "r" not in self.mode | |
def read(self, *args, **kwargs): | |
return self.f.read(*args, **kwargs) | |
def write(self, *args, **kwargs): | |
return self.f.write(*args, **kwargs) | |
def tell(self, *args, **kwargs): | |
return self.f.tell(*args, **kwargs) | |
def seek(self, *args, **kwargs): | |
return self.f.seek(*args, **kwargs) | |
def seekable(self, *args, **kwargs): | |
return self.f.seekable(*args, **kwargs) | |
def readline(self, *args, **kwargs): | |
return self.f.readline(*args, **kwargs) | |
def readlines(self, *args, **kwargs): | |
return self.f.readlines(*args, **kwargs) | |
def close(self): | |
return self.f.close() | |
def closed(self): | |
return self.f.closed | |
def fileno(self): | |
return self.raw.fileno() | |
def flush(self) -> None: | |
self.f.flush() | |
def __iter__(self): | |
return self.f.__iter__() | |
def __getattr__(self, item): | |
return getattr(self.f, item) | |
def __enter__(self): | |
self._incontext = True | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
self._incontext = False | |
self.f.__exit__(exc_type, exc_value, traceback) | |