import duckdb
import pandas as pd
import pyarrow as pa
from pathlib import Path
import tempfile
import os
from typing import Optional, List, Dict, Any, Union, Iterator, Tuple


class DatabaseAPIError(Exception):
    """Base exception for our custom API."""
    pass


class QueryError(DatabaseAPIError):
    """Exception raised for errors during query execution."""
    pass

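
# Since QueryError subclasses DatabaseAPIError, callers can handle the two
# levels separately; the narrower clause must come first. Illustrative only
# (`api` stands in for a DatabaseAPI instance):
#   try:
#       api.execute_sql("SELECT * FROM missing_table")
#   except QueryError:
#       ...  # statement-level failure
#   except DatabaseAPIError:
#       ...  # connection- or extension-level failure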


def _format_copy_options(options: Optional[Dict[str, Any]]) -> str:
    """Render a dict of COPY options as a DuckDB `WITH (...)` clause."""
    if not options:
        return ""
    opts_parts = []
    for k, v in options.items():
        key_upper = k.upper()
        if isinstance(v, bool):
            # Booleans become the unquoted keywords TRUE / FALSE.
            value_repr = str(v).upper()
        elif isinstance(v, (int, float)):
            value_repr = str(v)
        elif isinstance(v, str):
            # Escape embedded single quotes and wrap in a string literal.
            escaped_v = v.replace("'", "''")
            value_repr = f"'{escaped_v}'"
        else:
            value_repr = repr(v)
        opts_parts.append(f"{key_upper} {value_repr}")
    opts_str = ", ".join(opts_parts)
    return f"WITH ({opts_str})"


class DatabaseAPI:
    """A thin convenience wrapper around a DuckDB connection."""

    def __init__(self,
                 db_path: Union[str, Path] = ":memory:",
                 read_only: bool = False,
                 config: Optional[Dict[str, str]] = None):
        self._db_path = str(db_path)
        self._config = config or {}
        self._read_only = read_only
        self._conn: Optional[duckdb.DuckDBPyConnection] = None
        try:
            self._conn = duckdb.connect(
                database=self._db_path,
                read_only=self._read_only,
                config=self._config
            )
            print(f"Connected to DuckDB database at '{self._db_path}'")
        except duckdb.Error as e:
            print(f"Failed to connect to DuckDB: {e}")
            raise DatabaseAPIError(f"Failed to connect to DuckDB: {e}") from e

    def _ensure_connection(self):
        """Raise DatabaseAPIError if the connection is missing or closed."""
        if self._conn is None:
            raise DatabaseAPIError("Database connection is not established or has been closed.")
        try:
            # Cheap probe; fails fast if the underlying connection was closed.
            self._conn.execute("SELECT 1")
        except (duckdb.ConnectionException, RuntimeError) as e:
            if "Connection has already been closed" in str(e) or "connection closed" in str(e).lower():
                self._conn = None
                raise DatabaseAPIError("Database connection is closed.") from e
            else:
                raise DatabaseAPIError(f"Database connection error: {e}") from e

    def execute_sql(self, sql: str, parameters: Optional[List[Any]] = None) -> None:
        """Execute a statement, discarding any result (DDL/DML)."""
        self._ensure_connection()
        print(f"Executing SQL: {sql}")
        try:
            self._conn.execute(sql, parameters)
        except duckdb.Error as e:
            print(f"Error executing SQL: {e}")
            raise QueryError(f"Error executing SQL: {e}") from e

    def query_sql(self, sql: str, parameters: Optional[List[Any]] = None) -> duckdb.DuckDBPyRelation:
        """Run a query and return a lazy DuckDB relation."""
        self._ensure_connection()
        print(f"Querying SQL: {sql}")
        try:
            return self._conn.sql(sql, params=parameters)
        except duckdb.Error as e:
            print(f"Error querying SQL: {e}")
            raise QueryError(f"Error querying SQL: {e}") from e

    def query_df(self, sql: str, parameters: Optional[List[Any]] = None) -> pd.DataFrame:
        """Run a query and fetch the full result as a pandas DataFrame."""
        self._ensure_connection()
        print(f"Querying SQL to DataFrame: {sql}")
        try:
            return self._conn.execute(sql, parameters).df()
        except ImportError:
            print("Pandas library is required for DataFrame operations.")
            raise
        except duckdb.Error as e:
            print(f"Error querying SQL to DataFrame: {e}")
            raise QueryError(f"Error querying SQL to DataFrame: {e}") from e

    def query_arrow(self, sql: str, parameters: Optional[List[Any]] = None) -> pa.Table:
        """Run a query and fetch the full result as a PyArrow Table."""
        self._ensure_connection()
        print(f"Querying SQL to Arrow Table: {sql}")
        try:
            return self._conn.execute(sql, parameters).arrow()
        except ImportError:
            print("PyArrow library is required for Arrow operations.")
            raise
        except duckdb.Error as e:
            print(f"Error querying SQL to Arrow Table: {e}")
            raise QueryError(f"Error querying SQL to Arrow Table: {e}") from e

    def query_fetchall(self, sql: str, parameters: Optional[List[Any]] = None) -> List[Tuple[Any, ...]]:
        """Run a query and fetch all rows as a list of tuples."""
        self._ensure_connection()
        print(f"Querying SQL and fetching all: {sql}")
        try:
            return self._conn.execute(sql, parameters).fetchall()
        except duckdb.Error as e:
            print(f"Error querying SQL: {e}")
            raise QueryError(f"Error querying SQL: {e}") from e

    def query_fetchone(self, sql: str, parameters: Optional[List[Any]] = None) -> Optional[Tuple[Any, ...]]:
        """Run a query and fetch the first row, or None if the result is empty."""
        self._ensure_connection()
        print(f"Querying SQL and fetching one: {sql}")
        try:
            return self._conn.execute(sql, parameters).fetchone()
        except duckdb.Error as e:
            print(f"Error querying SQL: {e}")
            raise QueryError(f"Error querying SQL: {e}") from e

    def register_df(self, name: str, df: pd.DataFrame):
        """Expose a pandas DataFrame to SQL as a virtual table."""
        self._ensure_connection()
        print(f"Registering DataFrame as '{name}'")
        try:
            self._conn.register(name, df)
        except duckdb.Error as e:
            print(f"Error registering DataFrame: {e}")
            raise QueryError(f"Error registering DataFrame: {e}") from e

    def unregister_df(self, name: str):
        """Remove a previously registered virtual table; a missing name only warns."""
        self._ensure_connection()
        print(f"Unregistering virtual table '{name}'")
        try:
            self._conn.unregister(name)
        except duckdb.Error as e:
            if "not found" in str(e).lower():
                print(f"Warning: Virtual table '{name}' not found for unregistering.")
            else:
                print(f"Error unregistering virtual table: {e}")
                raise QueryError(f"Error unregistering virtual table: {e}") from e

    def install_extension(self, extension_name: str, force_install: bool = False):
        """Install a DuckDB extension (downloaded on first install)."""
        self._ensure_connection()
        print(f"Installing extension: {extension_name}")
        try:
            self._conn.install_extension(extension_name, force_install=force_install)
        except duckdb.Error as e:
            print(f"Error installing extension '{extension_name}': {e}")
            raise DatabaseAPIError(f"Error installing extension '{extension_name}': {e}") from e

    def load_extension(self, extension_name: str):
        """Load an installed DuckDB extension into this connection."""
        self._ensure_connection()
        print(f"Loading extension: {extension_name}")
        try:
            self._conn.load_extension(extension_name)
        except (duckdb.IOException, duckdb.CatalogException) as load_err:
            # Expected failure modes: extension missing on disk or unknown name.
            print(f"Error loading extension '{extension_name}': {load_err}")
            raise QueryError(f"Error loading extension '{extension_name}': {load_err}") from load_err
        except duckdb.Error as e:
            print(f"Unexpected DuckDB error loading extension '{extension_name}': {e}")
            raise DatabaseAPIError(f"Unexpected DuckDB error loading extension '{extension_name}': {e}") from e

    def export_database(self, directory_path: Union[str, Path]):
        """Export the whole database (schema and data) to a directory as CSV."""
        self._ensure_connection()
        path_str = str(directory_path)
        if not os.path.isdir(path_str):
            try:
                os.makedirs(path_str)
                print(f"Created export directory: {path_str}")
            except OSError as e:
                raise DatabaseAPIError(f"Could not create export directory '{path_str}': {e}") from e
        print(f"Exporting database to directory: {path_str}")
        # Escape single quotes so the path survives embedding in the SQL literal.
        escaped_path = path_str.replace("'", "''")
        sql = f"EXPORT DATABASE '{escaped_path}' (FORMAT CSV)"
        try:
            self._conn.execute(sql)
            print("Database export completed successfully.")
        except duckdb.Error as e:
            print(f"Error exporting database: {e}")
            raise DatabaseAPIError(f"Error exporting database: {e}") from e

    def _export_data(self,
                     source: str,
                     output_path: Union[str, Path],
                     file_format: str,
                     options: Optional[Dict[str, Any]] = None):
        """Run a `COPY ... TO` statement for a table name or a SELECT query."""
        self._ensure_connection()
        path_str = str(output_path)
        options_str = _format_copy_options(options)
        source_safe = source.strip()

        # A bare table name is quoted as an identifier; anything that looks like
        # a query (contains spaces or starts with SELECT/WITH/VALUES) must be
        # wrapped in parentheses instead.
        if ' ' in source_safe or source_safe.upper().startswith(('SELECT', 'WITH', 'VALUES')):
            copy_source = f"({source})"
        else:
            copy_source = f'"{source_safe}"'

        # Escape single quotes so the path survives embedding in the SQL literal.
        escaped_path = path_str.replace("'", "''")
        sql = f"COPY {copy_source} TO '{escaped_path}' {options_str}"
        print(f"Exporting data to {path_str} (Format: {file_format}) with options: {options or {}}")
        try:
            self._conn.execute(sql)
            print("Data export completed successfully.")
        except duckdb.Error as e:
            print(f"Error exporting data: {e}")
            raise QueryError(f"Error exporting data to {file_format}: {e}") from e

    def export_data_to_csv(self,
                           source: str,
                           output_path: Union[str, Path],
                           options: Optional[Dict[str, Any]] = None):
        """Export a table or query result to CSV (header on by default)."""
        csv_options = options.copy() if options else {}
        csv_options['FORMAT'] = 'CSV'
        if 'HEADER' not in {k.upper() for k in csv_options}:
            csv_options['HEADER'] = True
        self._export_data(source, output_path, "CSV", csv_options)

    def export_data_to_parquet(self,
                               source: str,
                               output_path: Union[str, Path],
                               options: Optional[Dict[str, Any]] = None):
        """Export a table or query result to Parquet."""
        parquet_options = options.copy() if options else {}
        parquet_options['FORMAT'] = 'PARQUET'
        self._export_data(source, output_path, "Parquet", parquet_options)

    def export_data_to_json(self,
                            source: str,
                            output_path: Union[str, Path],
                            array_format: bool = True,
                            options: Optional[Dict[str, Any]] = None):
        """Export to JSON; array_format=True writes one JSON array, False writes one object per line."""
        json_options = options.copy() if options else {}
        json_options['FORMAT'] = 'JSON'
        if 'ARRAY' not in {k.upper() for k in json_options}:
            json_options['ARRAY'] = array_format
        self._export_data(source, output_path, "JSON", json_options)

    def export_data_to_jsonl(self,
                             source: str,
                             output_path: Union[str, Path],
                             options: Optional[Dict[str, Any]] = None):
        """Export to newline-delimited JSON (JSON Lines)."""
        self.export_data_to_json(source, output_path, array_format=False, options=options)
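
    # For orientation, the wrappers above generate COPY statements along these
    # lines (illustrative paths, not captured from a live run):
    #   export_data_to_csv('products', 'out.csv')
    #       -> COPY "products" TO 'out.csv' WITH (FORMAT 'CSV', HEADER TRUE)
    #   export_data_to_parquet("SELECT 1", 'out.parquet')
    #       -> COPY (SELECT 1) TO 'out.parquet' WITH (FORMAT 'PARQUET')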

    def stream_query_df(self,
                        sql: str,
                        parameters: Optional[List[Any]] = None,
                        vectors_per_chunk: int = 1
                        ) -> Iterator[pd.DataFrame]:
        """Run a query and yield the result as successive pandas DataFrame chunks."""
        self._ensure_connection()
        print(f"Streaming DataFrame query (vectors per chunk: {vectors_per_chunk}): {sql}")
        try:
            result_set = self._conn.execute(sql, parameters)
            while True:
                chunk_df = result_set.fetch_df_chunk(vectors_per_chunk)
                if chunk_df.empty:
                    # An empty chunk signals that the result set is exhausted.
                    break
                yield chunk_df
        except ImportError:
            print("Pandas library is required for DataFrame streaming.")
            raise
        except duckdb.Error as e:
            print(f"Error streaming DataFrame query: {e}")
            raise QueryError(f"Error streaming DataFrame query: {e}") from e

    def stream_query_arrow(self,
                           sql: str,
                           parameters: Optional[List[Any]] = None,
                           batch_size: int = 1000000
                           ) -> Iterator[pa.RecordBatch]:
        """
        Executes a SQL query and streams the results as Arrow RecordBatches.
        Useful for processing large results iteratively in Python without
        loading the entire result set into memory.

        Args:
            sql: The SQL query to execute.
            parameters: Optional list of parameters for prepared statements.
            batch_size: The approximate number of rows per Arrow RecordBatch.

        Yields:
            pyarrow.RecordBatch: Chunks of the result set.

        Raises:
            QueryError: If the query execution or fetching fails.
            ImportError: If pyarrow is not installed.
        """
        self._ensure_connection()
        print(f"Streaming Arrow query (batch size: {batch_size}): {sql}")
        record_batch_reader = None
        try:
            result_set = self._conn.execute(sql, parameters)
            record_batch_reader = result_set.fetch_record_batch(batch_size)
            for batch in record_batch_reader:
                yield batch
        except ImportError:
            print("PyArrow library is required for Arrow streaming.")
            raise
        except duckdb.Error as e:
            print(f"Error streaming Arrow query: {e}")
            raise QueryError(f"Error streaming Arrow query: {e}") from e
        finally:
            # Drop our reference so the reader and any pending result buffers
            # can be garbage-collected promptly.
            if record_batch_reader is not None:
                del record_batch_reader
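
    # Note: a consumer that wants a single Table can collect the stream, e.g.
    # (sketch; `api` stands in for a DatabaseAPI instance):
    #   batches = list(api.stream_query_arrow("SELECT ...", batch_size=65536))
    #   table = pa.Table.from_batches(batches) if batches else None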

    def close(self):
        """Close the underlying connection; safe to call more than once."""
        if self._conn:
            conn_id = id(self._conn)
            print(f"Closing connection to '{self._db_path}' (ID: {conn_id})")
            try:
                self._conn.close()
            except duckdb.Error as e:
                print(f"Error closing DuckDB connection (ID: {conn_id}): {e}")
            finally:
                self._conn = None
        else:
            print("Connection already closed or never opened.")

    def __enter__(self):
        self._ensure_connection()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # Last-resort cleanup; prefer calling close() or using a `with` block.
        if self._conn:
            print(f"ResourceWarning: DatabaseAPI for '{self._db_path}' was not explicitly closed. Closing now in __del__.")
            try:
                self.close()
            except Exception as e:
                print(f"Exception during implicit close in __del__: {e}")
                self._conn = None


if __name__ == "__main__":
    # Demo: build a small database in a temporary directory and exercise the
    # export and streaming helpers.
    temp_dir_obj = tempfile.TemporaryDirectory()
    temp_dir = temp_dir_obj.name
    print(f"\n--- Using temporary directory: {temp_dir} ---")
    db_file = Path(temp_dir) / "export_test.db"
    try:
        with DatabaseAPI(db_path=db_file) as db_api:
            db_api.execute_sql("CREATE OR REPLACE TABLE products(id INTEGER, name VARCHAR, price DECIMAL(8,2))")
            db_api.execute_sql("INSERT INTO products VALUES (101, 'Gadget', 19.99), (102, 'Widget', 35.00), (103, 'Thing''amajig', 9.50)")
            db_api.execute_sql("CREATE OR REPLACE TABLE sales(product_id INTEGER, sale_date DATE, quantity INTEGER)")
            db_api.execute_sql("INSERT INTO sales VALUES (101, '2023-10-26', 5), (102, '2023-10-26', 2), (101, '2023-10-27', 3)")
            export_dir = Path(temp_dir) / "exported_db"
            db_api.export_database(export_dir)
            csv_path = Path(temp_dir) / "products_export.csv"
            db_api.export_data_to_csv('products', csv_path, options={'HEADER': True})
            parquet_path = Path(temp_dir) / "high_value_products.parquet"
            db_api.export_data_to_parquet("SELECT * FROM products WHERE price > 20", parquet_path, options={'COMPRESSION': 'SNAPPY'})
            json_path = Path(temp_dir) / "sales.json"
            db_api.export_data_to_json("SELECT * FROM sales", json_path, array_format=True)
            jsonl_path = Path(temp_dir) / "sales.jsonl"
            db_api.export_data_to_jsonl("SELECT * FROM sales ORDER BY sale_date", jsonl_path)
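
            # A minimal sketch of round-tripping a DataFrame through SQL; the
            # table name 'staged_orders' and the frame contents are illustrative.
            orders_df = pd.DataFrame({"product_id": [101, 103], "quantity": [7, 1]})
            db_api.register_df('staged_orders', orders_df)
            joined_df = db_api.query_df(
                "SELECT p.name, o.quantity FROM staged_orders o JOIN products p ON p.id = o.product_id"
            )
            print(f"Joined virtual table result:\n{joined_df}")
            db_api.unregister_df('staged_orders')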

        with DatabaseAPI() as db_api:
            db_api.execute_sql("CREATE TABLE large_range AS SELECT range AS id, range % 100 AS category FROM range(1000)")
            for batch in db_api.stream_query_arrow("SELECT * FROM large_range", batch_size=200):
                pass  # Each `batch` is a pyarrow.RecordBatch of up to 200 rows.
            for df_chunk in db_api.stream_query_df("SELECT * FROM large_range", vectors_per_chunk=1):
                pass  # Each `df_chunk` is a pandas DataFrame covering one vector.
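
            # A short sketch of the scalar fetch helpers on the same in-memory
            # table; the aggregate queries are illustrative.
            row_count = db_api.query_fetchone("SELECT COUNT(*) FROM large_range")
            print(f"Row count: {row_count[0] if row_count else 'n/a'}")
            top_categories = db_api.query_fetchall(
                "SELECT category, COUNT(*) AS n FROM large_range GROUP BY category ORDER BY n DESC LIMIT 3"
            )
            print(f"Top categories: {top_categories}")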
    finally:
        temp_dir_obj.cleanup()
        print(f"\n--- Cleaned up temporary directory: {temp_dir} ---")