|
import asyncio |
|
import functools |
|
import inspect |
|
|
|
from fsspec.asyn import AsyncFileSystem, running_async |
|
|
|
|
|
def async_wrapper(func, obj=None): |
|
""" |
|
Wraps a synchronous function to make it awaitable. |
|
|
|
Parameters |
|
---------- |
|
func : callable |
|
The synchronous function to wrap. |
|
obj : object, optional |
|
The instance to bind the function to, if applicable. |
|
|
|
Returns |
|
------- |
|
coroutine |
|
An awaitable version of the function. |
|
""" |
|
|
|
@functools.wraps(func) |
|
async def wrapper(*args, **kwargs): |
|
return await asyncio.to_thread(func, *args, **kwargs) |
|
|
|
return wrapper |
|
|
|
|
|
class AsyncFileSystemWrapper(AsyncFileSystem): |
|
""" |
|
A wrapper class to convert a synchronous filesystem into an asynchronous one. |
|
|
|
This class takes an existing synchronous filesystem implementation and wraps all |
|
its methods to provide an asynchronous interface. |
|
|
|
Parameters |
|
---------- |
|
sync_fs : AbstractFileSystem |
|
The synchronous filesystem instance to wrap. |
|
""" |
|
|
|
protocol = "async_wrapper" |
|
cachable = False |
|
|
|
def __init__(self, fs, *args, asynchronous=None, **kwargs): |
|
if asynchronous is None: |
|
asynchronous = running_async() |
|
super().__init__(*args, asynchronous=asynchronous, **kwargs) |
|
self.sync_fs = fs |
|
self.protocol = self.sync_fs.protocol |
|
self._wrap_all_sync_methods() |
|
|
|
@property |
|
def fsid(self): |
|
return f"async_{self.sync_fs.fsid}" |
|
|
|
def _wrap_all_sync_methods(self): |
|
""" |
|
Wrap all synchronous methods of the underlying filesystem with asynchronous versions. |
|
""" |
|
excluded_methods = {"open"} |
|
for method_name in dir(self.sync_fs): |
|
if method_name.startswith("_") or method_name in excluded_methods: |
|
continue |
|
|
|
attr = inspect.getattr_static(self.sync_fs, method_name) |
|
if isinstance(attr, property): |
|
continue |
|
|
|
method = getattr(self.sync_fs, method_name) |
|
if callable(method) and not asyncio.iscoroutinefunction(method): |
|
async_method = async_wrapper(method, obj=self) |
|
setattr(self, f"_{method_name}", async_method) |
|
|
|
@classmethod |
|
def wrap_class(cls, sync_fs_class): |
|
""" |
|
Create a new class that can be used to instantiate an AsyncFileSystemWrapper |
|
with lazy instantiation of the underlying synchronous filesystem. |
|
|
|
Parameters |
|
---------- |
|
sync_fs_class : type |
|
The class of the synchronous filesystem to wrap. |
|
|
|
Returns |
|
------- |
|
type |
|
A new class that wraps the provided synchronous filesystem class. |
|
""" |
|
|
|
class GeneratedAsyncFileSystemWrapper(cls): |
|
def __init__(self, *args, **kwargs): |
|
sync_fs = sync_fs_class(*args, **kwargs) |
|
super().__init__(sync_fs) |
|
|
|
GeneratedAsyncFileSystemWrapper.__name__ = ( |
|
f"Async{sync_fs_class.__name__}Wrapper" |
|
) |
|
return GeneratedAsyncFileSystemWrapper |
|
|