|
""" test feather-format compat """ |
|
import numpy as np |
|
import pytest |
|
|
|
import pandas as pd |
|
import pandas._testing as tm |
|
from pandas.core.arrays import ( |
|
ArrowStringArray, |
|
StringArray, |
|
) |
|
|
|
from pandas.io.feather_format import read_feather, to_feather |
|
|
|
pytestmark = pytest.mark.filterwarnings( |
|
"ignore:Passing a BlockManager to DataFrame:DeprecationWarning" |
|
) |
|
|
|
pa = pytest.importorskip("pyarrow") |
|
|
|
|
|
@pytest.mark.single_cpu |
|
class TestFeather: |
|
def check_error_on_write(self, df, exc, err_msg): |
|
|
|
|
|
|
|
with pytest.raises(exc, match=err_msg): |
|
with tm.ensure_clean() as path: |
|
to_feather(df, path) |
|
|
|
def check_external_error_on_write(self, df): |
|
|
|
|
|
|
|
with tm.external_error_raised(Exception): |
|
with tm.ensure_clean() as path: |
|
to_feather(df, path) |
|
|
|
def check_round_trip(self, df, expected=None, write_kwargs={}, **read_kwargs): |
|
if expected is None: |
|
expected = df.copy() |
|
|
|
with tm.ensure_clean() as path: |
|
to_feather(df, path, **write_kwargs) |
|
|
|
result = read_feather(path, **read_kwargs) |
|
|
|
tm.assert_frame_equal(result, expected) |
|
|
|
def test_error(self): |
|
msg = "feather only support IO with DataFrames" |
|
for obj in [ |
|
pd.Series([1, 2, 3]), |
|
1, |
|
"foo", |
|
pd.Timestamp("20130101"), |
|
np.array([1, 2, 3]), |
|
]: |
|
self.check_error_on_write(obj, ValueError, msg) |
|
|
|
def test_basic(self): |
|
df = pd.DataFrame( |
|
{ |
|
"string": list("abc"), |
|
"int": list(range(1, 4)), |
|
"uint": np.arange(3, 6).astype("u1"), |
|
"float": np.arange(4.0, 7.0, dtype="float64"), |
|
"float_with_null": [1.0, np.nan, 3], |
|
"bool": [True, False, True], |
|
"bool_with_null": [True, np.nan, False], |
|
"cat": pd.Categorical(list("abc")), |
|
"dt": pd.DatetimeIndex( |
|
list(pd.date_range("20130101", periods=3)), freq=None |
|
), |
|
"dttz": pd.DatetimeIndex( |
|
list(pd.date_range("20130101", periods=3, tz="US/Eastern")), |
|
freq=None, |
|
), |
|
"dt_with_null": [ |
|
pd.Timestamp("20130101"), |
|
pd.NaT, |
|
pd.Timestamp("20130103"), |
|
], |
|
"dtns": pd.DatetimeIndex( |
|
list(pd.date_range("20130101", periods=3, freq="ns")), freq=None |
|
), |
|
} |
|
) |
|
df["periods"] = pd.period_range("2013", freq="M", periods=3) |
|
df["timedeltas"] = pd.timedelta_range("1 day", periods=3) |
|
df["intervals"] = pd.interval_range(0, 3, 3) |
|
|
|
assert df.dttz.dtype.tz.zone == "US/Eastern" |
|
|
|
expected = df.copy() |
|
expected.loc[1, "bool_with_null"] = None |
|
self.check_round_trip(df, expected=expected) |
|
|
|
def test_duplicate_columns(self): |
|
|
|
|
|
df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=list("aaa")).copy() |
|
self.check_external_error_on_write(df) |
|
|
|
def test_read_columns(self): |
|
|
|
df = pd.DataFrame( |
|
{ |
|
"col1": list("abc"), |
|
"col2": list(range(1, 4)), |
|
"col3": list("xyz"), |
|
"col4": list(range(4, 7)), |
|
} |
|
) |
|
columns = ["col1", "col3"] |
|
self.check_round_trip(df, expected=df[columns], columns=columns) |
|
|
|
def test_read_columns_different_order(self): |
|
|
|
df = pd.DataFrame({"A": [1, 2], "B": ["x", "y"], "C": [True, False]}) |
|
expected = df[["B", "A"]] |
|
self.check_round_trip(df, expected, columns=["B", "A"]) |
|
|
|
def test_unsupported_other(self): |
|
|
|
df = pd.DataFrame({"a": ["a", 1, 2.0]}) |
|
self.check_external_error_on_write(df) |
|
|
|
def test_rw_use_threads(self): |
|
df = pd.DataFrame({"A": np.arange(100000)}) |
|
self.check_round_trip(df, use_threads=True) |
|
self.check_round_trip(df, use_threads=False) |
|
|
|
def test_path_pathlib(self): |
|
df = pd.DataFrame( |
|
1.1 * np.arange(120).reshape((30, 4)), |
|
columns=pd.Index(list("ABCD"), dtype=object), |
|
index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), |
|
).reset_index() |
|
result = tm.round_trip_pathlib(df.to_feather, read_feather) |
|
tm.assert_frame_equal(df, result) |
|
|
|
def test_path_localpath(self): |
|
df = pd.DataFrame( |
|
1.1 * np.arange(120).reshape((30, 4)), |
|
columns=pd.Index(list("ABCD"), dtype=object), |
|
index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), |
|
).reset_index() |
|
result = tm.round_trip_localpath(df.to_feather, read_feather) |
|
tm.assert_frame_equal(df, result) |
|
|
|
def test_passthrough_keywords(self): |
|
df = pd.DataFrame( |
|
1.1 * np.arange(120).reshape((30, 4)), |
|
columns=pd.Index(list("ABCD"), dtype=object), |
|
index=pd.Index([f"i-{i}" for i in range(30)], dtype=object), |
|
).reset_index() |
|
self.check_round_trip(df, write_kwargs={"version": 1}) |
|
|
|
@pytest.mark.network |
|
@pytest.mark.single_cpu |
|
def test_http_path(self, feather_file, httpserver): |
|
|
|
expected = read_feather(feather_file) |
|
with open(feather_file, "rb") as f: |
|
httpserver.serve_content(content=f.read()) |
|
res = read_feather(httpserver.url) |
|
tm.assert_frame_equal(expected, res) |
|
|
|
def test_read_feather_dtype_backend(self, string_storage, dtype_backend): |
|
|
|
df = pd.DataFrame( |
|
{ |
|
"a": pd.Series([1, np.nan, 3], dtype="Int64"), |
|
"b": pd.Series([1, 2, 3], dtype="Int64"), |
|
"c": pd.Series([1.5, np.nan, 2.5], dtype="Float64"), |
|
"d": pd.Series([1.5, 2.0, 2.5], dtype="Float64"), |
|
"e": [True, False, None], |
|
"f": [True, False, True], |
|
"g": ["a", "b", "c"], |
|
"h": ["a", "b", None], |
|
} |
|
) |
|
|
|
if string_storage == "python": |
|
string_array = StringArray(np.array(["a", "b", "c"], dtype=np.object_)) |
|
string_array_na = StringArray(np.array(["a", "b", pd.NA], dtype=np.object_)) |
|
|
|
elif dtype_backend == "pyarrow": |
|
from pandas.arrays import ArrowExtensionArray |
|
|
|
string_array = ArrowExtensionArray(pa.array(["a", "b", "c"])) |
|
string_array_na = ArrowExtensionArray(pa.array(["a", "b", None])) |
|
|
|
else: |
|
string_array = ArrowStringArray(pa.array(["a", "b", "c"])) |
|
string_array_na = ArrowStringArray(pa.array(["a", "b", None])) |
|
|
|
with tm.ensure_clean() as path: |
|
to_feather(df, path) |
|
with pd.option_context("mode.string_storage", string_storage): |
|
result = read_feather(path, dtype_backend=dtype_backend) |
|
|
|
expected = pd.DataFrame( |
|
{ |
|
"a": pd.Series([1, np.nan, 3], dtype="Int64"), |
|
"b": pd.Series([1, 2, 3], dtype="Int64"), |
|
"c": pd.Series([1.5, np.nan, 2.5], dtype="Float64"), |
|
"d": pd.Series([1.5, 2.0, 2.5], dtype="Float64"), |
|
"e": pd.Series([True, False, pd.NA], dtype="boolean"), |
|
"f": pd.Series([True, False, True], dtype="boolean"), |
|
"g": string_array, |
|
"h": string_array_na, |
|
} |
|
) |
|
|
|
if dtype_backend == "pyarrow": |
|
from pandas.arrays import ArrowExtensionArray |
|
|
|
expected = pd.DataFrame( |
|
{ |
|
col: ArrowExtensionArray(pa.array(expected[col], from_pandas=True)) |
|
for col in expected.columns |
|
} |
|
) |
|
|
|
tm.assert_frame_equal(result, expected) |
|
|
|
def test_int_columns_and_index(self): |
|
df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.Index([3, 4, 5], name="test")) |
|
self.check_round_trip(df) |
|
|
|
def test_invalid_dtype_backend(self): |
|
msg = ( |
|
"dtype_backend numpy is invalid, only 'numpy_nullable' and " |
|
"'pyarrow' are allowed." |
|
) |
|
df = pd.DataFrame({"int": list(range(1, 4))}) |
|
with tm.ensure_clean("tmp.feather") as path: |
|
df.to_feather(path) |
|
with pytest.raises(ValueError, match=msg): |
|
read_feather(path, dtype_backend="numpy") |
|
|
|
def test_string_inference(self, tmp_path): |
|
|
|
path = tmp_path / "test_string_inference.p" |
|
df = pd.DataFrame(data={"a": ["x", "y"]}) |
|
df.to_feather(path) |
|
with pd.option_context("future.infer_string", True): |
|
result = read_feather(path) |
|
expected = pd.DataFrame(data={"a": ["x", "y"]}, dtype="string[pyarrow_numpy]") |
|
tm.assert_frame_equal(result, expected) |
|
|