|
import asyncio |
|
from concurrent.futures import ThreadPoolExecutor |
|
from typing import TypedDict |
|
import duckdb |
|
import pandas as pd |
|
|
|
async def execute_sql_query(sql_query: str) -> pd.DataFrame:
    """Execute a SQL query with DuckDB and return the results as a DataFrame.

    A fresh in-memory DuckDB connection is opened for each call. Any data,
    such as the DRIAS parquet files, must therefore be referenced by the query
    itself. The blocking DuckDB work runs in the event loop's default executor
    so the event loop stays responsive while the query executes.

    Args:
        sql_query (str): The SQL query to execute.

    Returns:
        pd.DataFrame: A DataFrame containing the query results.

    Raises:
        duckdb.Error: If there is an error executing the SQL query.
    """

    def _execute_query() -> pd.DataFrame:
        con = duckdb.connect()
        try:
            return con.sql(sql_query).fetchdf()
        finally:
            # Always release the connection, including when the query raises.
            con.close()

    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    # Use the loop's shared default executor. A per-call ThreadPoolExecutor
    # used as a context manager spawns a new pool every time, and on
    # cancellation its shutdown(wait=True) blocks the event loop until the
    # query thread finishes.
    return await loop.run_in_executor(None, _execute_query)
|
|
|
|
|
class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Query parameters for one indicator's yearly values at a single point.

    The dict is declared with ``total=False``, so no key is required at the
    type level. ``indicator_per_year_at_location_query`` returns an empty
    query when ``indicator_column``, ``latitude`` or ``longitude`` is absent.

    Attributes:
        indicator_column (str): Name of the indicator column to select.
        latitude (str): Latitude of the location, as text.
        longitude (str): Longitude of the location, as text.
        model (str): Climate model name (optional; not read by
            ``indicator_per_year_at_location_query``).
    """

    # Column holding the indicator values.
    indicator_column: str
    # Coordinates are matched by exact equality in the generated SQL.
    latitude: str
    longitude: str
    # Climate model identifier.
    model: str
|
|
|
|
|
def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """Build the SQL query giving an indicator's evolution per year at a location.

    The query reads the indicator's parquet file from the
    ``hf://datasets/timeki/drias_db`` dataset. It selects ``year``, the
    indicator column and ``model`` for rows whose coordinates exactly match
    the requested ones, ordered by year.

    Args:
        table (str): Name of the indicator table; it is lower-cased to form
            the parquet file name.
        params (IndicatorPerYearAtLocationQueryParams): Query parameters.
            ``indicator_column``, ``latitude`` and ``longitude`` are required.
            ``model`` is not used by this query.

    Returns:
        str: The SQL query. Returns ``""`` when a required parameter is
        missing or invalid: ``indicator_column`` is not a plain identifier,
        or a coordinate is not a plain numeric literal.
    """

    def _as_numeric_literal(value: object) -> str:
        """Return *value* as a plain SQL numeric literal, or "" if it is not one."""
        text = str(value).strip()
        # Allow only the characters of a decimal/exponent literal, so SQL
        # fragments and Python-only spellings ("nan", "inf", "1_000") never
        # reach the query text.
        if not text or not set(text) <= set("0123456789+-.eE"):
            return ""
        try:
            float(text)
        except ValueError:
            return ""
        return text

    indicator_column = params.get("indicator_column")
    latitude = params.get("latitude")
    longitude = params.get("longitude")

    # Any missing required parameter yields an empty query.
    if indicator_column is None or latitude is None or longitude is None:
        return ""

    # The column name is inserted unquoted, so it must be a plain identifier.
    if not (isinstance(indicator_column, str) and indicator_column.isidentifier()):
        return ""

    lat_sql = _as_numeric_literal(latitude)
    lon_sql = _as_numeric_literal(longitude)
    if not lat_sql or not lon_sql:
        return ""

    # Double any single quote so the table name cannot terminate the literal.
    file_name = table.lower().replace("'", "''")
    source = f"'hf://datasets/timeki/drias_db/{file_name}.parquet'"

    return (
        f"SELECT year, {indicator_column}, model\n"
        f"FROM {source}\n"
        f"WHERE latitude = {lat_sql} \n"
        f"And longitude = {lon_sql} \n"
        "Order by Year"
    )
|
|
|
class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Query parameters for one indicator's values across locations in a year.

    The dict is declared with ``total=False``, so no key is required at the
    type level. ``indicator_for_given_year_query`` returns an empty query
    when ``indicator_column`` or ``year`` is absent.

    Attributes:
        indicator_column (str): Name of the indicator column to select.
        year (str): Year to filter on, as text.
        model (str): Climate model name (optional; not read by
            ``indicator_for_given_year_query``).
    """

    # Column holding the indicator values.
    indicator_column: str
    # Inserted into the generated SQL as the value compared against ``year``.
    year: str
    # Climate model identifier.
    model: str
|
|
|
def indicator_for_given_year_query(
    table: str, params: IndicatorForGivenYearQueryParams
) -> str:
    """Build the SQL query giving an indicator's values at every location for a year.

    The query reads the indicator's parquet file from the
    ``hf://datasets/timeki/drias_db`` dataset. It selects the indicator
    column with ``latitude``, ``longitude`` and ``model`` for rows whose
    ``year`` equals the requested one.

    Args:
        table (str): Name of the indicator table; it is lower-cased to form
            the parquet file name.
        params (IndicatorForGivenYearQueryParams): Dictionary with the query
            parameters. ``indicator_column`` and ``year`` are required.
            ``model`` is not used by this query.

    Returns:
        str: The SQL query. Returns ``""`` when a required parameter is
        missing or invalid: ``indicator_column`` is not a plain identifier,
        or ``year`` is not a plain numeric literal.
    """

    def _as_numeric_literal(value: object) -> str:
        """Return *value* as a plain SQL numeric literal, or "" if it is not one."""
        text = str(value).strip()
        # Allow only the characters of a decimal/exponent literal, so SQL
        # fragments and Python-only spellings ("nan", "inf", "1_000") never
        # reach the query text.
        if not text or not set(text) <= set("0123456789+-.eE"):
            return ""
        try:
            float(text)
        except ValueError:
            return ""
        return text

    indicator_column = params.get("indicator_column")
    year = params.get("year")

    # Any missing required parameter yields an empty query.
    if year is None or indicator_column is None:
        return ""

    # The column name is inserted unquoted, so it must be a plain identifier.
    if not (isinstance(indicator_column, str) and indicator_column.isidentifier()):
        return ""

    year_sql = _as_numeric_literal(year)
    if not year_sql:
        return ""

    # Double any single quote so the table name cannot terminate the literal.
    file_name = table.lower().replace("'", "''")
    source = f"'hf://datasets/timeki/drias_db/{file_name}.parquet'"

    return (
        f"Select {indicator_column}, latitude, longitude, model\n"
        f"From {source}\n"
        f"Where year = {year_sql}"
    )