# app.py
import gradio as gr
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.gridspec import GridSpec
import json
import time
from bs4 import BeautifulSoup
# Selenium-related imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
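
# Environment note (an assumption about the deployment, not part of the code's API):
# running Selenium in a Hugging Face Space needs a system browser and driver. One
# common setup is a packages.txt containing `chromium` and `chromium-driver`, with
# selenium and webdriver-manager in requirements.txt; adjust binary_location in
# scrape_snow_depth() below if your image installs Chromium elsewhere.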
def get_raw_data(station_id, hours=72):
    """
    Get raw observation data from the NWS API for the last `hours` hours.
    """
    headers = {
        'User-Agent': '(Weather Data Viewer, [email protected])',
        'Accept': 'application/json'
    }
    # Calculate the date range for the request
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=hours)
    # ISO 8601 timestamps with an explicit UTC designator
    params = {
        'start': start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
        'end': end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
    }
    url = f"https://api.weather.gov/stations/{station_id}/observations"
    try:
        print("\nFetching observations...")
        print(f"URL: {url}")
        print(f"Time range: {start_time} to {end_time}")
        response = requests.get(url, headers=headers, params=params)
        print(f"Response status: {response.status_code}")
        if response.status_code != 200:
            print(f"Response content: {response.text}")
        response.raise_for_status()
        data = response.json()
        if 'features' in data:
            print(f"\nNumber of observations: {len(data['features'])}")
            if len(data['features']) > 0:
                print("\nFirst observation properties:")
                print(json.dumps(data['features'][0]['properties'], indent=2))
                keys = set()
                for feature in data['features']:
                    keys.update(feature['properties'].keys())
                print("\nAll available property keys:")
                print(sorted(keys))
        return data
    except Exception as e:
        print(f"Error fetching data: {e}")
        import traceback
        traceback.print_exc()
        return None
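
# For reference, the NWS /observations endpoint returns a GeoJSON FeatureCollection;
# each feature's "properties" carries unit-tagged fields (values here are illustrative):
#   "timestamp":   "2024-02-01T12:53:00+00:00"
#   "temperature": {"unitCode": "wmoUnit:degC", "value": -2.8}
#   "windSpeed":   {"unitCode": "wmoUnit:km_h-1", "value": 14.76}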
def scrape_snow_depth():
    """
    Uses Selenium with a headless browser to load the weather.gov timeseries page,
    waits until an element containing 'Snow Depth' is present, then extracts the
    table data. Returns a DataFrame with columns "timestamp" and "snowDepth".
    """
    url = ("https://www.weather.gov/wrh/timeseries?"
           "site=YCTIM&hours=720&units=english&chart=on&headers=on&"
           "obs=tabular&hourly=false&pview=standard&font=12&plot=")
    # Set up headless Chrome options
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Tell Selenium where Chromium is located
    chrome_options.binary_location = "/usr/bin/chromium-browser"
    # Initialize Chrome using the Service object (Selenium 4+); if the browser or
    # driver is missing, return an empty DataFrame instead of crashing the app
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)
    except Exception as e:
        print("Failed to start headless Chrome:", e)
        return pd.DataFrame()
    driver.get(url)
    try:
        # Wait up to 30 seconds for any element containing the text "Snow Depth" to appear
        WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, "//*[contains(text(), 'Snow Depth')]"))
        )
    except Exception as e:
        print("Timeout waiting for 'Snow Depth' element to appear:", e)
    # Allow extra time for dynamic content to load
    time.sleep(5)
    page_source = driver.page_source
    driver.quit()
    soup = BeautifulSoup(page_source, 'html.parser')
    # Look through all tables for one that contains "Snow Depth" in its text
    tables = soup.find_all("table")
    target_table = None
    for table in tables:
        table_text = table.get_text()
        print("Found table text snippet:", table_text[:100])
        if "Snow Depth" in table_text:
            target_table = table
            break
    if target_table is None:
        print("No table with 'Snow Depth' found in the page.")
        return pd.DataFrame()
    # Look for header cells in the table
    header_row = target_table.find("tr")
    if not header_row:
        print("No header row found in the table.")
        return pd.DataFrame()
    headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
    print("Table headers found:", headers)
    # Identify column indices (using case-insensitive matching)
    time_index = None
    snow_index = None
    for i, header in enumerate(headers):
        if "time" in header.lower():
            time_index = i
        if "snow" in header.lower():
            snow_index = i
    if time_index is None or snow_index is None:
        print("Required columns ('Time' and 'Snow Depth') not found in the table headers.")
        return pd.DataFrame()
    # Extract data rows (skip the header row)
    data = []
    rows = target_table.find_all("tr")[1:]
    for row in rows:
        cells = row.find_all("td")
        if len(cells) <= max(time_index, snow_index):
            continue
        time_text = cells[time_index].get_text(strip=True)
        snow_text = cells[snow_index].get_text(strip=True)
        data.append((time_text, snow_text))
    df = pd.DataFrame(data, columns=["Time", "Snow Depth"])
    # Convert the "Time" column to datetime
    df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
    # Convert "Snow Depth" to numeric (inches)
    df["Snow Depth"] = pd.to_numeric(df["Snow Depth"], errors="coerce")
    print("Scraped snow depth data:")
    print(df.head())
    # Rename columns to match the API data
    return df.rename(columns={"Time": "timestamp", "Snow Depth": "snowDepth"})
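
# A lighter-weight alternative (untested sketch): once page_source is in hand, pandas
# can often parse rendered tables directly, e.g.
#   dfs = pd.read_html(page_source)  # needs lxml or html5lib installed
#   snow = next(d for d in dfs if any("Snow Depth" in str(c) for c in d.columns))
# The explicit BeautifulSoup walk above gives finer control over irregular headers.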
def parse_raw_data(data):
    """
    Parse the raw JSON API data into a DataFrame.
    """
    if not data or 'features' not in data:
        return None
    records = []
    for feature in data['features']:
        props = feature['properties']
        # Extract any snow-related fields if present
        snow_fields = {k: v for k, v in props.items() if 'snow' in k.lower()}
        if snow_fields:
            print("\nFound snow-related fields:")
            for k, v in snow_fields.items():
                print(f"{k}: {v}")
        # (x or {}) guards against fields that are present but null in the response
        record = {
            'timestamp': props['timestamp'],
            'temperature': (props.get('temperature') or {}).get('value'),
            'wind_speed': (props.get('windSpeed') or {}).get('value'),
            'wind_direction': (props.get('windDirection') or {}).get('value')
        }
        # Add any snow fields
        for k, v in snow_fields.items():
            if isinstance(v, dict) and 'value' in v:
                record[k] = v['value']
            else:
                record[k] = v
        records.append(record)
    df = pd.DataFrame(records)
    print("\nDataFrame columns from API:")
    print(df.columns.tolist())
    print("\nSample of raw API data:")
    print(df.head())
    return df
def process_weather_data(df):
    """
    Process the weather DataFrame: normalize timestamps and convert units.
    """
    if df is None or df.empty:
        return None
    # Convert the timestamp column to datetime; normalize to naive UTC so it can
    # later be merged with the (naive) scraped snow-depth timestamps
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True).dt.tz_localize(None)
    df['date'] = df['timestamp'].dt.date
    # Convert temperature from Celsius to Fahrenheit where present
    # (.any() rather than .all(): one missing reading should not skip the conversion)
    if df['temperature'].notna().any():
        df['temperature'] = (df['temperature'] * 9/5) + 32
    # Convert wind speed from km/h to mph where present
    if df['wind_speed'].notna().any():
        df['wind_speed'] = df['wind_speed'] * 0.621371
    return df
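
# Worked example of the conversions above: 0 °C -> 32 °F and -5 °C -> 23 °F via
# F = C * 9/5 + 32; 10 km/h * 0.621371 ≈ 6.21 mph.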
def create_wind_rose(ax, data, title):
    """
    Create a wind rose subplot on a polar axis.
    """
    if data.empty or data['wind_direction'].isna().all() or data['wind_speed'].isna().all():
        ax.text(0.5, 0.5, 'No wind data available',
                horizontalalignment='center',
                verticalalignment='center',
                transform=ax.transAxes)
        ax.set_title(title)
        return
    plot_data = data.copy()
    direction_bins = np.arange(0, 361, 45)
    directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
    mask = plot_data['wind_direction'].notna() & plot_data['wind_speed'].notna()
    plot_data = plot_data[mask]
    if plot_data.empty:
        ax.text(0.5, 0.5, 'No valid wind data',
                horizontalalignment='center',
                verticalalignment='center',
                transform=ax.transAxes)
        ax.set_title(title)
        return
    plot_data.loc[:, 'direction_bin'] = pd.cut(plot_data['wind_direction'],
                                               bins=direction_bins,
                                               labels=directions,
                                               include_lowest=True)
    # Mean wind speed per direction sector; fill unobserved sectors with 0
    wind_stats = plot_data.groupby('direction_bin', observed=True)['wind_speed'].mean()
    all_directions = pd.Series(0.0, index=directions)
    wind_stats = wind_stats.combine_first(all_directions)
    angles = np.linspace(0, 2 * np.pi, len(directions), endpoint=False)
    values = [wind_stats[d] for d in directions]
    if any(v > 0 for v in values):
        ax.bar(angles, values, width=0.5, alpha=0.6)
        ax.set_xticks(angles)
        ax.set_xticklabels(directions)
    else:
        ax.text(0.5, 0.5, 'No significant wind',
                horizontalalignment='center',
                verticalalignment='center',
                transform=ax.transAxes)
    ax.set_title(title)
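
# Note: with bin edges starting at 0°, the "N" sector covers 0-45° rather than being
# centered on north (so a 350° wind lands in "NW"); centering the sectors would mean
# shifting the bin edges by 22.5°.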
def create_visualizations(df):
    """
    Create static visualizations using matplotlib.
    Plots temperature, wind speed, snow depth, and daily wind roses.
    """
    fig = plt.figure(figsize=(20, 24))
    gs = GridSpec(4, 2, figure=fig)
    ax1 = fig.add_subplot(gs[0, :])
    ax2 = fig.add_subplot(gs[1, :])
    ax3 = fig.add_subplot(gs[2, :])
    if not df['temperature'].isna().all():
        ax1.plot(df['timestamp'], df['temperature'], linewidth=2)
    ax1.set_title('Temperature Over Time')
    ax1.set_ylabel('Temperature (°F)')
    ax1.set_xlabel('')
    ax1.grid(True)
    if not df['wind_speed'].isna().all():
        ax2.plot(df['timestamp'], df['wind_speed'], linewidth=2)
    ax2.set_title('Wind Speed Over Time')
    ax2.set_ylabel('Wind Speed (mph)')
    ax2.set_xlabel('')
    ax2.grid(True)
    if 'snowDepth' in df.columns and not df['snowDepth'].isna().all():
        ax3.plot(df['timestamp'], df['snowDepth'], linewidth=2)
        ax3.set_ylim(0, 80)
    else:
        ax3.text(0.5, 0.5, 'No snow depth data available',
                 horizontalalignment='center',
                 verticalalignment='center',
                 transform=ax3.transAxes)
    ax3.set_title('Snow Depth')
    ax3.set_ylabel('Snow Depth (inches)')
    ax3.set_xlabel('')
    ax3.grid(True)
    for ax in [ax1, ax2, ax3]:
        ax.tick_params(axis='x', rotation=45)
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
    # Wind roses for up to the first two days in the range
    dates = sorted(df['date'].unique())
    for i, date in enumerate(dates[:2]):
        ax = fig.add_subplot(gs[3, i], projection='polar')
        day_data = df[df['date'] == date].copy()
        create_wind_rose(ax, day_data, pd.to_datetime(date).strftime('%Y-%m-%d'))
    plt.tight_layout()
    return fig
def get_weather_data(station_id, hours):
    """
    Main function to get and process weather data.
    Combines API data and scraped snow depth data.
    """
    try:
        raw_data = get_raw_data(station_id, hours)
        if raw_data is None:
            return None, "Failed to fetch data from API"
        df = parse_raw_data(raw_data)
        if df is None:
            return None, "Failed to parse API data"
        df = process_weather_data(df)
        if df is None:
            return None, "Failed to process API data"
        # Attempt to scrape snow depth data using Selenium
        snow_df = scrape_snow_depth()
        if not snow_df.empty:
            df = df.sort_values('timestamp')
            # merge_asof requires non-null, sorted keys; drop rows whose scraped
            # timestamp failed to parse. Note the scraped table reports site-local
            # times while the API timestamps are UTC, so the two sources are
            # assumed comparable within the tolerance.
            snow_df = snow_df.dropna(subset=['timestamp']).sort_values('timestamp')
            df = pd.merge_asof(df, snow_df, on='timestamp',
                               tolerance=pd.Timedelta('30min'), direction='nearest')
        print("\nProcessed combined data sample:")
        print(df.head())
        return df, None
    except Exception as e:
        return None, f"Error: {str(e)}"
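
# merge_asof above performs a nearest-key join: each API row takes the scraped
# snow-depth reading closest in time, but only if it lies within the 30-minute
# tolerance; rows with no nearby reading keep NaN for snowDepth.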
def fetch_and_display(station_id, hours):
    """
    Fetch data and create the visualization figure.
    """
    df, error = get_weather_data(station_id, hours)
    if error:
        return None, error
    if df is not None and not df.empty:
        fig = create_visualizations(df)
        return fig, "Data fetched successfully!"
    return None, "No data available for the specified parameters."
# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Weather Data Viewer")
    gr.Markdown("Displays temperature, wind speed, and snow depth from NWS stations.")
    with gr.Row():
        station_id = gr.Textbox(label="Station ID", value="YCTIM")
        hours = gr.Slider(minimum=24, maximum=168, value=72, label="Hours of Data", step=24)
    fetch_btn = gr.Button("Fetch Data")
    plot_output = gr.Plot()
    message = gr.Textbox(label="Status")
    fetch_btn.click(
        fn=fetch_and_display,
        inputs=[station_id, hours],
        outputs=[plot_output, message]
    )

# Launch the app
demo.launch() |