|
""" |
|
Collection of query wrappers / abstractions to both facilitate data |
|
retrieval and to reduce dependency on DB-specific API. |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
from abc import ( |
|
ABC, |
|
abstractmethod, |
|
) |
|
from contextlib import ( |
|
ExitStack, |
|
contextmanager, |
|
) |
|
from datetime import ( |
|
date, |
|
datetime, |
|
time, |
|
) |
|
from functools import partial |
|
import re |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Any, |
|
Callable, |
|
Literal, |
|
cast, |
|
overload, |
|
) |
|
import warnings |
|
|
|
import numpy as np |
|
|
|
from pandas._config import using_pyarrow_string_dtype |
|
|
|
from pandas._libs import lib |
|
from pandas.compat._optional import import_optional_dependency |
|
from pandas.errors import ( |
|
AbstractMethodError, |
|
DatabaseError, |
|
) |
|
from pandas.util._exceptions import find_stack_level |
|
from pandas.util._validators import check_dtype_backend |
|
|
|
from pandas.core.dtypes.common import ( |
|
is_dict_like, |
|
is_list_like, |
|
) |
|
from pandas.core.dtypes.dtypes import ( |
|
ArrowDtype, |
|
DatetimeTZDtype, |
|
) |
|
from pandas.core.dtypes.missing import isna |
|
|
|
from pandas import get_option |
|
from pandas.core.api import ( |
|
DataFrame, |
|
Series, |
|
) |
|
from pandas.core.arrays import ArrowExtensionArray |
|
from pandas.core.base import PandasObject |
|
import pandas.core.common as com |
|
from pandas.core.common import maybe_make_list |
|
from pandas.core.internals.construction import convert_object_array |
|
from pandas.core.tools.datetimes import to_datetime |
|
|
|
if TYPE_CHECKING: |
|
from collections.abc import ( |
|
Iterator, |
|
Mapping, |
|
) |
|
|
|
from sqlalchemy import Table |
|
from sqlalchemy.sql.expression import ( |
|
Select, |
|
TextClause, |
|
) |
|
|
|
from pandas._typing import ( |
|
DateTimeErrorChoices, |
|
DtypeArg, |
|
DtypeBackend, |
|
IndexLabel, |
|
Self, |
|
) |
|
|
|
from pandas import Index |
|
|
|
|
|
|
|
|
|
|
|
def _process_parse_dates_argument(parse_dates): |
|
"""Process parse_dates argument for read_sql functions""" |
|
|
|
if parse_dates is True or parse_dates is None or parse_dates is False: |
|
parse_dates = [] |
|
|
|
elif not hasattr(parse_dates, "__iter__"): |
|
parse_dates = [parse_dates] |
|
return parse_dates |
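

# A quick sketch of the normalization above (illustrative):
#
#   _process_parse_dates_argument(None)             -> []
#   _process_parse_dates_argument("created_at")     -> ["created_at"]
#   _process_parse_dates_argument(["a", "b"])       -> ["a", "b"]
#   _process_parse_dates_argument({"a": "%Y%m%d"})  -> {"a": "%Y%m%d"}  (dicts pass through)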
|
|
|
|
|
def _handle_date_column( |
|
col, utc: bool = False, format: str | dict[str, Any] | None = None |
|
): |
|
if isinstance(format, dict): |
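        # ``format`` is a dict of to_datetime keyword arguments for this
        # column, e.g. {"errors": "coerce"} or {"dayfirst": True}; pop
        # "errors" so it can be handled explicitly below.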
|
|
|
|
|
|
|
|
|
error: DateTimeErrorChoices = format.pop("errors", None) or "ignore" |
|
if error == "ignore": |
|
try: |
|
return to_datetime(col, **format) |
|
except (TypeError, ValueError): |
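                # errors="ignore": fall back to the unparsed column.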
|
|
|
return col |
|
return to_datetime(col, errors=error, **format) |
|
else: |
|
|
|
|
|
if format is None and ( |
|
issubclass(col.dtype.type, np.floating) |
|
or issubclass(col.dtype.type, np.integer) |
|
): |
|
format = "s" |
|
if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]: |
|
return to_datetime(col, errors="coerce", unit=format, utc=utc) |
|
elif isinstance(col.dtype, DatetimeTZDtype): |
|
|
|
|
|
return to_datetime(col, utc=True) |
|
else: |
|
return to_datetime(col, errors="coerce", format=format, utc=utc) |
|
|
|
|
|
def _parse_date_columns(data_frame, parse_dates): |
|
""" |
|
    Force the columns designated by ``parse_dates`` to be parsed as datetime.
    Supports both string-formatted and integer timestamp columns.
|
""" |
|
parse_dates = _process_parse_dates_argument(parse_dates) |
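
    # Parse tz-aware columns and any columns requested via ``parse_dates``,
    # writing the parsed result back into the frame positionally.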
|
|
|
|
|
|
|
|
|
for i, (col_name, df_col) in enumerate(data_frame.items()): |
|
if isinstance(df_col.dtype, DatetimeTZDtype) or col_name in parse_dates: |
|
try: |
|
fmt = parse_dates[col_name] |
|
except (KeyError, TypeError): |
|
fmt = None |
|
data_frame.isetitem(i, _handle_date_column(df_col, format=fmt)) |
|
|
|
return data_frame |
|
|
|
|
|
def _convert_arrays_to_dataframe( |
|
data, |
|
columns, |
|
coerce_float: bool = True, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame: |
|
content = lib.to_object_array_tuples(data) |
|
arrays = convert_object_array( |
|
list(content.T), |
|
dtype=None, |
|
coerce_float=coerce_float, |
|
dtype_backend=dtype_backend, |
|
) |
|
if dtype_backend == "pyarrow": |
|
pa = import_optional_dependency("pyarrow") |
|
|
|
result_arrays = [] |
|
for arr in arrays: |
|
pa_array = pa.array(arr, from_pandas=True) |
|
if arr.dtype == "string": |
|
|
|
|
|
|
|
pa_array = pa_array.cast(pa.string()) |
|
result_arrays.append(ArrowExtensionArray(pa_array)) |
|
arrays = result_arrays |
|
if arrays: |
|
df = DataFrame(dict(zip(list(range(len(columns))), arrays))) |
|
df.columns = columns |
|
return df |
|
else: |
|
return DataFrame(columns=columns) |
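

# For instance, a DBAPI ``fetchall()`` payload converts as follows (a sketch,
# not executed at import):
#
#   rows = [(1, "a"), (2, None)]
#   _convert_arrays_to_dataframe(rows, columns=["id", "label"])
#   # -> "id" inferred as integer, "label" as object with None as missing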
|
|
|
|
|
def _wrap_result( |
|
data, |
|
columns, |
|
index_col=None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
): |
|
"""Wrap result set of a SQLAlchemy query in a DataFrame.""" |
|
frame = _convert_arrays_to_dataframe(data, columns, coerce_float, dtype_backend) |
|
|
|
if dtype: |
|
frame = frame.astype(dtype) |
|
|
|
frame = _parse_date_columns(frame, parse_dates) |
|
|
|
if index_col is not None: |
|
frame = frame.set_index(index_col) |
|
|
|
return frame |
|
|
|
|
|
def _wrap_result_adbc( |
|
df: DataFrame, |
|
*, |
|
index_col=None, |
|
parse_dates=None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame: |
|
"""Wrap result set of a SQLAlchemy query in a DataFrame.""" |
|
if dtype: |
|
df = df.astype(dtype) |
|
|
|
df = _parse_date_columns(df, parse_dates) |
|
|
|
if index_col is not None: |
|
df = df.set_index(index_col) |
|
|
|
return df |
|
|
|
|
|
def execute(sql, con, params=None): |
|
""" |
|
Execute the given SQL query using the provided connection object. |
|
|
|
Parameters |
|
---------- |
|
sql : string |
|
SQL query to be executed. |
|
con : SQLAlchemy connection or sqlite3 connection |
|
If a DBAPI2 object, only sqlite3 is supported. |
|
params : list or tuple, optional, default: None |
|
List of parameters to pass to execute method. |
|
|
|
Returns |
|
------- |
|
Results Iterable |
|
""" |
|
warnings.warn( |
|
"`pandas.io.sql.execute` is deprecated and " |
|
"will be removed in the future version.", |
|
FutureWarning, |
|
stacklevel=find_stack_level(), |
|
) |
|
sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore") |
|
|
|
if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Engine)): |
|
raise TypeError("pandas.io.sql.execute requires a connection") |
|
with pandasSQL_builder(con, need_transaction=True) as pandas_sql: |
|
return pandas_sql.execute(sql, params) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@overload |
|
def read_sql_table( |
|
table_name: str, |
|
con, |
|
schema=..., |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
parse_dates: list[str] | dict[str, str] | None = ..., |
|
columns: list[str] | None = ..., |
|
chunksize: None = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
) -> DataFrame: |
|
... |
|
|
|
|
|
@overload |
|
def read_sql_table( |
|
table_name: str, |
|
con, |
|
schema=..., |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
parse_dates: list[str] | dict[str, str] | None = ..., |
|
columns: list[str] | None = ..., |
|
chunksize: int = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
) -> Iterator[DataFrame]: |
|
... |
|
|
|
|
|
def read_sql_table( |
|
table_name: str, |
|
con, |
|
schema: str | None = None, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates: list[str] | dict[str, str] | None = None, |
|
columns: list[str] | None = None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default, |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL database table into a DataFrame. |
|
|
|
Given a table name and a SQLAlchemy connectable, returns a DataFrame. |
|
This function does not support DBAPI connections. |
|
|
|
Parameters |
|
---------- |
|
table_name : str |
|
Name of SQL table in database. |
|
con : SQLAlchemy connectable or str |
|
        A database URI can be provided as a str.
|
SQLite DBAPI connection mode not supported. |
|
schema : str, default None |
|
Name of SQL schema in database to query (if database flavor |
|
supports this). Uses default schema if None (default). |
|
index_col : str or list of str, optional, default: None |
|
Column(s) to set as index(MultiIndex). |
|
coerce_float : bool, default True |
|
Attempts to convert values of non-string, non-numeric objects (like |
|
        decimal.Decimal) to floating point. Can result in loss of precision.
|
parse_dates : list or dict, default None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds |
|
to the keyword arguments of :func:`pandas.to_datetime` |
|
Especially useful with databases without native Datetime support, |
|
such as SQLite. |
|
columns : list, default None |
|
List of column names to select from SQL table. |
|
chunksize : int, default None |
|
If specified, returns an iterator where `chunksize` is the number of |
|
rows to include in each chunk. |
|
dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' |
|
Back-end data type applied to the resultant :class:`DataFrame` |
|
(still experimental). Behaviour is as follows: |
|
|
|
* ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` |
|
(default). |
|
* ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` |
|
DataFrame. |
|
|
|
.. versionadded:: 2.0 |
|
|
|
Returns |
|
------- |
|
DataFrame or Iterator[DataFrame] |
|
A SQL table is returned as two-dimensional data structure with labeled |
|
axes. |
|
|
|
See Also |
|
-------- |
|
read_sql_query : Read SQL query into a DataFrame. |
|
read_sql : Read SQL query or database table into a DataFrame. |
|
|
|
Notes |
|
----- |
|
Any datetime values with time zone information will be converted to UTC. |
|
|
|
Examples |
|
-------- |
|
>>> pd.read_sql_table('table_name', 'postgres:///db_name') # doctest:+SKIP |
|
""" |
|
|
|
check_dtype_backend(dtype_backend) |
|
if dtype_backend is lib.no_default: |
|
dtype_backend = "numpy" |
|
assert dtype_backend is not lib.no_default |
|
|
|
with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql: |
|
if not pandas_sql.has_table(table_name): |
|
raise ValueError(f"Table {table_name} not found") |
|
|
|
table = pandas_sql.read_table( |
|
table_name, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
columns=columns, |
|
chunksize=chunksize, |
|
dtype_backend=dtype_backend, |
|
) |
|
|
|
if table is not None: |
|
return table |
|
else: |
|
raise ValueError(f"Table {table_name} not found", con) |
|
|
|
|
|
@overload |
|
def read_sql_query( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
params: list[Any] | Mapping[str, Any] | None = ..., |
|
parse_dates: list[str] | dict[str, str] | None = ..., |
|
chunksize: None = ..., |
|
dtype: DtypeArg | None = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
) -> DataFrame: |
|
... |
|
|
|
|
|
@overload |
|
def read_sql_query( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
params: list[Any] | Mapping[str, Any] | None = ..., |
|
parse_dates: list[str] | dict[str, str] | None = ..., |
|
chunksize: int = ..., |
|
dtype: DtypeArg | None = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
) -> Iterator[DataFrame]: |
|
... |
|
|
|
|
|
def read_sql_query( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
params: list[Any] | Mapping[str, Any] | None = None, |
|
parse_dates: list[str] | dict[str, str] | None = None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default, |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL query into a DataFrame. |
|
|
|
Returns a DataFrame corresponding to the result set of the query |
|
string. Optionally provide an `index_col` parameter to use one of the |
|
columns as the index, otherwise default integer index will be used. |
|
|
|
Parameters |
|
---------- |
|
sql : str SQL query or SQLAlchemy Selectable (select or text object) |
|
SQL query to be executed. |
|
con : SQLAlchemy connectable, str, or sqlite3 connection |
|
Using SQLAlchemy makes it possible to use any DB supported by that |
|
library. If a DBAPI2 object, only sqlite3 is supported. |
|
index_col : str or list of str, optional, default: None |
|
Column(s) to set as index(MultiIndex). |
|
coerce_float : bool, default True |
|
Attempts to convert values of non-string, non-numeric objects (like |
|
decimal.Decimal) to floating point. Useful for SQL result sets. |
|
params : list, tuple or mapping, optional, default: None |
|
List of parameters to pass to execute method. The syntax used |
|
to pass parameters is database driver dependent. Check your |
|
database driver documentation for which of the five syntax styles, |
|
described in PEP 249's paramstyle, is supported. |
|
        E.g. for psycopg2, uses ``%(name)s`` so use ``params={'name': 'value'}``.
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds |
|
to the keyword arguments of :func:`pandas.to_datetime` |
|
Especially useful with databases without native Datetime support, |
|
such as SQLite. |
|
chunksize : int, default None |
|
If specified, return an iterator where `chunksize` is the number of |
|
rows to include in each chunk. |
|
dtype : Type name or dict of columns |
|
Data type for data or columns. E.g. np.float64 or |
|
{'a': np.float64, 'b': np.int32, 'c': 'Int64'}. |
|
|
|
.. versionadded:: 1.3.0 |
|
dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' |
|
Back-end data type applied to the resultant :class:`DataFrame` |
|
(still experimental). Behaviour is as follows: |
|
|
|
* ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` |
|
(default). |
|
* ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` |
|
DataFrame. |
|
|
|
.. versionadded:: 2.0 |
|
|
|
Returns |
|
------- |
|
DataFrame or Iterator[DataFrame] |
|
|
|
See Also |
|
-------- |
|
read_sql_table : Read SQL database table into a DataFrame. |
|
read_sql : Read SQL query or database table into a DataFrame. |
|
|
|
Notes |
|
----- |
|
Any datetime values with time zone information parsed via the `parse_dates` |
|
parameter will be converted to UTC. |
|
|
|
Examples |
|
-------- |
|
>>> from sqlalchemy import create_engine # doctest: +SKIP |
|
>>> engine = create_engine("sqlite:///database.db") # doctest: +SKIP |
|
>>> with engine.connect() as conn, conn.begin(): # doctest: +SKIP |
|
    ...     data = pd.read_sql_query("SELECT * FROM data", conn)  # doctest: +SKIP
|
""" |
|
|
|
check_dtype_backend(dtype_backend) |
|
if dtype_backend is lib.no_default: |
|
dtype_backend = "numpy" |
|
assert dtype_backend is not lib.no_default |
|
|
|
with pandasSQL_builder(con) as pandas_sql: |
|
return pandas_sql.read_query( |
|
sql, |
|
index_col=index_col, |
|
params=params, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
chunksize=chunksize, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
|
|
|
|
@overload |
|
def read_sql( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
params=..., |
|
parse_dates=..., |
|
columns: list[str] = ..., |
|
chunksize: None = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
    dtype: DtypeArg | None = ...,
|
) -> DataFrame: |
|
... |
|
|
|
|
|
@overload |
|
def read_sql( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = ..., |
|
coerce_float=..., |
|
params=..., |
|
parse_dates=..., |
|
columns: list[str] = ..., |
|
chunksize: int = ..., |
|
dtype_backend: DtypeBackend | lib.NoDefault = ..., |
|
    dtype: DtypeArg | None = ...,
|
) -> Iterator[DataFrame]: |
|
... |
|
|
|
|
|
def read_sql( |
|
sql, |
|
con, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
params=None, |
|
parse_dates=None, |
|
columns: list[str] | None = None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default, |
|
dtype: DtypeArg | None = None, |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL query or database table into a DataFrame. |
|
|
|
This function is a convenience wrapper around ``read_sql_table`` and |
|
``read_sql_query`` (for backward compatibility). It will delegate |
|
to the specific function depending on the provided input. A SQL query |
|
will be routed to ``read_sql_query``, while a database table name will |
|
be routed to ``read_sql_table``. Note that the delegated function might |
|
have more specific notes about their functionality not listed here. |
|
|
|
Parameters |
|
---------- |
|
sql : str or SQLAlchemy Selectable (select or text object) |
|
SQL query to be executed or a table name. |
|
con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection |
|
ADBC provides high performance I/O with native type support, where available. |
|
Using SQLAlchemy makes it possible to use any DB supported by that |
|
library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible |
|
for engine disposal and connection closure for the ADBC connection and |
|
SQLAlchemy connectable; str connections are closed automatically. See |
|
`here <https://docs.sqlalchemy.org/en/20/core/connections.html>`_. |
|
index_col : str or list of str, optional, default: None |
|
Column(s) to set as index(MultiIndex). |
|
coerce_float : bool, default True |
|
Attempts to convert values of non-string, non-numeric objects (like |
|
decimal.Decimal) to floating point, useful for SQL result sets. |
|
params : list, tuple or dict, optional, default: None |
|
List of parameters to pass to execute method. The syntax used |
|
to pass parameters is database driver dependent. Check your |
|
database driver documentation for which of the five syntax styles, |
|
described in PEP 249's paramstyle, is supported. |
|
        E.g. for psycopg2, uses ``%(name)s`` so use ``params={'name': 'value'}``.
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds |
|
to the keyword arguments of :func:`pandas.to_datetime` |
|
Especially useful with databases without native Datetime support, |
|
such as SQLite. |
|
columns : list, default: None |
|
List of column names to select from SQL table (only used when reading |
|
a table). |
|
chunksize : int, default None |
|
If specified, return an iterator where `chunksize` is the |
|
number of rows to include in each chunk. |
|
dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' |
|
Back-end data type applied to the resultant :class:`DataFrame` |
|
(still experimental). Behaviour is as follows: |
|
|
|
* ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` |
|
(default). |
|
* ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` |
|
DataFrame. |
|
|
|
.. versionadded:: 2.0 |
|
dtype : Type name or dict of columns |
|
Data type for data or columns. E.g. np.float64 or |
|
{'a': np.float64, 'b': np.int32, 'c': 'Int64'}. |
|
The argument is ignored if a table is passed instead of a query. |
|
|
|
.. versionadded:: 2.0.0 |
|
|
|
Returns |
|
------- |
|
DataFrame or Iterator[DataFrame] |
|
|
|
See Also |
|
-------- |
|
read_sql_table : Read SQL database table into a DataFrame. |
|
read_sql_query : Read SQL query into a DataFrame. |
|
|
|
Examples |
|
-------- |
|
Read data from SQL via either a SQL query or a SQL tablename. |
|
    When using a SQLite database, only SQL queries are accepted;
    providing only the SQL table name will result in an error.
|
|
|
>>> from sqlite3 import connect |
|
>>> conn = connect(':memory:') |
|
>>> df = pd.DataFrame(data=[[0, '10/11/12'], [1, '12/11/10']], |
|
... columns=['int_column', 'date_column']) |
|
>>> df.to_sql(name='test_data', con=conn) |
|
2 |
|
|
|
>>> pd.read_sql('SELECT int_column, date_column FROM test_data', conn) |
|
int_column date_column |
|
0 0 10/11/12 |
|
1 1 12/11/10 |
|
|
|
>>> pd.read_sql('test_data', 'postgres:///db_name') # doctest:+SKIP |
|
|
|
    Apply date parsing to columns through the ``parse_dates`` argument.
|
The ``parse_dates`` argument calls ``pd.to_datetime`` on the provided columns. |
|
Custom argument values for applying ``pd.to_datetime`` on a column are specified |
|
via a dictionary format: |
|
|
|
>>> pd.read_sql('SELECT int_column, date_column FROM test_data', |
|
... conn, |
|
... parse_dates={"date_column": {"format": "%d/%m/%y"}}) |
|
int_column date_column |
|
0 0 2012-11-10 |
|
1 1 2010-11-12 |
|
|
|
.. versionadded:: 2.2.0 |
|
|
|
    pandas now supports reading via ADBC drivers.
|
|
|
>>> from adbc_driver_postgresql import dbapi # doctest:+SKIP |
|
>>> with dbapi.connect('postgres:///db_name') as conn: # doctest:+SKIP |
|
... pd.read_sql('SELECT int_column FROM test_data', conn) |
|
int_column |
|
0 0 |
|
1 1 |
|
""" |
|
|
|
check_dtype_backend(dtype_backend) |
|
if dtype_backend is lib.no_default: |
|
dtype_backend = "numpy" |
|
assert dtype_backend is not lib.no_default |
|
|
|
with pandasSQL_builder(con) as pandas_sql: |
|
if isinstance(pandas_sql, SQLiteDatabase): |
|
return pandas_sql.read_query( |
|
sql, |
|
index_col=index_col, |
|
params=params, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
chunksize=chunksize, |
|
dtype_backend=dtype_backend, |
|
dtype=dtype, |
|
) |
|
|
|
try: |
|
_is_table_name = pandas_sql.has_table(sql) |
|
except Exception: |
|
|
|
_is_table_name = False |
|
|
|
if _is_table_name: |
|
return pandas_sql.read_table( |
|
sql, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
columns=columns, |
|
chunksize=chunksize, |
|
dtype_backend=dtype_backend, |
|
) |
|
else: |
|
return pandas_sql.read_query( |
|
sql, |
|
index_col=index_col, |
|
params=params, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
chunksize=chunksize, |
|
dtype_backend=dtype_backend, |
|
dtype=dtype, |
|
) |
|
|
|
|
|
def to_sql( |
|
frame, |
|
name: str, |
|
con, |
|
schema: str | None = None, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
index: bool = True, |
|
index_label: IndexLabel | None = None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
engine: str = "auto", |
|
**engine_kwargs, |
|
) -> int | None: |
|
""" |
|
Write records stored in a DataFrame to a SQL database. |
|
|
|
Parameters |
|
---------- |
|
frame : DataFrame, Series |
|
name : str |
|
Name of SQL table. |
|
con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection |
|
or sqlite3 DBAPI2 connection |
|
ADBC provides high performance I/O with native type support, where available. |
|
Using SQLAlchemy makes it possible to use any DB supported by that |
|
library. |
|
If a DBAPI2 object, only sqlite3 is supported. |
|
schema : str, optional |
|
Name of SQL schema in database to write to (if database flavor |
|
supports this). If None, use default schema (default). |
|
if_exists : {'fail', 'replace', 'append'}, default 'fail' |
|
        - fail: If table exists, raise a ValueError.
|
- replace: If table exists, drop it, recreate it, and insert data. |
|
- append: If table exists, insert data. Create if does not exist. |
|
index : bool, default True |
|
Write DataFrame index as a column. |
|
index_label : str or sequence, optional |
|
Column label for index column(s). If None is given (default) and |
|
`index` is True, then the index names are used. |
|
A sequence should be given if the DataFrame uses MultiIndex. |
|
chunksize : int, optional |
|
Specify the number of rows in each batch to be written at a time. |
|
By default, all rows will be written at once. |
|
dtype : dict or scalar, optional |
|
Specifying the datatype for columns. If a dictionary is used, the |
|
keys should be the column names and the values should be the |
|
SQLAlchemy types or strings for the sqlite3 fallback mode. If a |
|
scalar is provided, it will be applied to all columns. |
|
method : {None, 'multi', callable}, optional |
|
Controls the SQL insertion clause used: |
|
|
|
- None : Uses standard SQL ``INSERT`` clause (one per row). |
|
- ``'multi'``: Pass multiple values in a single ``INSERT`` clause. |
|
- callable with signature ``(pd_table, conn, keys, data_iter) -> int | None``. |
|
|
|
Details and a sample callable implementation can be found in the |
|
section :ref:`insert method <io.sql.method>`. |
|
engine : {'auto', 'sqlalchemy'}, default 'auto' |
|
SQL engine library to use. If 'auto', then the option |
|
``io.sql.engine`` is used. The default ``io.sql.engine`` |
|
behavior is 'sqlalchemy' |
|
|
|
.. versionadded:: 1.3.0 |
|
|
|
**engine_kwargs |
|
Any additional kwargs are passed to the engine. |
|
|
|
Returns |
|
------- |
|
None or int |
|
Number of rows affected by to_sql. None is returned if the callable |
|
passed into ``method`` does not return an integer number of rows. |
|
|
|
.. versionadded:: 1.4.0 |
|
|
|
Notes |
|
----- |
|
    The returned rows affected is the sum of the ``rowcount`` attribute of
    ``sqlite3.Cursor`` or SQLAlchemy connectable. If using ADBC, the returned
    rows are the result of ``Cursor.adbc_ingest``. The returned value may not
    reflect the exact number of written rows as stipulated in the
    `sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
    `SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
    documentation.
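
    Examples
    --------
    A minimal custom ``method`` callable, roughly equivalent to the default
    per-chunk insert (a sketch; ``df`` and ``engine`` are assumed to exist):

    >>> def insert_rows(pd_table, conn, keys, data_iter):  # doctest: +SKIP
    ...     rows = [dict(zip(keys, row)) for row in data_iter]
    ...     result = conn.execute(pd_table.table.insert(), rows)
    ...     return result.rowcount
    >>> df.to_sql("users", engine, method=insert_rows)  # doctest: +SKIP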
|
""" |
|
if if_exists not in ("fail", "replace", "append"): |
|
raise ValueError(f"'{if_exists}' is not valid for if_exists") |
|
|
|
if isinstance(frame, Series): |
|
frame = frame.to_frame() |
|
elif not isinstance(frame, DataFrame): |
|
raise NotImplementedError( |
|
"'frame' argument should be either a Series or a DataFrame" |
|
) |
|
|
|
with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql: |
|
return pandas_sql.to_sql( |
|
frame, |
|
name, |
|
if_exists=if_exists, |
|
index=index, |
|
index_label=index_label, |
|
schema=schema, |
|
chunksize=chunksize, |
|
dtype=dtype, |
|
method=method, |
|
engine=engine, |
|
**engine_kwargs, |
|
) |
|
|
|
|
|
def has_table(table_name: str, con, schema: str | None = None) -> bool: |
|
""" |
|
    Check if the database has the named table.
|
|
|
Parameters |
|
---------- |
|
    table_name : str
        Name of SQL table.
    con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection
|
ADBC provides high performance I/O with native type support, where available. |
|
Using SQLAlchemy makes it possible to use any DB supported by that |
|
library. |
|
If a DBAPI2 object, only sqlite3 is supported. |
|
schema : string, default None |
|
Name of SQL schema in database to write to (if database flavor supports |
|
this). If None, use default schema (default). |
|
|
|
Returns |
|
------- |
|
boolean |
|
""" |
|
with pandasSQL_builder(con, schema=schema) as pandas_sql: |
|
return pandas_sql.has_table(table_name) |
|
|
|
|
|
table_exists = has_table |
|
|
|
|
|
def pandasSQL_builder( |
|
con, |
|
schema: str | None = None, |
|
need_transaction: bool = False, |
|
) -> PandasSQL: |
|
""" |
|
Convenience function to return the correct PandasSQL subclass based on the |
|
provided parameters. Also creates a sqlalchemy connection and transaction |
|
if necessary. |
|
""" |
|
import sqlite3 |
|
|
|
if isinstance(con, sqlite3.Connection) or con is None: |
|
return SQLiteDatabase(con) |
|
|
|
sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore") |
|
|
|
if isinstance(con, str) and sqlalchemy is None: |
|
raise ImportError("Using URI string without sqlalchemy installed.") |
|
|
|
if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)): |
|
return SQLDatabase(con, schema, need_transaction) |
|
|
|
adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") |
|
if adbc and isinstance(con, adbc.Connection): |
|
return ADBCDatabase(con) |
|
|
|
warnings.warn( |
|
"pandas only supports SQLAlchemy connectable (engine/connection) or " |
|
"database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 " |
|
"objects are not tested. Please consider using SQLAlchemy.", |
|
UserWarning, |
|
stacklevel=find_stack_level(), |
|
) |
|
return SQLiteDatabase(con) |
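

# Dispatch summary for pandasSQL_builder (illustrative):
#
#   sqlite3.Connection or None         -> SQLiteDatabase
#   str URI or SQLAlchemy connectable  -> SQLDatabase (requires sqlalchemy)
#   adbc_driver_manager connection     -> ADBCDatabase
#   any other DBAPI2 object            -> SQLiteDatabase, with a UserWarning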
|
|
|
|
|
class SQLTable(PandasObject): |
|
""" |
|
For mapping Pandas tables to SQL tables. |
|
    Uses the fact that the table is reflected by SQLAlchemy to
|
do better type conversions. |
|
Also holds various flags needed to avoid having to |
|
pass them between functions all the time. |
|
""" |
|
|
|
|
|
|
|
def __init__( |
|
self, |
|
name: str, |
|
pandas_sql_engine, |
|
frame=None, |
|
index: bool | str | list[str] | None = True, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
prefix: str = "pandas", |
|
index_label=None, |
|
schema=None, |
|
keys=None, |
|
dtype: DtypeArg | None = None, |
|
) -> None: |
|
self.name = name |
|
self.pd_sql = pandas_sql_engine |
|
self.prefix = prefix |
|
self.frame = frame |
|
self.index = self._index_name(index, index_label) |
|
self.schema = schema |
|
self.if_exists = if_exists |
|
self.keys = keys |
|
self.dtype = dtype |
|
|
|
if frame is not None: |
|
|
|
self.table = self._create_table_setup() |
|
else: |
|
|
|
self.table = self.pd_sql.get_table(self.name, self.schema) |
|
|
|
if self.table is None: |
|
raise ValueError(f"Could not init table '{name}'") |
|
|
|
if not len(self.name): |
|
raise ValueError("Empty table name specified") |
|
|
|
def exists(self): |
|
return self.pd_sql.has_table(self.name, self.schema) |
|
|
|
def sql_schema(self) -> str: |
|
from sqlalchemy.schema import CreateTable |
|
|
|
return str(CreateTable(self.table).compile(self.pd_sql.con)) |
|
|
|
def _execute_create(self) -> None: |
|
|
|
self.table = self.table.to_metadata(self.pd_sql.meta) |
|
with self.pd_sql.run_transaction(): |
|
self.table.create(bind=self.pd_sql.con) |
|
|
|
def create(self) -> None: |
|
if self.exists(): |
|
if self.if_exists == "fail": |
|
raise ValueError(f"Table '{self.name}' already exists.") |
|
if self.if_exists == "replace": |
|
self.pd_sql.drop_table(self.name, self.schema) |
|
self._execute_create() |
|
elif self.if_exists == "append": |
|
pass |
|
else: |
|
raise ValueError(f"'{self.if_exists}' is not valid for if_exists") |
|
else: |
|
self._execute_create() |
|
|
|
def _execute_insert(self, conn, keys: list[str], data_iter) -> int: |
|
""" |
|
Execute SQL statement inserting data |
|
|
|
Parameters |
|
---------- |
|
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection |
|
keys : list of str |
|
Column names |
|
data_iter : generator of list |
|
Each item contains a list of values to be inserted |
|
""" |
|
data = [dict(zip(keys, row)) for row in data_iter] |
|
result = conn.execute(self.table.insert(), data) |
|
return result.rowcount |
|
|
|
def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int: |
|
""" |
|
        Alternative to ``_execute_insert`` for DBs that support multi-value INSERT.
|
|
|
        Note: multi-value insert is usually faster for analytics DBs
        and tables containing a few columns,
        but performance degrades quickly as the number of columns increases.
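
        For example, a two-row chunk is rendered as one statement roughly
        like (illustrative)::

            INSERT INTO tbl (a, b) VALUES (:a_1, :b_1), (:a_2, :b_2)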
|
|
|
""" |
|
|
|
from sqlalchemy import insert |
|
|
|
data = [dict(zip(keys, row)) for row in data_iter] |
|
stmt = insert(self.table).values(data) |
|
result = conn.execute(stmt) |
|
return result.rowcount |
|
|
|
def insert_data(self) -> tuple[list[str], list[np.ndarray]]: |
|
if self.index is not None: |
|
temp = self.frame.copy() |
|
temp.index.names = self.index |
|
try: |
|
temp.reset_index(inplace=True) |
|
except ValueError as err: |
|
raise ValueError(f"duplicate name in index/columns: {err}") from err |
|
else: |
|
temp = self.frame |
|
|
|
column_names = list(map(str, temp.columns)) |
|
ncols = len(column_names) |
|
|
|
|
|
data_list: list[np.ndarray] = [None] * ncols |
|
|
|
for i, (_, ser) in enumerate(temp.items()): |
|
if ser.dtype.kind == "M": |
|
if isinstance(ser._values, ArrowExtensionArray): |
|
import pyarrow as pa |
|
|
|
if pa.types.is_date(ser.dtype.pyarrow_dtype): |
|
|
|
d = ser._values.to_numpy(dtype=object) |
|
else: |
|
with warnings.catch_warnings(): |
|
warnings.filterwarnings("ignore", category=FutureWarning) |
|
|
|
d = np.asarray(ser.dt.to_pydatetime(), dtype=object) |
|
else: |
|
d = ser._values.to_pydatetime() |
|
elif ser.dtype.kind == "m": |
|
vals = ser._values |
|
if isinstance(vals, ArrowExtensionArray): |
|
vals = vals.to_numpy(dtype=np.dtype("m8[ns]")) |
|
|
|
d = vals.view("i8").astype(object) |
|
else: |
|
d = ser._values.astype(object) |
|
|
|
assert isinstance(d, np.ndarray), type(d) |
|
|
|
if ser._can_hold_na: |
|
|
|
mask = isna(d) |
|
d[mask] = None |
|
|
|
data_list[i] = d |
|
|
|
return column_names, data_list |
|
|
|
def insert( |
|
self, |
|
chunksize: int | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
) -> int | None: |
|
|
|
if method is None: |
|
exec_insert = self._execute_insert |
|
elif method == "multi": |
|
exec_insert = self._execute_insert_multi |
|
elif callable(method): |
|
exec_insert = partial(method, self) |
|
else: |
|
raise ValueError(f"Invalid parameter `method`: {method}") |
|
|
|
keys, data_list = self.insert_data() |
|
|
|
nrows = len(self.frame) |
|
|
|
if nrows == 0: |
|
return 0 |
|
|
|
if chunksize is None: |
|
chunksize = nrows |
|
elif chunksize == 0: |
|
raise ValueError("chunksize argument should be non-zero") |
|
|
|
chunks = (nrows // chunksize) + 1 |
|
total_inserted = None |
|
with self.pd_sql.run_transaction() as conn: |
|
for i in range(chunks): |
|
start_i = i * chunksize |
|
end_i = min((i + 1) * chunksize, nrows) |
|
if start_i >= end_i: |
|
break |
|
|
|
chunk_iter = zip(*(arr[start_i:end_i] for arr in data_list)) |
|
num_inserted = exec_insert(conn, keys, chunk_iter) |
|
|
|
if num_inserted is not None: |
|
if total_inserted is None: |
|
total_inserted = num_inserted |
|
else: |
|
total_inserted += num_inserted |
|
return total_inserted |
|
|
|
def _query_iterator( |
|
self, |
|
result, |
|
exit_stack: ExitStack, |
|
chunksize: int | None, |
|
columns, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
): |
|
"""Return generator through chunked result set.""" |
|
has_read_data = False |
|
with exit_stack: |
|
while True: |
|
data = result.fetchmany(chunksize) |
|
if not data: |
|
if not has_read_data: |
|
yield DataFrame.from_records( |
|
[], columns=columns, coerce_float=coerce_float |
|
) |
|
break |
|
|
|
has_read_data = True |
|
self.frame = _convert_arrays_to_dataframe( |
|
data, columns, coerce_float, dtype_backend |
|
) |
|
|
|
self._harmonize_columns( |
|
parse_dates=parse_dates, dtype_backend=dtype_backend |
|
) |
|
|
|
if self.index is not None: |
|
self.frame.set_index(self.index, inplace=True) |
|
|
|
yield self.frame |
|
|
|
def read( |
|
self, |
|
exit_stack: ExitStack, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
columns=None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
from sqlalchemy import select |
|
|
|
if columns is not None and len(columns) > 0: |
|
cols = [self.table.c[n] for n in columns] |
|
if self.index is not None: |
|
for idx in self.index[::-1]: |
|
cols.insert(0, self.table.c[idx]) |
|
sql_select = select(*cols) |
|
else: |
|
sql_select = select(self.table) |
|
result = self.pd_sql.execute(sql_select) |
|
column_names = result.keys() |
|
|
|
if chunksize is not None: |
|
return self._query_iterator( |
|
result, |
|
exit_stack, |
|
chunksize, |
|
column_names, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype_backend=dtype_backend, |
|
) |
|
else: |
|
data = result.fetchall() |
|
self.frame = _convert_arrays_to_dataframe( |
|
data, column_names, coerce_float, dtype_backend |
|
) |
|
|
|
self._harmonize_columns( |
|
parse_dates=parse_dates, dtype_backend=dtype_backend |
|
) |
|
|
|
if self.index is not None: |
|
self.frame.set_index(self.index, inplace=True) |
|
|
|
return self.frame |
|
|
|
def _index_name(self, index, index_label): |
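        # Resolution at a glance (illustrative):
        #   index=True with an unnamed single-level index -> ["index"]
        #   index=True with named/multi index             -> names, gaps filled
        #   index="id"                                    -> ["id"]
        #   index=["y", "m"]                              -> ["y", "m"]
        #   index=False or None                           -> None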
|
|
|
if index is True: |
|
nlevels = self.frame.index.nlevels |
|
|
|
if index_label is not None: |
|
if not isinstance(index_label, list): |
|
index_label = [index_label] |
|
if len(index_label) != nlevels: |
|
raise ValueError( |
|
"Length of 'index_label' should match number of " |
|
f"levels, which is {nlevels}" |
|
) |
|
return index_label |
|
|
|
if ( |
|
nlevels == 1 |
|
and "index" not in self.frame.columns |
|
and self.frame.index.name is None |
|
): |
|
return ["index"] |
|
else: |
|
return com.fill_missing_names(self.frame.index.names) |
|
|
|
|
|
elif isinstance(index, str): |
|
return [index] |
|
elif isinstance(index, list): |
|
return index |
|
else: |
|
return None |
|
|
|
def _get_column_names_and_types(self, dtype_mapper): |
|
column_names_and_types = [] |
|
if self.index is not None: |
|
for i, idx_label in enumerate(self.index): |
|
idx_type = dtype_mapper(self.frame.index._get_level_values(i)) |
|
column_names_and_types.append((str(idx_label), idx_type, True)) |
|
|
|
column_names_and_types += [ |
|
(str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False) |
|
for i in range(len(self.frame.columns)) |
|
] |
|
|
|
return column_names_and_types |
|
|
|
def _create_table_setup(self): |
|
from sqlalchemy import ( |
|
Column, |
|
PrimaryKeyConstraint, |
|
Table, |
|
) |
|
from sqlalchemy.schema import MetaData |
|
|
|
column_names_and_types = self._get_column_names_and_types(self._sqlalchemy_type) |
|
|
|
columns: list[Any] = [ |
|
Column(name, typ, index=is_index) |
|
for name, typ, is_index in column_names_and_types |
|
] |
|
|
|
if self.keys is not None: |
|
if not is_list_like(self.keys): |
|
keys = [self.keys] |
|
else: |
|
keys = self.keys |
|
pkc = PrimaryKeyConstraint(*keys, name=self.name + "_pk") |
|
columns.append(pkc) |
|
|
|
schema = self.schema or self.pd_sql.meta.schema |
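
        # Attach the table to a fresh MetaData here; create() later copies it
        # onto self.pd_sql.meta via to_metadata() in _execute_create().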
|
|
|
|
|
|
|
meta = MetaData() |
|
return Table(self.name, meta, *columns, schema=schema) |
|
|
|
def _harmonize_columns( |
|
self, |
|
parse_dates=None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> None: |
|
""" |
|
Make the DataFrame's column types align with the SQL table |
|
column types. |
|
        Need to work around limited NA value support. Floats are always
        fine; ints must be converted to floats if there are null values.
        Booleans are hard because converting a bool column with None replaces
        all Nones with False. Therefore only convert bool if there are no
        NA values.
|
Datetimes should already be converted to np.datetime64 if supported, |
|
but here we also force conversion if required. |
|
""" |
|
parse_dates = _process_parse_dates_argument(parse_dates) |
|
|
|
for sql_col in self.table.columns: |
|
col_name = sql_col.name |
|
try: |
|
df_col = self.frame[col_name] |
|
|
|
|
|
|
|
if col_name in parse_dates: |
|
try: |
|
fmt = parse_dates[col_name] |
|
except TypeError: |
|
fmt = None |
|
self.frame[col_name] = _handle_date_column(df_col, format=fmt) |
|
continue |
|
|
|
|
|
col_type = self._get_dtype(sql_col.type) |
|
|
|
if ( |
|
col_type is datetime |
|
or col_type is date |
|
or col_type is DatetimeTZDtype |
|
): |
|
|
|
utc = col_type is DatetimeTZDtype |
|
self.frame[col_name] = _handle_date_column(df_col, utc=utc) |
|
elif dtype_backend == "numpy" and col_type is float: |
|
|
|
self.frame[col_name] = df_col.astype(col_type, copy=False) |
|
|
|
elif dtype_backend == "numpy" and len(df_col) == df_col.count(): |
|
|
|
if col_type is np.dtype("int64") or col_type is bool: |
|
self.frame[col_name] = df_col.astype(col_type, copy=False) |
|
except KeyError: |
|
pass |
|
|
|
def _sqlalchemy_type(self, col: Index | Series): |
|
dtype: DtypeArg = self.dtype or {} |
|
if is_dict_like(dtype): |
|
dtype = cast(dict, dtype) |
|
if col.name in dtype: |
|
return dtype[col.name] |
|
|
|
|
|
|
|
col_type = lib.infer_dtype(col, skipna=True) |
|
|
|
from sqlalchemy.types import ( |
|
TIMESTAMP, |
|
BigInteger, |
|
Boolean, |
|
Date, |
|
DateTime, |
|
Float, |
|
Integer, |
|
SmallInteger, |
|
Text, |
|
Time, |
|
) |
|
|
|
if col_type in ("datetime64", "datetime"): |
|
|
|
|
|
try: |
|
|
|
if col.dt.tz is not None: |
|
return TIMESTAMP(timezone=True) |
|
except AttributeError: |
|
|
|
|
|
if getattr(col, "tz", None) is not None: |
|
return TIMESTAMP(timezone=True) |
|
return DateTime |
|
if col_type == "timedelta64": |
|
warnings.warn( |
|
"the 'timedelta' type is not supported, and will be " |
|
"written as integer values (ns frequency) to the database.", |
|
UserWarning, |
|
stacklevel=find_stack_level(), |
|
) |
|
return BigInteger |
|
elif col_type == "floating": |
|
if col.dtype == "float32": |
|
return Float(precision=23) |
|
else: |
|
return Float(precision=53) |
|
elif col_type == "integer": |
|
|
|
if col.dtype.name.lower() in ("int8", "uint8", "int16"): |
|
return SmallInteger |
|
elif col.dtype.name.lower() in ("uint16", "int32"): |
|
return Integer |
|
elif col.dtype.name.lower() == "uint64": |
|
raise ValueError("Unsigned 64 bit integer datatype is not supported") |
|
else: |
|
return BigInteger |
|
elif col_type == "boolean": |
|
return Boolean |
|
elif col_type == "date": |
|
return Date |
|
elif col_type == "time": |
|
return Time |
|
elif col_type == "complex": |
|
raise ValueError("Complex datatypes not supported") |
|
|
|
return Text |
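
    # Mapping examples (illustrative): float32 -> Float(precision=23),
    # int16 -> SmallInteger, tz-aware datetimes -> TIMESTAMP(timezone=True),
    # anything unrecognized -> Text.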
|
|
|
def _get_dtype(self, sqltype): |
|
from sqlalchemy.types import ( |
|
TIMESTAMP, |
|
Boolean, |
|
Date, |
|
DateTime, |
|
Float, |
|
Integer, |
|
) |
|
|
|
if isinstance(sqltype, Float): |
|
return float |
|
elif isinstance(sqltype, Integer): |
|
|
|
return np.dtype("int64") |
|
elif isinstance(sqltype, TIMESTAMP): |
|
|
|
if not sqltype.timezone: |
|
return datetime |
|
return DatetimeTZDtype |
|
elif isinstance(sqltype, DateTime): |
|
|
|
return datetime |
|
elif isinstance(sqltype, Date): |
|
return date |
|
elif isinstance(sqltype, Boolean): |
|
return bool |
|
return object |
|
|
|
|
|
class PandasSQL(PandasObject, ABC): |
|
""" |
|
    Subclasses should define ``read_query`` and ``to_sql``.
|
""" |
|
|
|
def __enter__(self) -> Self: |
|
return self |
|
|
|
def __exit__(self, *args) -> None: |
|
pass |
|
|
|
def read_table( |
|
self, |
|
table_name: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
columns=None, |
|
schema: str | None = None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
raise NotImplementedError |
|
|
|
@abstractmethod |
|
def read_query( |
|
self, |
|
sql: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
params=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
pass |
|
|
|
@abstractmethod |
|
def to_sql( |
|
self, |
|
frame, |
|
name: str, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
index: bool = True, |
|
index_label=None, |
|
schema=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
engine: str = "auto", |
|
**engine_kwargs, |
|
) -> int | None: |
|
pass |
|
|
|
@abstractmethod |
|
def execute(self, sql: str | Select | TextClause, params=None): |
|
pass |
|
|
|
@abstractmethod |
|
def has_table(self, name: str, schema: str | None = None) -> bool: |
|
pass |
|
|
|
@abstractmethod |
|
def _create_sql_schema( |
|
self, |
|
frame: DataFrame, |
|
table_name: str, |
|
keys: list[str] | None = None, |
|
dtype: DtypeArg | None = None, |
|
schema: str | None = None, |
|
) -> str: |
|
pass |
|
|
|
|
|
class BaseEngine: |
|
def insert_records( |
|
self, |
|
table: SQLTable, |
|
con, |
|
frame, |
|
name: str, |
|
index: bool | str | list[str] | None = True, |
|
schema=None, |
|
chunksize: int | None = None, |
|
method=None, |
|
**engine_kwargs, |
|
) -> int | None: |
|
""" |
|
        Inserts data into an already-prepared table.
|
""" |
|
raise AbstractMethodError(self) |
|
|
|
|
|
class SQLAlchemyEngine(BaseEngine): |
|
def __init__(self) -> None: |
|
import_optional_dependency( |
|
"sqlalchemy", extra="sqlalchemy is required for SQL support." |
|
) |
|
|
|
def insert_records( |
|
self, |
|
table: SQLTable, |
|
con, |
|
frame, |
|
name: str, |
|
index: bool | str | list[str] | None = True, |
|
schema=None, |
|
chunksize: int | None = None, |
|
method=None, |
|
**engine_kwargs, |
|
) -> int | None: |
|
from sqlalchemy import exc |
|
|
|
try: |
|
return table.insert(chunksize=chunksize, method=method) |
|
except exc.StatementError as err: |
|
|
|
|
|
msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?# |
|
)|inf can not be used with MySQL""" |
|
err_text = str(err.orig) |
|
if re.search(msg, err_text): |
|
raise ValueError("inf cannot be used with MySQL") from err |
|
raise err |
|
|
|
|
|
def get_engine(engine: str) -> BaseEngine: |
|
"""return our implementation""" |
|
if engine == "auto": |
|
engine = get_option("io.sql.engine") |
|
|
|
if engine == "auto": |
|
|
|
engine_classes = [SQLAlchemyEngine] |
|
|
|
error_msgs = "" |
|
for engine_class in engine_classes: |
|
try: |
|
return engine_class() |
|
except ImportError as err: |
|
error_msgs += "\n - " + str(err) |
|
|
|
raise ImportError( |
|
"Unable to find a usable engine; " |
|
"tried using: 'sqlalchemy'.\n" |
|
"A suitable version of " |
|
"sqlalchemy is required for sql I/O " |
|
"support.\n" |
|
"Trying to import the above resulted in these errors:" |
|
f"{error_msgs}" |
|
) |
|
|
|
if engine == "sqlalchemy": |
|
return SQLAlchemyEngine() |
|
|
|
raise ValueError("engine must be one of 'auto', 'sqlalchemy'") |
|
|
|
|
|
class SQLDatabase(PandasSQL): |
|
""" |
|
This class enables conversion between DataFrame and SQL databases |
|
    using SQLAlchemy to handle database abstraction.
|
|
|
Parameters |
|
---------- |
|
con : SQLAlchemy Connectable or URI string. |
|
Connectable to connect with the database. Using SQLAlchemy makes it |
|
possible to use any DB supported by that library. |
|
schema : string, default None |
|
Name of SQL schema in database to write to (if database flavor |
|
supports this). If None, use default schema (default). |
|
need_transaction : bool, default False |
|
If True, SQLDatabase will create a transaction. |
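
    Examples
    --------
    A sketch of direct usage (normally constructed via ``pandasSQL_builder``):

    >>> from sqlalchemy import create_engine  # doctest: +SKIP
    >>> with SQLDatabase(create_engine("sqlite:///:memory:")) as db:  # doctest: +SKIP
    ...     db.has_table("users")  # doctest: +SKIP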
|
|
|
""" |
|
|
|
def __init__( |
|
self, con, schema: str | None = None, need_transaction: bool = False |
|
) -> None: |
|
from sqlalchemy import create_engine |
|
from sqlalchemy.engine import Engine |
|
from sqlalchemy.schema import MetaData |
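
        # self.exit_stack cleans up the Engine and Connection created below and
        # commits any transaction it opened; cleanup happens in __exit__ or,
        # when chunksize is used, at the end of the returned iterator.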
|
|
|
|
|
|
|
|
|
|
|
self.exit_stack = ExitStack() |
|
if isinstance(con, str): |
|
con = create_engine(con) |
|
self.exit_stack.callback(con.dispose) |
|
if isinstance(con, Engine): |
|
con = self.exit_stack.enter_context(con.connect()) |
|
if need_transaction and not con.in_transaction(): |
|
self.exit_stack.enter_context(con.begin()) |
|
self.con = con |
|
self.meta = MetaData(schema=schema) |
|
self.returns_generator = False |
|
|
|
def __exit__(self, *args) -> None: |
|
if not self.returns_generator: |
|
self.exit_stack.close() |
|
|
|
@contextmanager |
|
def run_transaction(self): |
|
if not self.con.in_transaction(): |
|
with self.con.begin(): |
|
yield self.con |
|
else: |
|
yield self.con |
|
|
|
def execute(self, sql: str | Select | TextClause, params=None): |
|
"""Simple passthrough to SQLAlchemy connectable""" |
|
args = [] if params is None else [params] |
|
if isinstance(sql, str): |
|
return self.con.exec_driver_sql(sql, *args) |
|
return self.con.execute(sql, *args) |
|
|
|
def read_table( |
|
self, |
|
table_name: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
columns=None, |
|
schema: str | None = None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL database table into a DataFrame. |
|
|
|
Parameters |
|
---------- |
|
table_name : str |
|
Name of SQL table in database. |
|
index_col : string, optional, default: None |
|
Column to set as index. |
|
coerce_float : bool, default True |
|
Attempts to convert values of non-string, non-numeric objects |
|
(like decimal.Decimal) to floating point. This can result in |
|
loss of precision. |
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg}``, where the arg corresponds |
|
to the keyword arguments of :func:`pandas.to_datetime`. |
|
Especially useful with databases without native Datetime support, |
|
such as SQLite. |
|
columns : list, default: None |
|
List of column names to select from SQL table. |
|
schema : string, default None |
|
Name of SQL schema in database to query (if database flavor |
|
supports this). If specified, this overwrites the default |
|
schema of the SQL database object. |
|
chunksize : int, default None |
|
If specified, return an iterator where `chunksize` is the number |
|
of rows to include in each chunk. |
|
dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' |
|
Back-end data type applied to the resultant :class:`DataFrame` |
|
(still experimental). Behaviour is as follows: |
|
|
|
* ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` |
|
(default). |
|
* ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` |
|
DataFrame. |
|
|
|
.. versionadded:: 2.0 |
|
|
|
Returns |
|
------- |
|
DataFrame |
|
|
|
See Also |
|
-------- |
|
pandas.read_sql_table |
|
SQLDatabase.read_query |
|
|
|
""" |
|
self.meta.reflect(bind=self.con, only=[table_name], views=True) |
|
table = SQLTable(table_name, self, index=index_col, schema=schema) |
|
if chunksize is not None: |
|
self.returns_generator = True |
|
return table.read( |
|
self.exit_stack, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
columns=columns, |
|
chunksize=chunksize, |
|
dtype_backend=dtype_backend, |
|
) |
|
|
|
@staticmethod |
|
def _query_iterator( |
|
result, |
|
exit_stack: ExitStack, |
|
chunksize: int, |
|
columns, |
|
index_col=None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
): |
|
"""Return generator through chunked result set""" |
|
has_read_data = False |
|
with exit_stack: |
|
while True: |
|
data = result.fetchmany(chunksize) |
|
if not data: |
|
if not has_read_data: |
|
yield _wrap_result( |
|
[], |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
break |
|
|
|
has_read_data = True |
|
yield _wrap_result( |
|
data, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
|
|
def read_query( |
|
self, |
|
sql: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
params=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL query into a DataFrame. |
|
|
|
Parameters |
|
---------- |
|
sql : str |
|
SQL query to be executed. |
|
index_col : string, optional, default: None |
|
Column name to use as index for the returned DataFrame object. |
|
coerce_float : bool, default True |
|
Attempt to convert values of non-string, non-numeric objects (like |
|
decimal.Decimal) to floating point, useful for SQL result sets. |
|
params : list, tuple or dict, optional, default: None |
|
List of parameters to pass to execute method. The syntax used |
|
to pass parameters is database driver dependent. Check your |
|
database driver documentation for which of the five syntax styles, |
|
described in PEP 249's paramstyle, is supported. |
|
            E.g. for psycopg2, uses ``%(name)s`` so use ``params={'name': 'value'}``.
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg dict}``, where the arg dict |
|
corresponds to the keyword arguments of |
|
:func:`pandas.to_datetime` Especially useful with databases |
|
without native Datetime support, such as SQLite. |
|
chunksize : int, default None |
|
If specified, return an iterator where `chunksize` is the number |
|
of rows to include in each chunk. |
|
dtype : Type name or dict of columns |
|
Data type for data or columns. E.g. np.float64 or |
|
{'a': np.float64, 'b': np.int32, 'c': 'Int64'} |
|
|
|
.. versionadded:: 1.3.0 |
|
|
|
Returns |
|
------- |
|
DataFrame |
|
|
|
See Also |
|
-------- |
|
read_sql_table : Read SQL database table into a DataFrame. |
|
read_sql |
|
|
|
""" |
|
result = self.execute(sql, params) |
|
columns = result.keys() |
|
|
|
if chunksize is not None: |
|
self.returns_generator = True |
|
return self._query_iterator( |
|
result, |
|
self.exit_stack, |
|
chunksize, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
else: |
|
data = result.fetchall() |
|
frame = _wrap_result( |
|
data, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
return frame |
|
|
|
read_sql = read_query |
|
|
|
def prep_table( |
|
self, |
|
frame, |
|
name: str, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
index: bool | str | list[str] | None = True, |
|
index_label=None, |
|
schema=None, |
|
dtype: DtypeArg | None = None, |
|
) -> SQLTable: |
|
""" |
|
        Prepares the table in the database for data insertion,
        creating it if needed.
|
""" |
|
if dtype: |
|
if not is_dict_like(dtype): |
|
|
|
|
|
|
|
|
|
|
|
|
|
dtype = {col_name: dtype for col_name in frame} |
|
else: |
|
dtype = cast(dict, dtype) |
|
|
|
from sqlalchemy.types import TypeEngine |
|
|
|
for col, my_type in dtype.items(): |
|
if isinstance(my_type, type) and issubclass(my_type, TypeEngine): |
|
pass |
|
elif isinstance(my_type, TypeEngine): |
|
pass |
|
else: |
|
raise ValueError(f"The type of {col} is not a SQLAlchemy type") |
|
|
|
table = SQLTable( |
|
name, |
|
self, |
|
frame=frame, |
|
index=index, |
|
if_exists=if_exists, |
|
index_label=index_label, |
|
schema=schema, |
|
dtype=dtype, |
|
) |
|
table.create() |
|
return table |
|
|
|
def check_case_sensitive( |
|
self, |
|
name: str, |
|
schema: str | None, |
|
) -> None: |
|
""" |
|
Checks table name for issues with case-sensitivity. |
|
Method is called after data is inserted. |
|
""" |
|
if not name.isdigit() and not name.islower(): |
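            # Mixed-case names can be folded or quoted differently across
            # backends; verify the freshly written table is visible verbatim.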
|
|
|
|
|
from sqlalchemy import inspect as sqlalchemy_inspect |
|
|
|
insp = sqlalchemy_inspect(self.con) |
|
table_names = insp.get_table_names(schema=schema or self.meta.schema) |
|
if name not in table_names: |
|
msg = ( |
|
f"The provided table name '{name}' is not found exactly as " |
|
"such in the database after writing the table, possibly " |
|
"due to case sensitivity issues. Consider using lower " |
|
"case table names." |
|
) |
|
warnings.warn( |
|
msg, |
|
UserWarning, |
|
stacklevel=find_stack_level(), |
|
) |
|
|
|
def to_sql( |
|
self, |
|
frame, |
|
name: str, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
index: bool = True, |
|
index_label=None, |
|
schema: str | None = None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
engine: str = "auto", |
|
**engine_kwargs, |
|
) -> int | None: |
|
""" |
|
Write records stored in a DataFrame to a SQL database. |
|
|
|
Parameters |
|
---------- |
|
frame : DataFrame |
|
name : string |
|
Name of SQL table. |
|
if_exists : {'fail', 'replace', 'append'}, default 'fail' |
|
- fail: If table exists, do nothing. |
|
- replace: If table exists, drop it, recreate it, and insert data. |
|
- append: If table exists, insert data. Create if does not exist. |
|
index : boolean, default True |
|
Write DataFrame index as a column. |
|
index_label : string or sequence, default None |
|
Column label for index column(s). If None is given (default) and |
|
`index` is True, then the index names are used. |
|
A sequence should be given if the DataFrame uses MultiIndex. |
|
schema : string, default None |
|
Name of SQL schema in database to write to (if database flavor |
|
supports this). If specified, this overwrites the default |
|
schema of the SQLDatabase object. |
|
chunksize : int, default None |
|
If not None, then rows will be written in batches of this size at a |
|
time. If None, all rows will be written at once. |
|
dtype : single type or dict of column name to SQL type, default None |
|
Optional specifying the datatype for columns. The SQL type should |
|
be a SQLAlchemy type. If all columns are of the same type, one |
|
single value can be used. |
|
        method : {None, 'multi', callable}, default None
|
Controls the SQL insertion clause used: |
|
|
|
* None : Uses standard SQL ``INSERT`` clause (one per row). |
|
* 'multi': Pass multiple values in a single ``INSERT`` clause. |
|
* callable with signature ``(pd_table, conn, keys, data_iter)``. |
|
|
|
Details and a sample callable implementation can be found in the |
|
section :ref:`insert method <io.sql.method>`. |
|
engine : {'auto', 'sqlalchemy'}, default 'auto' |
|
SQL engine library to use. If 'auto', then the option |
|
``io.sql.engine`` is used. The default ``io.sql.engine`` |
|
            behavior is 'sqlalchemy'.
|
|
|
.. versionadded:: 1.3.0 |
|
|
|
**engine_kwargs |
|
Any additional kwargs are passed to the engine. |
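
        Examples
        --------
        A sketch of a custom ``method`` callable; ``insert_rows`` and its
        body are illustrative, not part of the pandas API:

        >>> def insert_rows(pd_table, conn, keys, data_iter):
        ...     # one parameter dict per row, executed in a single call
        ...     data = [dict(zip(keys, row)) for row in data_iter]
        ...     result = conn.execute(pd_table.table.insert(), data)
        ...     return result.rowcount
        >>> pandas_sql.to_sql(df, "test", method=insert_rows)  # doctest: +SKIP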
|
""" |
|
sql_engine = get_engine(engine) |
|
|
|
table = self.prep_table( |
|
frame=frame, |
|
name=name, |
|
if_exists=if_exists, |
|
index=index, |
|
index_label=index_label, |
|
schema=schema, |
|
dtype=dtype, |
|
) |
|
|
|
total_inserted = sql_engine.insert_records( |
|
table=table, |
|
con=self.con, |
|
frame=frame, |
|
name=name, |
|
index=index, |
|
schema=schema, |
|
chunksize=chunksize, |
|
method=method, |
|
**engine_kwargs, |
|
) |
|
|
|
self.check_case_sensitive(name=name, schema=schema) |
|
return total_inserted |
|
|
|
@property |
|
def tables(self): |
|
return self.meta.tables |
|
|
|
def has_table(self, name: str, schema: str | None = None) -> bool: |
|
from sqlalchemy import inspect as sqlalchemy_inspect |
|
|
|
insp = sqlalchemy_inspect(self.con) |
|
return insp.has_table(name, schema or self.meta.schema) |
|
|
|
def get_table(self, table_name: str, schema: str | None = None) -> Table: |
|
from sqlalchemy import ( |
|
Numeric, |
|
Table, |
|
) |
|
|
|
schema = schema or self.meta.schema |
|
tbl = Table(table_name, self.meta, autoload_with=self.con, schema=schema) |
|
for column in tbl.columns: |
|
if isinstance(column.type, Numeric): |
|
column.type.asdecimal = False |
|
return tbl |
|
|
|
def drop_table(self, table_name: str, schema: str | None = None) -> None: |
|
schema = schema or self.meta.schema |
|
if self.has_table(table_name, schema): |
|
self.meta.reflect( |
|
bind=self.con, only=[table_name], schema=schema, views=True |
|
) |
|
with self.run_transaction(): |
|
self.get_table(table_name, schema).drop(bind=self.con) |
|
self.meta.clear() |
|
|
|
def _create_sql_schema( |
|
self, |
|
frame: DataFrame, |
|
table_name: str, |
|
keys: list[str] | None = None, |
|
dtype: DtypeArg | None = None, |
|
schema: str | None = None, |
|
) -> str: |
|
table = SQLTable( |
|
table_name, |
|
self, |
|
frame=frame, |
|
index=False, |
|
keys=keys, |
|
dtype=dtype, |
|
schema=schema, |
|
) |
|
return str(table.sql_schema()) |
|
|
|
|
|
|
|
|
|
|
|
class ADBCDatabase(PandasSQL): |
|
""" |
|
This class enables conversion between DataFrame and SQL databases |
|
using ADBC to handle DataBase abstraction. |
|
|
|
Parameters |
|
---------- |
|
con : adbc_driver_manager.dbapi.Connection |
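
    Examples
    --------
    A minimal sketch, assuming the ADBC SQLite driver package is installed:

    >>> import adbc_driver_sqlite.dbapi  # doctest: +SKIP
    >>> with adbc_driver_sqlite.dbapi.connect(":memory:") as con:  # doctest: +SKIP
    ...     db = ADBCDatabase(con)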
|
""" |
|
|
|
def __init__(self, con) -> None: |
|
self.con = con |
|
|
|
@contextmanager |
|
def run_transaction(self): |
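        # commits on success, rolls back on any exception; illustrative usage:
        #
        #     with self.run_transaction() as cur:
        #         cur.execute("DELETE FROM some_table WHERE a = 1")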
|
with self.con.cursor() as cur: |
|
try: |
|
yield cur |
|
except Exception: |
|
self.con.rollback() |
|
raise |
|
self.con.commit() |
|
|
|
def execute(self, sql: str | Select | TextClause, params=None): |
|
if not isinstance(sql, str): |
|
raise TypeError("Query must be a string unless using sqlalchemy.") |
|
args = [] if params is None else [params] |
|
cur = self.con.cursor() |
|
try: |
|
cur.execute(sql, *args) |
|
return cur |
|
except Exception as exc: |
|
try: |
|
self.con.rollback() |
|
except Exception as inner_exc: |
|
ex = DatabaseError( |
|
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback" |
|
) |
|
raise ex from inner_exc |
|
|
|
ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}") |
|
raise ex from exc |
|
|
|
def read_table( |
|
self, |
|
table_name: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
columns=None, |
|
schema: str | None = None, |
|
chunksize: int | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL database table into a DataFrame. |
|
|
|
Parameters |
|
---------- |
|
table_name : str |
|
Name of SQL table in database. |
|
coerce_float : bool, default True |
|
Raises NotImplementedError |
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg}``, where the arg corresponds |
|
to the keyword arguments of :func:`pandas.to_datetime`. |
|
Especially useful with databases without native Datetime support, |
|
such as SQLite. |
|
columns : list, default: None |
|
List of column names to select from SQL table. |
|
schema : string, default None |
|
Name of SQL schema in database to query (if database flavor |
|
supports this). If specified, this overwrites the default |
|
schema of the SQL database object. |
|
chunksize : int, default None |
|
Raises NotImplementedError |
|
dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' |
|
Back-end data type applied to the resultant :class:`DataFrame` |
|
(still experimental). Behaviour is as follows: |
|
|
|
* ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` |
|
(default). |
|
* ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` |
|
DataFrame. |
|
|
|
.. versionadded:: 2.0 |
|
|
|
Returns |
|
------- |
|
DataFrame |
|
|
|
See Also |
|
-------- |
|
pandas.read_sql_table |
|
SQLDatabase.read_query |
|
|
|
""" |
|
if coerce_float is not True: |
|
raise NotImplementedError( |
|
"'coerce_float' is not implemented for ADBC drivers" |
|
) |
|
if chunksize: |
|
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") |
|
|
|
if columns: |
|
if index_col: |
|
index_select = maybe_make_list(index_col) |
|
else: |
|
index_select = [] |
|
to_select = index_select + columns |
|
select_list = ", ".join(f'"{x}"' for x in to_select) |
|
else: |
|
select_list = "*" |
|
if schema: |
|
stmt = f"SELECT {select_list} FROM {schema}.{table_name}" |
|
else: |
|
stmt = f"SELECT {select_list} FROM {table_name}" |
|
|
|
mapping: type[ArrowDtype] | None | Callable |
|
if dtype_backend == "pyarrow": |
|
mapping = ArrowDtype |
|
elif dtype_backend == "numpy_nullable": |
|
from pandas.io._util import _arrow_dtype_mapping |
|
|
|
mapping = _arrow_dtype_mapping().get |
|
elif using_pyarrow_string_dtype(): |
|
from pandas.io._util import arrow_string_types_mapper |
|
|
|
            mapping = arrow_string_types_mapper()
|
else: |
|
mapping = None |
|
|
|
with self.con.cursor() as cur: |
|
cur.execute(stmt) |
|
df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping) |
|
|
|
return _wrap_result_adbc( |
|
df, |
|
index_col=index_col, |
|
parse_dates=parse_dates, |
|
) |
|
|
|
def read_query( |
|
self, |
|
sql: str, |
|
index_col: str | list[str] | None = None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
params=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
""" |
|
Read SQL query into a DataFrame. |
|
|
|
Parameters |
|
---------- |
|
sql : str |
|
SQL query to be executed. |
|
index_col : string, optional, default: None |
|
Column name to use as index for the returned DataFrame object. |
|
coerce_float : bool, default True |
|
Raises NotImplementedError |
|
params : list, tuple or dict, optional, default: None |
|
Raises NotImplementedError |
|
parse_dates : list or dict, default: None |
|
- List of column names to parse as dates. |
|
- Dict of ``{column_name: format string}`` where format string is |
|
strftime compatible in case of parsing string times, or is one of |
|
(D, s, ns, ms, us) in case of parsing integer timestamps. |
|
- Dict of ``{column_name: arg dict}``, where the arg dict |
|
corresponds to the keyword arguments of |
|
              :func:`pandas.to_datetime`. Especially useful with databases
|
without native Datetime support, such as SQLite. |
|
chunksize : int, default None |
|
Raises NotImplementedError |
|
dtype : Type name or dict of columns |
|
Data type for data or columns. E.g. np.float64 or |
|
{'a': np.float64, 'b': np.int32, 'c': 'Int64'} |
|
|
|
.. versionadded:: 1.3.0 |
|
|
|
Returns |
|
------- |
|
DataFrame |
|
|
|
See Also |
|
-------- |
|
read_sql_table : Read SQL database table into a DataFrame. |
|
read_sql |
|
|
|
""" |
|
if coerce_float is not True: |
|
raise NotImplementedError( |
|
"'coerce_float' is not implemented for ADBC drivers" |
|
) |
|
if params: |
|
raise NotImplementedError("'params' is not implemented for ADBC drivers") |
|
if chunksize: |
|
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") |
|
|
|
mapping: type[ArrowDtype] | None | Callable |
|
if dtype_backend == "pyarrow": |
|
mapping = ArrowDtype |
|
elif dtype_backend == "numpy_nullable": |
|
from pandas.io._util import _arrow_dtype_mapping |
|
|
|
mapping = _arrow_dtype_mapping().get |
|
else: |
|
mapping = None |
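
        # e.g. dtype_backend="pyarrow" wraps each Arrow column in ArrowDtype,
        # while "numpy_nullable" maps Arrow types to pandas nullable dtypes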
|
|
|
with self.con.cursor() as cur: |
|
cur.execute(sql) |
|
df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping) |
|
|
|
return _wrap_result_adbc( |
|
df, |
|
index_col=index_col, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
) |
|
|
|
read_sql = read_query |
|
|
|
def to_sql( |
|
self, |
|
frame, |
|
name: str, |
|
if_exists: Literal["fail", "replace", "append"] = "fail", |
|
index: bool = True, |
|
index_label=None, |
|
schema: str | None = None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
engine: str = "auto", |
|
**engine_kwargs, |
|
) -> int | None: |
|
""" |
|
Write records stored in a DataFrame to a SQL database. |
|
|
|
Parameters |
|
---------- |
|
frame : DataFrame |
|
name : string |
|
Name of SQL table. |
|
if_exists : {'fail', 'replace', 'append'}, default 'fail' |
|
- fail: If table exists, do nothing. |
|
- replace: If table exists, drop it, recreate it, and insert data. |
|
- append: If table exists, insert data. Create if does not exist. |
|
index : boolean, default True |
|
Write DataFrame index as a column. |
|
index_label : string or sequence, default None |
|
Raises NotImplementedError |
|
schema : string, default None |
|
Name of SQL schema in database to write to (if database flavor |
|
supports this). If specified, this overwrites the default |
|
schema of the SQLDatabase object. |
|
chunksize : int, default None |
|
Raises NotImplementedError |
|
dtype : single type or dict of column name to SQL type, default None |
|
Raises NotImplementedError |
|
        method : {None, 'multi', callable}, default None
|
Raises NotImplementedError |
|
engine : {'auto', 'sqlalchemy'}, default 'auto' |
|
Raises NotImplementedError if not set to 'auto' |
|
""" |
|
if index_label: |
|
raise NotImplementedError( |
|
"'index_label' is not implemented for ADBC drivers" |
|
) |
|
if chunksize: |
|
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") |
|
if dtype: |
|
raise NotImplementedError("'dtype' is not implemented for ADBC drivers") |
|
if method: |
|
raise NotImplementedError("'method' is not implemented for ADBC drivers") |
|
if engine != "auto": |
|
raise NotImplementedError( |
|
"engine != 'auto' not implemented for ADBC drivers" |
|
) |
|
|
|
if schema: |
|
table_name = f"{schema}.{name}" |
|
else: |
|
            table_name = name

        # pandas' if_exists="append" will still create the table when it does
        # not exist; ADBC exposes explicit "create"/"append" ingestion modes,
        # so map the semantics here
        mode = "create"
|
if self.has_table(name, schema): |
|
if if_exists == "fail": |
|
raise ValueError(f"Table '{table_name}' already exists.") |
|
elif if_exists == "replace": |
|
with self.con.cursor() as cur: |
|
cur.execute(f"DROP TABLE {table_name}") |
|
elif if_exists == "append": |
|
mode = "append" |
|
|
|
import pyarrow as pa |
|
|
|
try: |
|
tbl = pa.Table.from_pandas(frame, preserve_index=index) |
|
except pa.ArrowNotImplementedError as exc: |
|
raise ValueError("datatypes not supported") from exc |
|
|
|
with self.con.cursor() as cur: |
|
total_inserted = cur.adbc_ingest( |
|
table_name=name, data=tbl, mode=mode, db_schema_name=schema |
|
) |
|
|
|
self.con.commit() |
|
return total_inserted |
|
|
|
def has_table(self, name: str, schema: str | None = None) -> bool: |
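        # adbc_get_objects returns nested metadata:
        # catalogs -> db_schemas -> db_schema_tables; walk the structure
        # looking for a matching table name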
|
meta = self.con.adbc_get_objects( |
|
db_schema_filter=schema, table_name_filter=name |
|
).read_all() |
|
|
|
for catalog_schema in meta["catalog_db_schemas"].to_pylist(): |
|
if not catalog_schema: |
|
continue |
|
for schema_record in catalog_schema: |
|
if not schema_record: |
|
continue |
|
|
|
for table_record in schema_record["db_schema_tables"]: |
|
if table_record["table_name"] == name: |
|
return True |
|
|
|
return False |
|
|
|
def _create_sql_schema( |
|
self, |
|
frame: DataFrame, |
|
table_name: str, |
|
keys: list[str] | None = None, |
|
dtype: DtypeArg | None = None, |
|
schema: str | None = None, |
|
) -> str: |
|
raise NotImplementedError("not implemented for adbc") |
|
|
|
|
|
|
|
|
|
_SQL_TYPES = { |
|
"string": "TEXT", |
|
"floating": "REAL", |
|
"integer": "INTEGER", |
|
"datetime": "TIMESTAMP", |
|
"date": "DATE", |
|
"time": "TIME", |
|
"boolean": "INTEGER", |
|
} |
|
|
|
|
|
def _get_unicode_name(name: object): |
|
try: |
|
uname = str(name).encode("utf-8", "strict").decode("utf-8") |
|
except UnicodeError as err: |
|
raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") from err |
|
return uname |
|
|
|
|
|
def _get_valid_sqlite_name(name: object):
    # Escape an arbitrary identifier so it is safe to interpolate into a
    # SQLite statement:
    #  * ensure the name can be encoded as UTF-8
    #  * reject empty names and names containing NUL characters
    #  * double any embedded double quotes
    #  * wrap the whole identifier in double quotes
    uname = _get_unicode_name(name)
|
if not len(uname): |
|
raise ValueError("Empty table or column name specified") |
|
|
|
nul_index = uname.find("\x00") |
|
if nul_index >= 0: |
|
raise ValueError("SQLite identifier cannot contain NULs") |
|
return '"' + uname.replace('"', '""') + '"' |
|
|
|
|
|
class SQLiteTable(SQLTable): |
|
""" |
|
Patch the SQLTable for fallback support. |
|
Instead of a table variable just use the Create Table statement. |
|
""" |
|
|
|
def __init__(self, *args, **kwargs) -> None: |
|
super().__init__(*args, **kwargs) |
|
|
|
self._register_date_adapters() |
|
|
|
    def _register_date_adapters(self) -> None:
        # register adapter callables so datetime objects round-trip through
        # sqlite3 as ISO-formatted strings
        import sqlite3
|
|
|
|
|
|
|
        def _adapt_time(t) -> str:
            # formats time(12, 34, 56, 789) as "12:34:56.000789" (matching
            # what sqlalchemy does); an f-string is faster than strftime here
            return f"{t.hour:02d}:{t.minute:02d}:{t.second:02d}.{t.microsecond:06d}"
|
|
|
|
|
|
|
|
|
|
|
        # newer Python versions no longer register date/datetime adapters
        # implicitly, so register ISO-format adapters (and the matching
        # converters below) explicitly
        adapt_date_iso = lambda val: val.isoformat()
        adapt_datetime_iso = lambda val: val.isoformat(" ")
|
|
|
sqlite3.register_adapter(time, _adapt_time) |
|
|
|
sqlite3.register_adapter(date, adapt_date_iso) |
|
sqlite3.register_adapter(datetime, adapt_datetime_iso) |
|
|
|
convert_date = lambda val: date.fromisoformat(val.decode()) |
|
convert_timestamp = lambda val: datetime.fromisoformat(val.decode()) |
|
|
|
sqlite3.register_converter("date", convert_date) |
|
sqlite3.register_converter("timestamp", convert_timestamp) |
|
|
|
def sql_schema(self) -> str: |
|
return str(";\n".join(self.table)) |
|
|
|
def _execute_create(self) -> None: |
|
with self.pd_sql.run_transaction() as conn: |
|
for stmt in self.table: |
|
conn.execute(stmt) |
|
|
|
def insert_statement(self, *, num_rows: int) -> str: |
|
names = list(map(str, self.frame.columns)) |
|
wld = "?" |
|
escape = _get_valid_sqlite_name |
|
|
|
if self.index is not None: |
|
for idx in self.index[::-1]: |
|
names.insert(0, idx) |
|
|
|
bracketed_names = [escape(column) for column in names] |
|
col_names = ",".join(bracketed_names) |
|
|
|
row_wildcards = ",".join([wld] * len(names)) |
|
wildcards = ",".join([f"({row_wildcards})" for _ in range(num_rows)]) |
|
insert_statement = ( |
|
f"INSERT INTO {escape(self.name)} ({col_names}) VALUES {wildcards}" |
|
) |
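        # e.g. columns ["a", "b"] with num_rows=2 produce (roughly):
        #     INSERT INTO "tbl" ("a","b") VALUES (?,?),(?,?)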
|
return insert_statement |
|
|
|
def _execute_insert(self, conn, keys, data_iter) -> int: |
|
data_list = list(data_iter) |
|
conn.executemany(self.insert_statement(num_rows=1), data_list) |
|
return conn.rowcount |
|
|
|
def _execute_insert_multi(self, conn, keys, data_iter) -> int: |
|
data_list = list(data_iter) |
|
flattened_data = [x for row in data_list for x in row] |
|
conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data) |
|
return conn.rowcount |
|
|
|
def _create_table_setup(self): |
|
""" |
|
Return a list of SQL statements that creates a table reflecting the |
|
structure of a DataFrame. The first entry will be a CREATE TABLE |
|
statement while the rest will be CREATE INDEX statements. |
|
""" |
|
column_names_and_types = self._get_column_names_and_types(self._sql_type_name) |
|
escape = _get_valid_sqlite_name |
|
|
|
create_tbl_stmts = [ |
|
escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types |
|
] |
|
|
|
if self.keys is not None and len(self.keys): |
|
if not is_list_like(self.keys): |
|
keys = [self.keys] |
|
else: |
|
keys = self.keys |
|
cnames_br = ", ".join([escape(c) for c in keys]) |
|
create_tbl_stmts.append( |
|
f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})" |
|
) |
|
if self.schema: |
|
schema_name = self.schema + "." |
|
else: |
|
schema_name = "" |
|
create_stmts = [ |
|
"CREATE TABLE " |
|
+ schema_name |
|
+ escape(self.name) |
|
+ " (\n" |
|
+ ",\n ".join(create_tbl_stmts) |
|
+ "\n)" |
|
] |
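
        # e.g. a frame with a single float column "a" yields (roughly):
        #     CREATE TABLE "tbl" (
        #       "a" REAL
        #     )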
|
|
|
ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index] |
|
if len(ix_cols): |
|
cnames = "_".join(ix_cols) |
|
cnames_br = ",".join([escape(c) for c in ix_cols]) |
|
create_stmts.append( |
|
"CREATE INDEX " |
|
+ escape("ix_" + self.name + "_" + cnames) |
|
+ "ON " |
|
+ escape(self.name) |
|
+ " (" |
|
+ cnames_br |
|
+ ")" |
|
) |
|
|
|
return create_stmts |
|
|
|
def _sql_type_name(self, col): |
|
dtype: DtypeArg = self.dtype or {} |
|
if is_dict_like(dtype): |
|
dtype = cast(dict, dtype) |
|
if col.name in dtype: |
|
return dtype[col.name] |
|
|
|
|
|
|
|
        # infer the column's type while ignoring missing values; needed for
        # inserting typed data containing NULLs
        col_type = lib.infer_dtype(col, skipna=True)
|
|
|
if col_type == "timedelta64": |
|
warnings.warn( |
|
"the 'timedelta' type is not supported, and will be " |
|
"written as integer values (ns frequency) to the database.", |
|
UserWarning, |
|
stacklevel=find_stack_level(), |
|
) |
|
col_type = "integer" |
|
|
|
elif col_type == "datetime64": |
|
col_type = "datetime" |
|
|
|
elif col_type == "empty": |
|
col_type = "string" |
|
|
|
elif col_type == "complex": |
|
raise ValueError("Complex datatypes not supported") |
|
|
|
if col_type not in _SQL_TYPES: |
|
col_type = "string" |
|
|
|
return _SQL_TYPES[col_type] |
|
|
|
|
|
class SQLiteDatabase(PandasSQL): |
|
""" |
|
Version of SQLDatabase to support SQLite connections (fallback without |
|
SQLAlchemy). This should only be used internally. |
|
|
|
Parameters |
|
---------- |
|
con : sqlite connection object |
|
|
|
""" |
|
|
|
def __init__(self, con) -> None: |
|
self.con = con |
|
|
|
@contextmanager |
|
def run_transaction(self): |
|
cur = self.con.cursor() |
|
try: |
|
yield cur |
|
self.con.commit() |
|
except Exception: |
|
self.con.rollback() |
|
raise |
|
finally: |
|
cur.close() |
|
|
|
def execute(self, sql: str | Select | TextClause, params=None): |
|
if not isinstance(sql, str): |
|
raise TypeError("Query must be a string unless using sqlalchemy.") |
|
args = [] if params is None else [params] |
|
cur = self.con.cursor() |
|
try: |
|
cur.execute(sql, *args) |
|
return cur |
|
except Exception as exc: |
|
try: |
|
self.con.rollback() |
|
except Exception as inner_exc: |
|
ex = DatabaseError( |
|
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback" |
|
) |
|
raise ex from inner_exc |
|
|
|
ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}") |
|
raise ex from exc |
|
|
|
@staticmethod |
|
def _query_iterator( |
|
cursor, |
|
chunksize: int, |
|
columns, |
|
index_col=None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
): |
|
"""Return generator through chunked result set""" |
|
has_read_data = False |
|
while True: |
|
data = cursor.fetchmany(chunksize) |
|
            if isinstance(data, tuple):
|
data = list(data) |
|
if not data: |
|
cursor.close() |
|
if not has_read_data: |
|
result = DataFrame.from_records( |
|
[], columns=columns, coerce_float=coerce_float |
|
) |
|
if dtype: |
|
result = result.astype(dtype) |
|
yield result |
|
break |
|
|
|
has_read_data = True |
|
yield _wrap_result( |
|
data, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
|
|
def read_query( |
|
self, |
|
sql, |
|
index_col=None, |
|
coerce_float: bool = True, |
|
parse_dates=None, |
|
params=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", |
|
) -> DataFrame | Iterator[DataFrame]: |
|
cursor = self.execute(sql, params) |
|
columns = [col_desc[0] for col_desc in cursor.description] |
|
|
|
if chunksize is not None: |
|
return self._query_iterator( |
|
cursor, |
|
chunksize, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
else: |
|
data = self._fetchall_as_list(cursor) |
|
cursor.close() |
|
|
|
frame = _wrap_result( |
|
data, |
|
columns, |
|
index_col=index_col, |
|
coerce_float=coerce_float, |
|
parse_dates=parse_dates, |
|
dtype=dtype, |
|
dtype_backend=dtype_backend, |
|
) |
|
return frame |
|
|
|
def _fetchall_as_list(self, cur): |
|
result = cur.fetchall() |
|
if not isinstance(result, list): |
|
result = list(result) |
|
return result |
|
|
|
def to_sql( |
|
self, |
|
frame, |
|
name: str, |
|
if_exists: str = "fail", |
|
index: bool = True, |
|
index_label=None, |
|
schema=None, |
|
chunksize: int | None = None, |
|
dtype: DtypeArg | None = None, |
|
method: Literal["multi"] | Callable | None = None, |
|
engine: str = "auto", |
|
**engine_kwargs, |
|
) -> int | None: |
|
""" |
|
Write records stored in a DataFrame to a SQL database. |
|
|
|
Parameters |
|
---------- |
|
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
|
fail: If table exists, do nothing. |
|
replace: If table exists, drop it, recreate it, and insert data. |
|
append: If table exists, insert data. Create if it does not exist. |
|
index : bool, default True |
|
Write DataFrame index as a column |
|
index_label : string or sequence, default None |
|
Column label for index column(s). If None is given (default) and |
|
`index` is True, then the index names are used. |
|
A sequence should be given if the DataFrame uses MultiIndex. |
|
schema : string, default None |
|
Ignored parameter included for compatibility with SQLAlchemy |
|
version of ``to_sql``. |
|
chunksize : int, default None |
|
If not None, then rows will be written in batches of this |
|
size at a time. If None, all rows will be written at once. |
|
dtype : single type or dict of column name to SQL type, default None |
|
Optional specifying the datatype for columns. The SQL type should |
|
be a string. If all columns are of the same type, one single value |
|
can be used. |
|
method : {None, 'multi', callable}, default None |
|
Controls the SQL insertion clause used: |
|
|
|
* None : Uses standard SQL ``INSERT`` clause (one per row). |
|
* 'multi': Pass multiple values in a single ``INSERT`` clause. |
|
* callable with signature ``(pd_table, conn, keys, data_iter)``. |
|
|
|
Details and a sample callable implementation can be found in the |
|
section :ref:`insert method <io.sql.method>`. |
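
        Examples
        --------
        A minimal sketch using the sqlite3 fallback (in-memory database):

        >>> import sqlite3
        >>> con = sqlite3.connect(":memory:")
        >>> with pandasSQL_builder(con) as pandas_sql:  # doctest: +SKIP
        ...     pandas_sql.to_sql(DataFrame({"a": [1, 2]}), "test")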
|
""" |
|
if dtype: |
|
            if not is_dict_like(dtype):
                # a single scalar dtype was passed; apply it to every column
                dtype = {col_name: dtype for col_name in frame}
|
else: |
|
dtype = cast(dict, dtype) |
|
|
|
for col, my_type in dtype.items(): |
|
if not isinstance(my_type, str): |
|
raise ValueError(f"{col} ({my_type}) not a string") |
|
|
|
table = SQLiteTable( |
|
name, |
|
self, |
|
frame=frame, |
|
index=index, |
|
if_exists=if_exists, |
|
index_label=index_label, |
|
dtype=dtype, |
|
) |
|
table.create() |
|
return table.insert(chunksize, method) |
|
|
|
def has_table(self, name: str, schema: str | None = None) -> bool: |
|
wld = "?" |
|
query = f""" |
|
SELECT |
|
name |
|
FROM |
|
sqlite_master |
|
WHERE |
|
type IN ('table', 'view') |
|
AND name={wld}; |
|
""" |
|
|
|
return len(self.execute(query, [name]).fetchall()) > 0 |
|
|
|
def get_table(self, table_name: str, schema: str | None = None) -> None: |
|
return None |
|
|
|
def drop_table(self, name: str, schema: str | None = None) -> None: |
|
drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}" |
|
self.execute(drop_sql) |
|
|
|
def _create_sql_schema( |
|
self, |
|
frame, |
|
table_name: str, |
|
keys=None, |
|
dtype: DtypeArg | None = None, |
|
schema: str | None = None, |
|
) -> str: |
|
table = SQLiteTable( |
|
table_name, |
|
self, |
|
frame=frame, |
|
index=False, |
|
keys=keys, |
|
dtype=dtype, |
|
schema=schema, |
|
) |
|
return str(table.sql_schema()) |
|
|
|
|
|
def get_schema( |
|
frame, |
|
name: str, |
|
keys=None, |
|
con=None, |
|
dtype: DtypeArg | None = None, |
|
schema: str | None = None, |
|
) -> str: |
|
""" |
|
Get the SQL db table schema for the given frame. |
|
|
|
Parameters |
|
---------- |
|
frame : DataFrame |
|
name : str |
|
name of SQL table |
|
keys : string or sequence, default: None |
|
        columns to use as primary key
|
    con : ADBC Connection, SQLAlchemy connectable, sqlite3 connection, default: None
        ADBC provides high performance I/O with native type support, where available.
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported.
|
dtype : dict of column name to SQL type, default None |
|
Optional specifying the datatype for columns. The SQL type should |
|
be a SQLAlchemy type, or a string for sqlite3 fallback connection. |
|
    schema : str, default: None
|
Optional specifying the schema to be used in creating the table. |
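
    Examples
    --------
    A minimal sketch with the sqlite3 fallback, where SQL types are plain
    strings:

    >>> import sqlite3
    >>> con = sqlite3.connect(":memory:")
    >>> print(get_schema(DataFrame({"a": [1.0]}), "test", con=con))  # doctest: +SKIP
    CREATE TABLE "test" (
    "a" REAL
    )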
|
""" |
|
with pandasSQL_builder(con=con) as pandas_sql: |
|
return pandas_sql._create_sql_schema( |
|
frame, name, keys=keys, dtype=dtype, schema=schema |
|
) |
|
|