|
import asyncio
import csv
import io
import logging
import os
import re
from typing import List, Dict, Any, Optional

import duckdb
from fastapi import FastAPI, HTTPException, Request, Path as FastPath
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
|
|
|
|
|
# Location of the DuckDB database file; overridable via the DUCKDB_PATH env var.
DATABASE_PATH = os.environ.get("DUCKDB_PATH", "data/mydatabase.db")

DATA_DIR = "data"

# Create the default data directory, and also the directory that actually
# holds DATABASE_PATH: DUCKDB_PATH may point outside DATA_DIR, and
# duckdb.connect() does not create missing parent directories.
os.makedirs(DATA_DIR, exist_ok=True)
_database_dir = os.path.dirname(DATABASE_PATH)
if _database_dir:  # empty for a bare file name (and for ":memory:")
    os.makedirs(_database_dir, exist_ok=True)

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)

app = FastAPI(
    title="DuckDB API",
    description="An API to interact with a DuckDB database.",
    version="0.1.0",
)
|
|
|
|
|
|
|
|
|
def get_db():
    """Yield one read-write DuckDB connection to DATABASE_PATH, then close it.

    This is a single-item generator, so endpoints can write
    ``for conn in get_db(): ...``. The connection is closed in ``finally``
    when the generator is exhausted, closed, or garbage-collected.

    Raises:
        HTTPException: 500 if the connection cannot be opened.
    """
    # Checked before connecting, because connecting may create the file.
    initialize = not os.path.exists(DATABASE_PATH) or os.path.getsize(DATABASE_PATH) == 0

    # Only the connect call is guarded: errors raised by callers while using
    # the connection are not connection errors and must keep their own type.
    try:
        conn = duckdb.connect(DATABASE_PATH, read_only=False)
    except duckdb.Error as e:
        logger.error(f"Database connection error: {e}")
        raise HTTPException(status_code=500, detail=f"Database connection error: {e}") from e

    try:
        if initialize:
            # NOTE(review): no schema/bootstrap work is done here; this only logs.
            logger.info(f"Database file not found or empty at {DATABASE_PATH}. Initializing.")
        yield conn
    finally:
        conn.close()
|
|
|
|
|
class ColumnDefinition(BaseModel):
    """One column in a create-table request."""

    # Column name; passed through safe_identifier() by generate_column_sql().
    name: str

    # SQL type name; validated by generate_column_sql() before it is used in DDL.
    type: str
|
|
|
class CreateTableRequest(BaseModel):
    """Request body for ``POST /tables/{table_name}``."""

    # Column definitions, emitted in this order; create_table rejects an empty list.
    columns: List[ColumnDefinition]
|
|
|
class CreateRowRequest(BaseModel):
    """Request body for ``POST /tables/{table_name}/rows``."""

    # Each dict maps column name -> value. The first row's keys define the
    # INSERT column list, and create_rows validates every row against them.
    rows: List[Dict[str, Any]]
|
|
|
class UpdateRowRequest(BaseModel):
    """Request body for ``PUT /tables/{table_name}/rows``."""

    # Column name -> new value; update_rows binds the values as query parameters.
    updates: Dict[str, Any]

    # NOTE(security): raw SQL boolean expression that update_rows splices
    # verbatim into the WHERE clause. It is NOT parameterized or escaped, so
    # only accept it from trusted callers.
    condition: str
|
|
|
class DeleteRowRequest(BaseModel):
    """Request body for ``DELETE /tables/{table_name}/rows``."""

    # NOTE(security): raw SQL boolean expression that delete_rows splices
    # verbatim into the WHERE clause. It is NOT parameterized or escaped, so
    # only accept it from trusted callers.
    condition: str
|
|
|
class ApiResponse(BaseModel):
    """Generic response envelope used as ``response_model`` by several endpoints."""

    # Human-readable status message.
    message: str

    # Optional extra payload; not set by any endpoint shown in this module.
    details: Optional[Any] = None
