File size: 4,082 Bytes
e57556f
 
26bb643
 
 
f576373
e57556f
d8a656a
 
 
 
 
 
f576373
d8a656a
 
f576373
d8a656a
 
 
 
f576373
e57556f
 
22ffcc6
 
e57556f
22ffcc6
f576373
e57556f
 
 
 
f576373
 
 
d8a656a
 
 
 
 
 
 
 
 
 
 
31a6f8f
f576373
 
31a6f8f
f576373
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31a6f8f
 
 
c2bf2c8
26bb643
 
 
c2bf2c8
f576373
31a6f8f
 
d8a656a
 
 
 
 
 
 
 
 
 
31a6f8f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c2bf2c8
26bb643
 
 
31a6f8f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict
import duckdb
import pandas as pd

async def execute_sql_query(sql_query: str) -> pd.DataFrame:
    """Executes a SQL query on the DRIAS database and returns the results.
    
    This function connects to the DuckDB database containing DRIAS climate data
    and executes the provided SQL query. It handles the database connection and
    returns the results as a pandas DataFrame.
    
    Args:
        sql_query (str): The SQL query to execute
        
    Returns:
        pd.DataFrame: A DataFrame containing the query results
        
    Raises:
        duckdb.Error: If there is an error executing the SQL query
    """
    def _execute_query():
        # Execute the query
        con = duckdb.connect()
        results = con.sql(sql_query).fetchdf()
        # return fetched data
        return results

    # Run the query in a thread pool to avoid blocking
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as executor:
        return await loop.run_in_executor(executor, _execute_query)


class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Parameters for querying an indicator's values over time at a location.
    
    This class defines the parameters needed to query climate indicator data
    for a specific location over multiple years.
    
    Attributes:
        indicator_column (str): The column name for the climate indicator
        latitude (str): The latitude coordinate of the location
        longitude (str): The longitude coordinate of the location
        model (str): The climate model to use (optional)
    """
    indicator_column: str
    latitude: str
    longitude: str
    model: str


def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """SQL Query to get the evolution of an indicator per year at a certain location

    Args:
        table (str): sql table of the indicator
        params (IndicatorPerYearAtLocationQueryParams) : dictionary with the required params for the query

    Returns:
        str: the sql query
    """
    indicator_column = params.get("indicator_column")
    latitude = params.get("latitude")
    longitude = params.get("longitude")
    
    if indicator_column is None or latitude is None or longitude is None: # If one parameter is missing, returns an empty query
        return ""
    
    table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"

    sql_query = f"SELECT year, {indicator_column}, model\nFROM {table}\nWHERE latitude = {latitude} \nAnd longitude = {longitude} \nOrder by Year"

    return sql_query

class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Parameters for querying an indicator's values across locations for a year.
    
    This class defines the parameters needed to query climate indicator data
    across different locations for a specific year.
    
    Attributes:
        indicator_column (str): The column name for the climate indicator
        year (str): The year to query
        model (str): The climate model to use (optional)
    """
    indicator_column: str
    year: str
    model: str

def indicator_for_given_year_query(
        table:str, params: IndicatorForGivenYearQueryParams 
) -> str:
    """SQL Query to get the values of an indicator with their latitudes, longitudes and models for a given year

    Args:
        table (str): sql table of the indicator
        params (IndicatorForGivenYearQueryParams): dictionarry with the required params for the query

    Returns:
        str: the sql query
    """
    indicator_column = params.get("indicator_column")
    year = params.get('year')
    if year is None or indicator_column is None: # If one parameter is missing, returns an empty query
        return ""
    
    table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'"

    sql_query = f"Select {indicator_column}, latitude, longitude, model\nFrom {table}\nWhere year = {year}"
    return sql_query