import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict

import duckdb
import pandas as pd


async def execute_sql_query(sql_query: str) -> pd.DataFrame:
    """Execute a SQL query with DuckDB and return the result as a DataFrame.

    A fresh DuckDB connection is opened with default arguments for every
    call, the query is run on it, the result is fetched into pandas and the
    connection is closed again. The blocking work happens in a worker thread
    so the event loop stays responsive.

    Args:
        sql_query (str): The SQL query to execute.

    Returns:
        pd.DataFrame: A DataFrame containing the query results.

    Raises:
        duckdb.Error: If there is an error executing the SQL query.
    """

    def _execute_query() -> pd.DataFrame:
        # The context manager closes the connection even when the query
        # raises, so no connection is leaked per call.
        with duckdb.connect() as con:
            # Materialise the result before the connection is closed.
            return con.sql(sql_query).fetchdf()

    # Use the loop's default executor rather than a per-call
    # ``with ThreadPoolExecutor()``: if the awaiting task is cancelled, the
    # ``with`` exit would call ``shutdown(wait=True)`` and block the event
    # loop thread until the query finished.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _execute_query)


def _parquet_source(table: str) -> str:
    """Return the quoted parquet location used in FROM clauses for *table*."""
    return f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"


class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Parameters for querying an indicator's values over time at a location.

    All keys are optional at the type level (``total=False``); the query
    builder returns an empty query when a key it needs is absent.

    Attributes:
        indicator_column (str): The column name for the climate indicator
        latitude (str): The latitude coordinate of the location
        longitude (str): The longitude coordinate of the location
        model (str): The climate model to use (optional; not read by
            ``indicator_per_year_at_location_query``)
    """

    indicator_column: str
    latitude: str
    longitude: str
    model: str


def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """Build the SQL query for an indicator's yearly values at one location.

    Args:
        table (str): Name of the indicator table; it is lower-cased and used
            as the parquet file name.
        params (IndicatorPerYearAtLocationQueryParams): Dictionary with the
            required params for the query (``indicator_column``,
            ``latitude`` and ``longitude``).

    Returns:
        str: The SQL query, or an empty string if a required parameter is
        missing.
    """
    indicator_column = params.get("indicator_column")
    latitude = params.get("latitude")
    longitude = params.get("longitude")

    if indicator_column is None or latitude is None or longitude is None:
        # If one parameter is missing, return an empty query.
        return ""

    source = _parquet_source(table)
    # NOTE(review): values are interpolated verbatim into the SQL text, so
    # they must come from trusted / validated input (SQL injection risk).
    return (
        f"SELECT year, {indicator_column}, model\n"
        f"FROM {source}\n"
        f"WHERE latitude = {latitude} \n"
        f"And longitude = {longitude} \n"
        "Order by Year"
    )


class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Parameters for querying an indicator's values across locations for a year.

    All keys are optional at the type level (``total=False``); the query
    builder returns an empty query when a key it needs is absent.

    Attributes:
        indicator_column (str): The column name for the climate indicator
        year (str): The year to query
        model (str): The climate model to use (optional; not read by
            ``indicator_for_given_year_query``)
    """

    indicator_column: str
    year: str
    model: str


def indicator_for_given_year_query(
    table: str, params: IndicatorForGivenYearQueryParams
) -> str:
    """Build the SQL query for an indicator's values in a given year.

    The query selects the indicator together with the latitude, longitude
    and model columns for every row of the requested year.

    Args:
        table (str): Name of the indicator table; it is lower-cased and used
            as the parquet file name.
        params (IndicatorForGivenYearQueryParams): Dictionary with the
            required params for the query (``indicator_column`` and
            ``year``).

    Returns:
        str: The SQL query, or an empty string if a required parameter is
        missing.
    """
    indicator_column = params.get("indicator_column")
    year = params.get("year")

    if year is None or indicator_column is None:
        # If one parameter is missing, return an empty query.
        return ""

    source = _parquet_source(table)
    # NOTE(review): values are interpolated verbatim into the SQL text, so
    # they must come from trusted / validated input (SQL injection risk).
    return (
        f"Select {indicator_column}, latitude, longitude, model\n"
        f"From {source}\n"
        f"Where year = {year}"
    )