|
import io |
|
|
|
import numpy as np |
|
import pytest |
|
|
|
from pandas import ( |
|
DataFrame, |
|
date_range, |
|
read_csv, |
|
read_excel, |
|
read_feather, |
|
read_json, |
|
read_parquet, |
|
read_pickle, |
|
read_stata, |
|
read_table, |
|
) |
|
import pandas._testing as tm |
|
from pandas.util import _test_decorators as td |
|
|
|
pytestmark = pytest.mark.filterwarnings( |
|
"ignore:Passing a BlockManager to DataFrame:DeprecationWarning" |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def fsspectest(): |
|
pytest.importorskip("fsspec") |
|
from fsspec import register_implementation |
|
from fsspec.implementations.memory import MemoryFileSystem |
|
from fsspec.registry import _registry as registry |
|
|
|
class TestMemoryFS(MemoryFileSystem): |
|
protocol = "testmem" |
|
test = [None] |
|
|
|
def __init__(self, **kwargs) -> None: |
|
self.test[0] = kwargs.pop("test", None) |
|
super().__init__(**kwargs) |
|
|
|
register_implementation("testmem", TestMemoryFS, clobber=True) |
|
yield TestMemoryFS() |
|
registry.pop("testmem", None) |
|
TestMemoryFS.test[0] = None |
|
TestMemoryFS.store.clear() |
|
|
|
|
|
@pytest.fixture |
|
def df1(): |
|
return DataFrame( |
|
{ |
|
"int": [1, 3], |
|
"float": [2.0, np.nan], |
|
"str": ["t", "s"], |
|
"dt": date_range("2018-06-18", periods=2), |
|
} |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def cleared_fs(): |
|
fsspec = pytest.importorskip("fsspec") |
|
|
|
memfs = fsspec.filesystem("memory") |
|
yield memfs |
|
memfs.store.clear() |
|
|
|
|
|
def test_read_csv(cleared_fs, df1): |
|
text = str(df1.to_csv(index=False)).encode() |
|
with cleared_fs.open("test/test.csv", "wb") as w: |
|
w.write(text) |
|
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"]) |
|
|
|
tm.assert_frame_equal(df1, df2) |
|
|
|
|
|
def test_reasonable_error(monkeypatch, cleared_fs): |
|
from fsspec.registry import known_implementations |
|
|
|
with pytest.raises(ValueError, match="nosuchprotocol"): |
|
read_csv("nosuchprotocol://test/test.csv") |
|
err_msg = "test error message" |
|
monkeypatch.setitem( |
|
known_implementations, |
|
"couldexist", |
|
{"class": "unimportable.CouldExist", "err": err_msg}, |
|
) |
|
with pytest.raises(ImportError, match=err_msg): |
|
read_csv("couldexist://test/test.csv") |
|
|
|
|
|
def test_to_csv(cleared_fs, df1): |
|
df1.to_csv("memory://test/test.csv", index=True) |
|
|
|
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0) |
|
|
|
tm.assert_frame_equal(df1, df2) |
|
|
|
|
|
def test_to_excel(cleared_fs, df1): |
|
pytest.importorskip("openpyxl") |
|
ext = "xlsx" |
|
path = f"memory://test/test.{ext}" |
|
df1.to_excel(path, index=True) |
|
|
|
df2 = read_excel(path, parse_dates=["dt"], index_col=0) |
|
|
|
tm.assert_frame_equal(df1, df2) |
|
|
|
|
|
@pytest.mark.parametrize("binary_mode", [False, True]) |
|
def test_to_csv_fsspec_object(cleared_fs, binary_mode, df1): |
|
fsspec = pytest.importorskip("fsspec") |
|
|
|
path = "memory://test/test.csv" |
|
mode = "wb" if binary_mode else "w" |
|
with fsspec.open(path, mode=mode).open() as fsspec_object: |
|
df1.to_csv(fsspec_object, index=True) |
|
assert not fsspec_object.closed |
|
|
|
mode = mode.replace("w", "r") |
|
with fsspec.open(path, mode=mode) as fsspec_object: |
|
df2 = read_csv( |
|
fsspec_object, |
|
parse_dates=["dt"], |
|
index_col=0, |
|
) |
|
assert not fsspec_object.closed |
|
|
|
tm.assert_frame_equal(df1, df2) |
|
|
|
|
|
def test_csv_options(fsspectest): |
|
df = DataFrame({"a": [0]}) |
|
df.to_csv( |
|
"testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False |
|
) |
|
assert fsspectest.test[0] == "csv_write" |
|
read_csv("testmem://test/test.csv", storage_options={"test": "csv_read"}) |
|
assert fsspectest.test[0] == "csv_read" |
|
|
|
|
|
def test_read_table_options(fsspectest): |
|
|
|
df = DataFrame({"a": [0]}) |
|
df.to_csv( |
|
"testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False |
|
) |
|
assert fsspectest.test[0] == "csv_write" |
|
read_table("testmem://test/test.csv", storage_options={"test": "csv_read"}) |
|
assert fsspectest.test[0] == "csv_read" |
|
|
|
|
|
def test_excel_options(fsspectest): |
|
pytest.importorskip("openpyxl") |
|
extension = "xlsx" |
|
|
|
df = DataFrame({"a": [0]}) |
|
|
|
path = f"testmem://test/test.{extension}" |
|
|
|
df.to_excel(path, storage_options={"test": "write"}, index=False) |
|
assert fsspectest.test[0] == "write" |
|
read_excel(path, storage_options={"test": "read"}) |
|
assert fsspectest.test[0] == "read" |
|
|
|
|
|
def test_to_parquet_new_file(cleared_fs, df1): |
|
"""Regression test for writing to a not-yet-existent GCS Parquet file.""" |
|
pytest.importorskip("fastparquet") |
|
|
|
df1.to_parquet( |
|
"memory://test/test.csv", index=True, engine="fastparquet", compression=None |
|
) |
|
|
|
|
|
def test_arrowparquet_options(fsspectest): |
|
"""Regression test for writing to a not-yet-existent GCS Parquet file.""" |
|
pytest.importorskip("pyarrow") |
|
df = DataFrame({"a": [0]}) |
|
df.to_parquet( |
|
"testmem://test/test.csv", |
|
engine="pyarrow", |
|
compression=None, |
|
storage_options={"test": "parquet_write"}, |
|
) |
|
assert fsspectest.test[0] == "parquet_write" |
|
read_parquet( |
|
"testmem://test/test.csv", |
|
engine="pyarrow", |
|
storage_options={"test": "parquet_read"}, |
|
) |
|
assert fsspectest.test[0] == "parquet_read" |
|
|
|
|
|
@td.skip_array_manager_not_yet_implemented |
|
def test_fastparquet_options(fsspectest): |
|
"""Regression test for writing to a not-yet-existent GCS Parquet file.""" |
|
pytest.importorskip("fastparquet") |
|
|
|
df = DataFrame({"a": [0]}) |
|
df.to_parquet( |
|
"testmem://test/test.csv", |
|
engine="fastparquet", |
|
compression=None, |
|
storage_options={"test": "parquet_write"}, |
|
) |
|
assert fsspectest.test[0] == "parquet_write" |
|
read_parquet( |
|
"testmem://test/test.csv", |
|
engine="fastparquet", |
|
storage_options={"test": "parquet_read"}, |
|
) |
|
assert fsspectest.test[0] == "parquet_read" |
|
|
|
|
|
@pytest.mark.single_cpu |
|
def test_from_s3_csv(s3_public_bucket_with_data, tips_file, s3so): |
|
pytest.importorskip("s3fs") |
|
tm.assert_equal( |
|
read_csv( |
|
f"s3://{s3_public_bucket_with_data.name}/tips.csv", storage_options=s3so |
|
), |
|
read_csv(tips_file), |
|
) |
|
|
|
tm.assert_equal( |
|
read_csv( |
|
f"s3://{s3_public_bucket_with_data.name}/tips.csv.gz", storage_options=s3so |
|
), |
|
read_csv(tips_file), |
|
) |
|
tm.assert_equal( |
|
read_csv( |
|
f"s3://{s3_public_bucket_with_data.name}/tips.csv.bz2", storage_options=s3so |
|
), |
|
read_csv(tips_file), |
|
) |
|
|
|
|
|
@pytest.mark.single_cpu |
|
@pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"]) |
|
def test_s3_protocols(s3_public_bucket_with_data, tips_file, protocol, s3so): |
|
pytest.importorskip("s3fs") |
|
tm.assert_equal( |
|
read_csv( |
|
f"{protocol}://{s3_public_bucket_with_data.name}/tips.csv", |
|
storage_options=s3so, |
|
), |
|
read_csv(tips_file), |
|
) |
|
|
|
|
|
@pytest.mark.single_cpu |
|
@td.skip_array_manager_not_yet_implemented |
|
def test_s3_parquet(s3_public_bucket, s3so, df1): |
|
pytest.importorskip("fastparquet") |
|
pytest.importorskip("s3fs") |
|
|
|
fn = f"s3://{s3_public_bucket.name}/test.parquet" |
|
df1.to_parquet( |
|
fn, index=False, engine="fastparquet", compression=None, storage_options=s3so |
|
) |
|
df2 = read_parquet(fn, engine="fastparquet", storage_options=s3so) |
|
tm.assert_equal(df1, df2) |
|
|
|
|
|
@td.skip_if_installed("fsspec") |
|
def test_not_present_exception(): |
|
msg = "Missing optional dependency 'fsspec'|fsspec library is required" |
|
with pytest.raises(ImportError, match=msg): |
|
read_csv("memory://test/test.csv") |
|
|
|
|
|
def test_feather_options(fsspectest): |
|
pytest.importorskip("pyarrow") |
|
df = DataFrame({"a": [0]}) |
|
df.to_feather("testmem://mockfile", storage_options={"test": "feather_write"}) |
|
assert fsspectest.test[0] == "feather_write" |
|
out = read_feather("testmem://mockfile", storage_options={"test": "feather_read"}) |
|
assert fsspectest.test[0] == "feather_read" |
|
tm.assert_frame_equal(df, out) |
|
|
|
|
|
def test_pickle_options(fsspectest): |
|
df = DataFrame({"a": [0]}) |
|
df.to_pickle("testmem://mockfile", storage_options={"test": "pickle_write"}) |
|
assert fsspectest.test[0] == "pickle_write" |
|
out = read_pickle("testmem://mockfile", storage_options={"test": "pickle_read"}) |
|
assert fsspectest.test[0] == "pickle_read" |
|
tm.assert_frame_equal(df, out) |
|
|
|
|
|
def test_json_options(fsspectest, compression): |
|
df = DataFrame({"a": [0]}) |
|
df.to_json( |
|
"testmem://mockfile", |
|
compression=compression, |
|
storage_options={"test": "json_write"}, |
|
) |
|
assert fsspectest.test[0] == "json_write" |
|
out = read_json( |
|
"testmem://mockfile", |
|
compression=compression, |
|
storage_options={"test": "json_read"}, |
|
) |
|
assert fsspectest.test[0] == "json_read" |
|
tm.assert_frame_equal(df, out) |
|
|
|
|
|
def test_stata_options(fsspectest): |
|
df = DataFrame({"a": [0]}) |
|
df.to_stata( |
|
"testmem://mockfile", storage_options={"test": "stata_write"}, write_index=False |
|
) |
|
assert fsspectest.test[0] == "stata_write" |
|
out = read_stata("testmem://mockfile", storage_options={"test": "stata_read"}) |
|
assert fsspectest.test[0] == "stata_read" |
|
tm.assert_frame_equal(df, out.astype("int64")) |
|
|
|
|
|
def test_markdown_options(fsspectest): |
|
pytest.importorskip("tabulate") |
|
df = DataFrame({"a": [0]}) |
|
df.to_markdown("testmem://mockfile", storage_options={"test": "md_write"}) |
|
assert fsspectest.test[0] == "md_write" |
|
assert fsspectest.cat("testmem://mockfile") |
|
|
|
|
|
def test_non_fsspec_options(): |
|
pytest.importorskip("pyarrow") |
|
with pytest.raises(ValueError, match="storage_options"): |
|
read_csv("localfile", storage_options={"a": True}) |
|
with pytest.raises(ValueError, match="storage_options"): |
|
|
|
read_parquet("localfile", storage_options={"a": True}) |
|
by = io.BytesIO() |
|
|
|
with pytest.raises(ValueError, match="storage_options"): |
|
read_csv(by, storage_options={"a": True}) |
|
|
|
df = DataFrame({"a": [0]}) |
|
with pytest.raises(ValueError, match="storage_options"): |
|
df.to_parquet("nonfsspecpath", storage_options={"a": True}) |
|
|