# NOTE: the next three lines appear to be residue from a web file viewer (file size, commit hashes, line-number gutter); kept as comments so the module parses.
# File size: 4,082 Bytes
# e57556f 26bb643 f576373 e57556f d8a656a f576373 d8a656a f576373 d8a656a f576373 e57556f 22ffcc6 e57556f 22ffcc6 f576373 e57556f f576373 d8a656a 31a6f8f f576373 31a6f8f f576373 31a6f8f c2bf2c8 26bb643 c2bf2c8 f576373 31a6f8f d8a656a 31a6f8f c2bf2c8 26bb643 31a6f8f |
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict
import duckdb
import pandas as pd
async def execute_sql_query(sql_query: str) -> pd.DataFrame:
    """Execute a SQL query with DuckDB and return the result as a DataFrame.

    The query runs on a fresh DuckDB connection inside a worker thread, so the
    blocking database call does not stall the event loop. The connection is
    closed once the rows are fetched, even if the query fails.

    Args:
        sql_query (str): The SQL query to execute

    Returns:
        pd.DataFrame: A DataFrame containing the query results. An empty or
            whitespace-only query returns an empty DataFrame without opening
            a connection.

    Raises:
        duckdb.Error: If there is an error executing the SQL query
    """
    # The query builders in this module return "" when a required parameter
    # is missing; there is nothing to run in that case.
    if not sql_query or not sql_query.strip():
        return pd.DataFrame()

    def _execute_query() -> pd.DataFrame:
        con = duckdb.connect()
        try:
            return con.sql(sql_query).fetchdf()
        finally:
            # Release the connection even when the query raises.
            con.close()

    # Use the loop's default executor. Building a ThreadPoolExecutor per call
    # and leaving its `with` block waits for the worker thread, which would
    # block the event loop if this coroutine were cancelled mid-query.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _execute_query)
class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Typed dictionary of inputs for a "one indicator, one location, all years" query.

    Declared with ``total=False``, so a type checker accepts dictionaries in
    which any of the keys below is absent.

    Attributes:
        indicator_column (str): Name of the column holding the climate indicator
        latitude (str): Latitude of the location of interest
        longitude (str): Longitude of the location of interest
        model (str): Climate model to use (optional)
    """

    # Column to select
    indicator_column: str
    # Coordinates of the location
    latitude: str
    longitude: str
    # Climate model name
    model: str
def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """Build the SQL query giving an indicator's yearly values at one location.

    Args:
        table (str): name of the indicator table; it is lower-cased and used
            as the parquet file name under `hf://datasets/timeki/drias_db/`
        params (IndicatorPerYearAtLocationQueryParams): query parameters;
            `indicator_column`, `latitude` and `longitude` must all be present

    Returns:
        str: the SQL query, or an empty string if a required parameter is missing
    """
    required_keys = ("indicator_column", "latitude", "longitude")
    column, lat, lon = (params.get(key) for key in required_keys)

    # Without all three values no query can be built.
    if any(value is None for value in (column, lat, lon)):
        return ""

    source = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"
    clauses = (
        f"SELECT year, {column}, model",
        f"FROM {source}",
        f"WHERE latitude = {lat} ",
        f"And longitude = {lon} ",
        "Order by Year",
    )
    return "\n".join(clauses)
class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Typed dictionary of inputs for a "one indicator, one year, all locations" query.

    Declared with ``total=False``, so a type checker accepts dictionaries in
    which any of the keys below is absent.

    Attributes:
        indicator_column (str): Name of the column holding the climate indicator
        year (str): Year to query
        model (str): Climate model to use (optional)
    """

    # Column to select
    indicator_column: str
    # Year used to filter rows
    year: str
    # Climate model name
    model: str
def indicator_for_given_year_query(
    table: str, params: IndicatorForGivenYearQueryParams
) -> str:
    """Build the SQL query listing an indicator's values for one year.

    Each returned row carries the indicator value together with its latitude,
    longitude and model.

    Args:
        table (str): name of the indicator table; it is lower-cased and used
            as the parquet file name under `hf://datasets/timeki/drias_db/`
        params (IndicatorForGivenYearQueryParams): query parameters;
            `indicator_column` and `year` must both be present

    Returns:
        str: the SQL query, or an empty string if a required parameter is missing
    """
    column = params.get("indicator_column")
    target_year = params.get("year")

    if column is not None and target_year is not None:
        source = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"
        return (
            f"Select {column}, latitude, longitude, model\n"
            f"From {source}\n"
            f"Where year = {target_year}"
        )

    # A required parameter is missing: signal it with an empty query.
    return ""