# armanddemasson's picture
# feat: added multithreading to run sql queries in talk to drias
# 22ffcc6
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict
import duckdb
import pandas as pd
async def execute_sql_query(sql_query: str) -> pd.DataFrame:
    """Execute a SQL query with DuckDB without blocking the event loop.

    A fresh DuckDB connection is opened with ``duckdb.connect()`` (no database
    path is passed, so any data source must be named inside the query itself),
    the query is run on a worker thread, and the result is fetched as a pandas
    DataFrame. The connection is closed afterwards, even when the query fails.

    Args:
        sql_query (str): The SQL query to execute.

    Returns:
        pd.DataFrame: A DataFrame containing the query results.

    Raises:
        duckdb.Error: If there is an error executing the SQL query.
    """

    def _execute_query() -> pd.DataFrame:
        con = duckdb.connect()
        try:
            return con.sql(sql_query).fetchdf()
        finally:
            # Release the connection even if the query raises, so repeated
            # calls do not accumulate open connections.
            con.close()

    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is the preferred way to obtain it.
    loop = asyncio.get_running_loop()
    # Only one task is ever submitted, so a single worker is enough.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await loop.run_in_executor(executor, _execute_query)
class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Inputs for querying one indicator over the years at a single location.

    The class is declared with ``total=False``, so every key may be omitted
    as far as type checking is concerned.

    Attributes:
        indicator_column (str): Name of the column holding the indicator
        latitude (str): Latitude of the location
        longitude (str): Longitude of the location
        model (str): Climate model name
    """

    indicator_column: str
    latitude: str
    longitude: str
    model: str
def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """Build the SQL query for an indicator's yearly values at one location.

    Args:
        table (str): name of the indicator table; it is lower-cased and used
            as the parquet file name under ``hf://datasets/timeki/drias_db/``
        params (IndicatorPerYearAtLocationQueryParams): dictionary providing
            ``indicator_column``, ``latitude`` and ``longitude``

    Returns:
        str: the SQL query, or an empty string when any of the three required
        parameters is missing
    """
    required_keys = ("indicator_column", "latitude", "longitude")
    column, lat, lon = (params.get(key) for key in required_keys)

    # Without all three values no meaningful query can be built.
    if any(value is None for value in (column, lat, lon)):
        return ""

    # NOTE(review): values are interpolated into the SQL text as-is, so
    # callers are expected to pass trusted values.
    source = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"
    query_lines = [
        f"SELECT year, {column}, model",
        f"FROM {source}",
        f"WHERE latitude = {lat} ",
        f"And longitude = {lon} ",
        "Order by Year",
    ]
    return "\n".join(query_lines)
class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Inputs for querying one indicator across locations for a single year.

    The class is declared with ``total=False``, so every key may be omitted
    as far as type checking is concerned.

    Attributes:
        indicator_column (str): Name of the column holding the indicator
        year (str): Year to query
        model (str): Climate model name
    """

    indicator_column: str
    year: str
    model: str
def indicator_for_given_year_query(
    table: str, params: IndicatorForGivenYearQueryParams
) -> str:
    """Build the SQL query for an indicator's values in a given year.

    The query selects the indicator together with latitude, longitude and
    model for every row of the requested year.

    Args:
        table (str): name of the indicator table; it is lower-cased and used
            as the parquet file name under ``hf://datasets/timeki/drias_db/``
        params (IndicatorForGivenYearQueryParams): dictionary providing
            ``indicator_column`` and ``year``

    Returns:
        str: the SQL query, or an empty string when either required parameter
        is missing
    """
    column = params.get("indicator_column")
    target_year = params.get("year")

    # Both values are needed to build a meaningful query.
    if column is None or target_year is None:
        return ""

    # NOTE(review): values are interpolated into the SQL text as-is, so
    # callers are expected to pass trusted values.
    source = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"
    query_lines = [
        f"Select {column}, latitude, longitude, model",
        f"From {source}",
        f"Where year = {target_year}",
    ]
    return "\n".join(query_lines)