|
|
|
|
|
def safe_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier, or reject it.

    Only names for which ``str.isidentifier()`` is true are accepted: letters
    (including Unicode letters), digits and underscores, not starting with a
    digit, and not empty. Quoting makes reserved words such as ``select``
    usable as table or column names.

    Args:
        name: Untrusted table or column name, e.g. from a URL path or JSON body.

    Returns:
        The quoted identifier, e.g. ``'"users"'``.

    Raises:
        HTTPException: 400 if ``name`` is not a valid identifier.
    """
    if not name.isidentifier():
        raise HTTPException(status_code=400, detail=f"Invalid identifier: {name}")
    # isidentifier() already excludes '"'. Escape by doubling anyway, so the
    # quoting stays correct if the acceptance rule above is ever relaxed.
    return '"' + name.replace('"', '""') + '"'
|
|
|
def generate_column_sql(columns: List[ColumnDefinition]) -> str: |
|
"""Generates the column definition part of a CREATE TABLE statement.""" |
|
defs = [] |
|
for col in columns: |
|
col_name_safe = safe_identifier(col.name) |
|
|
|
allowed_types = ['INTEGER', 'VARCHAR', 'TEXT', 'BOOLEAN', 'FLOAT', 'DOUBLE', 'DATE', 'TIMESTAMP', 'BLOB', 'BIGINT', 'DECIMAL'] |
|
type_upper = col.type.strip().upper() |
|
|
|
if not (type_upper.startswith('DECIMAL(') and type_upper.endswith(')')) and \ |
|
not any(base_type in type_upper for base_type in allowed_types): |
|
raise HTTPException(status_code=400, detail=f"Unsupported or invalid data type: {col.type}") |
|
defs.append(f"{col_name_safe} {col.type}") |
|
return ", ".join(defs) |
|
|
|
|
|
|
|
@app.get("/", summary="API Root", response_model=ApiResponse)
async def read_root():
    """Return the static welcome payload; this endpoint does not touch the database."""
    greeting = "Welcome to the DuckDB API!"
    payload = {"message": greeting}
    return payload
|
|
|
@app.post("/tables/{table_name}", summary="Create Table", response_model=ApiResponse, status_code=201)
async def create_table(
    table_name: str = FastPath(..., description="Name of the table to create"),
    schema: CreateTableRequest = ...,
):
    """Create ``table_name`` with the columns described in the request body.

    Raises:
        HTTPException: 400 for an invalid table/column name or type, an empty
            column list, or any DuckDB error (e.g. the table already exists,
            since no ``IF NOT EXISTS`` is used); 500 for a connection failure
            or any other unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    if not schema.columns:
        raise HTTPException(status_code=400, detail="Table must have at least one column.")

    try:
        columns_sql = generate_column_sql(schema.columns)
        sql = f"CREATE TABLE {table_name_safe} ({columns_sql});"
        logger.info(f"Executing SQL: {sql}")
        for conn in get_db():
            conn.execute(sql)
            return {"message": f"Table '{table_name}' created successfully."}
    except HTTPException:
        # Validation and connection errors already carry the right status.
        raise
    except duckdb.Error as e:
        logger.error(f"Error creating table '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Error creating table: {e}") from e
    except Exception as e:
        # logger.exception keeps the traceback, which is the only clue for unexpected failures.
        logger.exception(f"Unexpected error creating table '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e
|
|
|
@app.get("/tables/{table_name}", summary="Read Table Data")
async def read_table(
    table_name: str = FastPath(..., description="Name of the table to read from"),
    limit: Optional[int] = None,
    offset: Optional[int] = None
):
    """Return rows of ``table_name`` as a list of ``{column: value}`` dicts.

    All rows are returned unless ``limit``/``offset`` are given. Both are bound
    as query parameters. The query has no ORDER BY, so row order, and hence
    pagination, is not guaranteed to be stable between calls.

    Raises:
        HTTPException: 400 for an invalid name, a negative limit/offset, or a
            DuckDB error; 404 if the table does not exist; 500 for a
            connection failure or any other unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    if limit is not None and limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative.")
    if offset is not None and offset < 0:
        raise HTTPException(status_code=400, detail="offset must be non-negative.")

    sql = f"SELECT * FROM {table_name_safe}"
    params = []
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    if offset is not None:
        sql += " OFFSET ?"
        params.append(offset)
    sql += ";"

    try:
        logger.info(f"Executing SQL: {sql} with params: {params}")
        for conn in get_db():
            result = conn.execute(sql, params).fetchall()
            column_names = [desc[0] for desc in conn.description]
            return [dict(zip(column_names, row)) for row in result]
    except HTTPException:
        # e.g. the 500 raised by get_db() on a connection failure; keep its detail.
        raise
    except duckdb.CatalogException as e:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found.") from e
    except duckdb.Error as e:
        logger.error(f"Error reading table '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Error reading table: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error reading table '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e
|
|
|
|
|
@app.post("/tables/{table_name}/rows", summary="Create Rows", response_model=ApiResponse, status_code=201)
async def create_rows(
    table_name: str = FastPath(..., description="Name of the table to insert into"),
    request: CreateRowRequest = ...,
):
    """Insert one or more rows into ``table_name`` in a single transaction.

    The INSERT column list comes from the first row's keys. Every row must
    have exactly that set of keys, in any order; values are reordered to
    match and bound as parameters. Either all rows are inserted or, if any
    row fails, none are.

    Raises:
        HTTPException: 400 for empty or ragged input, invalid identifiers, or
            a DuckDB error; 404 if the table does not exist; 500 for a
            connection failure or any other unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    if not request.rows:
        raise HTTPException(status_code=400, detail="No rows provided to insert.")

    columns = list(request.rows[0].keys())
    if not columns:
        raise HTTPException(status_code=400, detail="Rows must contain at least one column.")
    columns_sql = ", ".join(safe_identifier(col) for col in columns)
    placeholders = ", ".join(["?"] * len(columns))
    sql = f"INSERT INTO {table_name_safe} ({columns_sql}) VALUES ({placeholders});"

    expected_keys = set(columns)
    params_list = []
    for row_dict in request.rows:
        # dict key views compare as sets, so key order does not matter here.
        if row_dict.keys() != expected_keys:
            raise HTTPException(status_code=400, detail="All rows must have the same columns.")
        # Reorder values to line up with the INSERT column list.
        params_list.append([row_dict[col] for col in columns])

    try:
        logger.info(f"Executing SQL: {sql} for {len(params_list)} rows")
        for conn in get_db():
            # Explicit transaction: without it each executemany row commits
            # on its own, so a failure mid-batch left earlier rows inserted.
            conn.begin()
            try:
                conn.executemany(sql, params_list)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            return {"message": f"Successfully inserted {len(params_list)} rows into '{table_name}'."}
    except HTTPException:
        raise
    except duckdb.CatalogException as e:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found.") from e
    except duckdb.Error as e:
        logger.error(f"Error inserting rows into '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Error inserting rows: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error inserting rows into '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e
|
|
|
|
|
@app.put("/tables/{table_name}/rows", summary="Update Rows", response_model=ApiResponse)
async def update_rows(
    table_name: str = FastPath(..., description="Name of the table to update"),
    request: UpdateRowRequest = ...,
):
    """Update the rows of ``table_name`` that match ``request.condition``.

    New values are bound as query parameters, and column names go through
    safe_identifier().

    NOTE(security): ``request.condition`` is raw SQL interpolated verbatim
    into the WHERE clause. A caller controls an arbitrary SQL expression
    there, which may include subqueries against other tables, and possibly
    extra ``;``-separated statements. Expose this endpoint only to trusted
    clients, or replace the condition with a structured filter.

    Raises:
        HTTPException: 400 for missing updates/condition, invalid identifiers,
            or a DuckDB error; 404 if the table does not exist; 500 for a
            connection failure or any other unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    if not request.updates:
        raise HTTPException(status_code=400, detail="No updates provided.")
    # A whitespace-only condition would produce "WHERE ;", so treat it as missing.
    if not request.condition.strip():
        raise HTTPException(status_code=400, detail="Update condition (WHERE clause) is required.")

    set_sql = ", ".join(f"{safe_identifier(col)} = ?" for col in request.updates)
    params = list(request.updates.values())
    sql = f"UPDATE {table_name_safe} SET {set_sql} WHERE {request.condition};"

    try:
        logger.info(f"Executing SQL: {sql} with params: {params}")
        for conn in get_db():
            conn.execute(sql, params)
            conn.commit()
            return {"message": f"Rows in '{table_name}' updated successfully based on condition."}
    except HTTPException:
        raise
    except duckdb.CatalogException as e:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found.") from e
    except duckdb.Error as e:
        logger.error(f"Error updating rows in '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Error updating rows: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error updating rows in '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e
|
|
|
@app.delete("/tables/{table_name}/rows", summary="Delete Rows", response_model=ApiResponse)
async def delete_rows(
    table_name: str = FastPath(..., description="Name of the table to delete from"),
    request: DeleteRowRequest = ...,
):
    """Delete the rows of ``table_name`` that match ``request.condition``.

    NOTE(security): ``request.condition`` is raw SQL interpolated verbatim
    into the WHERE clause, with no parameters at all. A caller controls an
    arbitrary SQL expression there, and possibly extra ``;``-separated
    statements. Expose this endpoint only to trusted clients, or replace the
    condition with a structured filter.

    Raises:
        HTTPException: 400 for a missing condition, an invalid name, or a
            DuckDB error; 404 if the table does not exist; 500 for a
            connection failure or any other unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    # A whitespace-only condition would produce "WHERE ;", so treat it as missing.
    if not request.condition.strip():
        raise HTTPException(status_code=400, detail="Delete condition (WHERE clause) is required.")

    sql = f"DELETE FROM {table_name_safe} WHERE {request.condition};"

    try:
        logger.info(f"Executing SQL: {sql}")
        for conn in get_db():
            conn.execute(sql)
            conn.commit()
            return {"message": f"Rows from '{table_name}' deleted successfully based on condition."}
    except HTTPException:
        raise
    except duckdb.CatalogException as e:
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found.") from e
    except duckdb.Error as e:
        logger.error(f"Error deleting rows from '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Error deleting rows: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error deleting rows from '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e
|
|
|
|
|
|
|
@app.get("/download/table/{table_name}", summary="Download Table as CSV")
async def download_table_csv(
    table_name: str = FastPath(..., description="Name of the table to download")
):
    """Download the entire content of ``table_name`` as a CSV attachment.

    The query runs, and the CSV is rendered in memory, *before* the response
    starts. A missing table or failed query is therefore reported with a real
    HTTP error status instead of being written into a 200 ``text/csv`` body.
    The rendered text is then streamed back in fixed-size chunks.

    The CSV has a header row of column names and uses ``\\n`` line endings.
    NULLs become empty fields, and other values are written with ``str()``
    by the stdlib ``csv`` module, so pandas is not required.

    Raises:
        HTTPException: 400 for an invalid name or a DuckDB error; 404 if the
            table does not exist; 500 for a connection failure or any other
            unexpected error.
    """
    table_name_safe = safe_identifier(table_name)
    csv_buffer = io.StringIO()

    try:
        for conn in get_db():
            cursor = conn.execute(f"SELECT * FROM {table_name_safe}")
            writer = csv.writer(csv_buffer, lineterminator="\n")
            writer.writerow([desc[0] for desc in cursor.description])
            # Fetch in batches so the full row list is never held alongside
            # the rendered CSV text.
            while True:
                batch = cursor.fetchmany(1000)
                if not batch:
                    break
                writer.writerows(batch)
    except HTTPException:
        raise
    except duckdb.CatalogException as e:
        logger.error(f"Error downloading table '{table_name}': {e}")
        raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found.") from e
    except duckdb.Error as e:
        logger.error(f"Error downloading table '{table_name}': {e}")
        raise HTTPException(status_code=400, detail=f"Could not export table '{table_name}': {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error downloading table '{table_name}': {e}")
        raise HTTPException(status_code=500, detail="An unexpected error occurred.") from e

    csv_text = csv_buffer.getvalue()
    csv_buffer.close()
    chunk_size = 8192

    async def stream_csv_data():
        """Yield the pre-rendered CSV text in ``chunk_size`` pieces."""
        for start in range(0, len(csv_text), chunk_size):
            yield csv_text[start:start + chunk_size]
            # Let other tasks on the event loop run between chunks.
            await asyncio.sleep(0)

    return StreamingResponse(
        stream_csv_data(),
        media_type="text/csv",
        # safe_identifier() guarantees the name contains no quotes or spaces.
        headers={"Content-Disposition": f'attachment; filename="{table_name}.csv"'},
    )
|
|
|
|
|
@app.get("/download/database", summary="Download Database File")
async def download_database_file():
    """Serve the raw DuckDB file at DATABASE_PATH as a binary attachment.

    The file is sent as it exists on disk, without coordinating with open
    connections. NOTE(review): changes still held only in DuckDB's
    write-ahead log may be missing from the download; consider running a
    CHECKPOINT first. TODO confirm.

    Raises:
        HTTPException: 404 if the database file does not exist.
    """
    db_file = DATABASE_PATH
    if os.path.exists(db_file):
        logger.warning("Attempting to download database file. Ensure no active writes are occurring.")
        return FileResponse(
            path=db_file,
            filename=os.path.basename(db_file),
            media_type="application/octet-stream",
        )
    raise HTTPException(status_code=404, detail="Database file not found.")
|
|
|
|
|
|
|
@app.get("/health", summary="Health Check", response_model=ApiResponse)
async def health_check():
    """Verify that a database connection can be opened and queried.

    Returns:
        A success message once ``SELECT 1`` has run.

    Raises:
        HTTPException: 503 wrapping any failure, including a connection
            error raised by get_db().
    """
    healthy_message = "API is healthy and database connection is successful."
    try:
        for db in get_db():
            db.execute("SELECT 1")
            return {"message": healthy_message}
    except Exception as err:
        logger.error(f"Health check failed: {err}")
        raise HTTPException(status_code=503, detail=f"Health check failed: {err}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|