import os
import datetime
import time
import json

import requests
import pandas as pd
from geopy.geocoders import Nominatim
def convert_date_to_unix(x):
    """
    Convert a datetime (or its string representation) to Unix time in milliseconds.
    """
    dt_obj = datetime.datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
    return int(dt_obj.timestamp() * 1000)
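
# Example (note: timestamp() interprets the datetime in the local timezone,
# so the exact value is machine-dependent):
#   convert_date_to_unix("2023-01-01 00:00:00")  # -> 1672531200000 on a UTC machine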

def get_city_coordinates(city_name: str):
    """
    Takes a city name and returns its latitude and longitude, rounded to 2 decimal places.
    """
    # Initialize the Nominatim geocoder (for getting the lat and lon of the city)
    geolocator = Nominatim(user_agent="MyApp")
    city = geolocator.geocode(city_name)
    latitude = round(city.latitude, 2)
    longitude = round(city.longitude, 2)
    return latitude, longitude
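
# Example (sketch; performs a live Nominatim lookup and raises AttributeError
# if the city name cannot be geocoded):
#   get_city_coordinates("Berlin")  # -> roughly (52.52, 13.39); exact values depend on Nominatim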

##################################### EEA

def convert_to_daily(df, pollutant: str):
    """
    Returns a DataFrame where the pollutant column is resampled to daily frequency and rounded.
    """
    res_df = df.copy()
    # parse dates in the 'date' column
    res_df["date"] = pd.to_datetime(res_df["date"])
    # aggregate hourly data to daily (mean per day = 1 row per day)
    res_df = res_df.set_index('date')
    res_df = res_df[pollutant].resample('1d').mean().reset_index()
    # fill gaps with the median and round the values
    res_df[pollutant] = res_df[pollutant].fillna(res_df[pollutant].median())
    res_df[pollutant] = res_df[pollutant].apply(lambda x: round(x, 0))
    return res_df
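
# Example (hypothetical two-hour frame collapsing to a single daily row):
#   hourly = pd.DataFrame({"date": ["2023-01-01 00:00:00", "2023-01-01 01:00:00"],
#                          "pm2_5": [10.0, 13.0]})
#   convert_to_daily(hourly, "pm2_5")  # -> one row: 2023-01-01, pm2_5 = 12.0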

def find_fullest_csv(csv_links: list, year: int):
    """
    Out of the CSV links that mention the given year, returns the one with the most
    rows (i.e. the fullest set of observations) as a DataFrame.
    Raises IndexError if no link matches the year.
    """
    candidates = [link for link in csv_links if str(year) in link]
    biggest_df = pd.read_csv(candidates[0])
    for link in candidates[1:]:
        _df = pd.read_csv(link)
        if len(biggest_df) < len(_df):
            biggest_df = _df
    return biggest_df
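
# Example (sketch; csv_links is a list of CSV URLs, as produced inside
# get_air_quality_from_eea below):
#   find_fullest_csv(csv_links, 2019)  # -> DataFrame from the largest 2019 CSV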

def get_air_quality_from_eea(city_name: str,
                             pollutant: str,
                             start_year: str,
                             end_year: str):
    """
    Takes a city name and a date range and returns a pandas DataFrame with daily air quality data.
    Data is parsed in 1-year batches, so pass years, not dates (example: "2014", "2022").
    EEA is the European Environment Agency, so this covers European Union countries ONLY.
    """
    start_of_cell = time.time()
    params = {
        'CountryCode': '',
        'CityName': city_name,
        'Pollutant': pollutant.upper(),
        'Year_from': start_year,
        'Year_to': end_year,
        'Station': '',
        'Source': 'All',
        'Samplingpoint': '',
        'Output': 'TEXT',
        'UpdateDate': '',
        'TimeCoverage': 'Year'
    }
    # observations endpoint (returns a plain-text list of CSV download links)
    base_url = "https://fme.discomap.eea.europa.eu/fmedatastreaming/AirQualityDownload/AQData_Extract.fmw?"
    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # retry once on a dropped connection
        response = requests.get(base_url, params=params)
    response.encoding = response.apparent_encoding
    csv_links = response.text.split("\r\n")

    res_df = pd.DataFrame()
    for year in range(int(start_year), int(end_year) + 1):
        try:
            # find the fullest (biggest) CSV file with observations for this particular year
            _df = find_fullest_csv(csv_links, year)
            res_df = pd.concat([res_df, _df])
        except IndexError:
            print(f"!! Missing data for {year} for {city_name} city.")

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"
    res_df = res_df.rename(columns={
        'DatetimeBegin': 'date',
        'Concentration': pollutant
    })

    # cut off the timezone offset (e.g. "+01:00") and convert the 'date' column
    res_df['date'] = res_df['date'].apply(lambda x: x[:-6])
    res_df['date'] = pd.to_datetime(res_df['date'])
    res_df = convert_to_daily(res_df, pollutant)
    res_df['city_name'] = city_name
    res_df = res_df[['city_name', 'date', pollutant]]

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} from {start_year} to {end_year}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")
    return res_df
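
# Example (sketch; performs live requests against the EEA download service,
# which may be slow or unavailable):
#   df_eea = get_air_quality_from_eea("Madrid", "PM2.5", "2018", "2020")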

##################################### USEPA

# cache of {CBSA full name: CBSA code}, filled lazily by get_city_code()
city_code_dict = {}

# AQS parameter codes for the supported pollutants
pollutant_dict = {
    'CO': '42101',
    'SO2': '42401',
    'NO2': '42602',
    'O3': '44201',
    'PM10': '81102',
    'PM2.5': '88101'
}

def get_city_code(city_name: str):
    "Encodes a city name into a CBSA code, used later when querying USEPA data."
    if city_code_dict:
        city_full = [i for i in city_code_dict.keys() if city_name in i][0]
        return city_code_dict[city_full]
    else:
        params = {
            "email": "[email protected]",
            "key": "test"
        }
        response = requests.get("https://aqs.epa.gov/data/api/list/cbsas?", params)
        response_json = response.json()
        data = response_json["Data"]
        # fill the cache with {CBSA name: CBSA code} pairs, then retry the lookup
        for item in data:
            city_code_dict[item['value_represented']] = item['code']
        return get_city_code(city_name)
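
# Example (sketch; needs working AQS API credentials in place of the
# placeholders above):
#   get_city_code("Seattle")  # -> the matching CBSA code as a string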

def get_air_quality_from_usepa(city_name: str,
                               pollutant: str,
                               start_date: str,
                               end_date: str):
    """
    Takes a city name and a date range and returns a pandas DataFrame with daily air quality data.
    USEPA is the United States Environmental Protection Agency, so this covers the US ONLY.
    """
    start_of_cell = time.time()
    res_df = pd.DataFrame()

    # normalize the pollutant name used for column labels
    pollutant_column = pollutant.lower()
    if pollutant_column == "pm2.5":
        pollutant_column = "pm2_5"

    # the AQS API expects date ranges within a single year, so query year by year
    for start_date_, end_date_ in make_date_intervals(start_date, end_date):
        params = {
            "email": "[email protected]",
            "key": "test",
            "param": pollutant_dict[pollutant.upper().replace("_", ".")],  # encoded pollutant
            "bdate": start_date_,
            "edate": end_date_,
            "cbsa": get_city_code(city_name)  # Core-Based Statistical Area
        }
        # observations endpoint
        base_url = "https://aqs.epa.gov/data/api/dailyData/byCBSA?"
        response = requests.get(base_url, params=params)
        response_json = response.json()
        df_ = pd.DataFrame(response_json["Data"])
        df_ = df_.rename(columns={
            'date_local': 'date',
            'arithmetic_mean': pollutant_column
        })
        # convert dates in the 'date' column
        df_['date'] = pd.to_datetime(df_['date'])
        df_['city_name'] = city_name
        df_ = df_[['city_name', 'date', pollutant_column]]
        res_df = pd.concat([res_df, df_])

    # there are duplicated rows (several records for the same day from different
    # stations); average them out
    res_df = res_df.groupby(['date', 'city_name'], as_index=False)[pollutant_column].mean()
    res_df[pollutant_column] = round(res_df[pollutant_column], 1)

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} from {start_date} to {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")
    return res_df
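
# Example (sketch; live requests, needs valid AQS credentials):
#   df_us = get_air_quality_from_usepa("Seattle", "PM2.5", "2019-01-01", "2020-12-31")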

def make_date_intervals(start_date, end_date):
    """
    Splits [start_date, end_date] into per-year ('YYYYMMDD', 'YYYYMMDD') intervals.
    """
    start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    end_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')
    date_intervals = []
    for year in range(start_dt.year, end_dt.year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31)
        interval_start = max(start_dt, year_start)
        interval_end = min(end_dt, year_end)
        # '<=' so that a single-day range still produces an interval
        if interval_start <= interval_end:
            date_intervals.append((interval_start.strftime('%Y%m%d'), interval_end.strftime('%Y%m%d')))
    return date_intervals
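
# Example:
#   make_date_intervals("2019-06-15", "2021-02-01")
#   # -> [('20190615', '20191231'), ('20200101', '20201231'), ('20210101', '20210201')]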

##################################### Weather Open Meteo

def get_weather_data_from_open_meteo(city_name: str,
                                     start_date: str,
                                     end_date: str,
                                     coordinates: list = None,
                                     forecast: bool = False):
    """
    Takes a city name OR coordinates and returns a pandas DataFrame with daily weather data.
    Examples of arguments:
        coordinates=(47.755, -122.2806), start_date="2023-01-01"
    """
    start_of_cell = time.time()
    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'daily': ["temperature_2m_max", "temperature_2m_min",
                  "precipitation_sum", "rain_sum", "snowfall_sum",
                  "precipitation_hours", "windspeed_10m_max",
                  "windgusts_10m_max", "winddirection_10m_dominant"],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    if forecast:
        # forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive'
    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # retry once on a dropped connection
        response = requests.get(base_url, params=params)
    response_json = response.json()
    res_df = pd.DataFrame(response_json["daily"])
    res_df["city_name"] = city_name

    # rename columns
    res_df = res_df.rename(columns={
        "time": "date",
        "temperature_2m_max": "temperature_max",
        "temperature_2m_min": "temperature_min",
        "windspeed_10m_max": "wind_speed_max",
        "winddirection_10m_dominant": "wind_direction_dominant",
        "windgusts_10m_max": "wind_gusts_max"
    })

    # change column order
    res_df = res_df[
        ['city_name', 'date', 'temperature_max', 'temperature_min',
         'precipitation_sum', 'rain_sum', 'snowfall_sum',
         'precipitation_hours', 'wind_speed_max',
         'wind_gusts_max', 'wind_direction_dominant']
    ]
    # convert dates in the 'date' column
    res_df["date"] = pd.to_datetime(res_df["date"])

    end_of_cell = time.time()
    print(f"Parsed weather for {city_name} from {start_date} to {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")
    return res_df
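
# Example (sketch; live request to the Open-Meteo archive API):
#   df_weather = get_weather_data_from_open_meteo("Kirkland", "2023-01-01", "2023-01-31",
#                                                 coordinates=(47.76, -122.28))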

##################################### Air Quality data from Open Meteo

def get_aqi_data_from_open_meteo(city_name: str,
                                 start_date: str,
                                 end_date: str,
                                 coordinates: list = None,
                                 pollutant: str = "pm2_5"):
    """
    Takes a city name OR coordinates and returns a pandas DataFrame with AQI data.
    Examples of arguments:
        ...
        coordinates=(47.755, -122.2806),
        start_date="2023-01-01",
        pollutant="no2"
        ...
    """
    start_of_cell = time.time()
    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"
    # make it work with both "no2" and "nitrogen_dioxide" passed
    if pollutant == "no2":
        pollutant = "nitrogen_dioxide"

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': [pollutant],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }
    # air quality endpoint
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # retry once on a dropped connection
        response = requests.get(base_url, params=params)
    response_json = response.json()
    res_df = pd.DataFrame(response_json["hourly"])

    # convert dates
    res_df["time"] = pd.to_datetime(res_df["time"])
    # resample hourly values to daily means
    res_df = res_df.groupby(res_df['time'].dt.date).mean(numeric_only=True).reset_index()
    res_df[pollutant] = round(res_df[pollutant], 1)
    # rename columns
    res_df = res_df.rename(columns={
        "time": "date"
    })
    res_df["city_name"] = city_name
    # change column order
    res_df = res_df[
        ['city_name', 'date', pollutant]
    ]

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} from {start_date} to {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")
    return res_df
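
# Example (sketch; live request to the Open-Meteo air quality API):
#   df_aqi = get_aqi_data_from_open_meteo("Paris", "2023-01-01", "2023-01-31", pollutant="no2")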