# nakas's picture
# Create app.py
# 24539ce verified
# app.py
import gradio as gr
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import json
import time
from bs4 import BeautifulSoup
# Selenium-related imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
def get_raw_data(station_id, hours=72, timeout=30):
    """
    Fetch recent observations for a station from the NWS API.

    Args:
        station_id: Station identifier interpolated into the
            ``/stations/{station_id}/observations`` URL.
        hours: Size of the look-back window in hours, ending now (UTC).
            Defaults to 72, the window this function previously hard-coded.
        timeout: Seconds to wait on the HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON response, or None when the request or decoding
        fails. Failures are printed with a traceback instead of being raised.
    """
    # Local imports: only this function needs them.
    import traceback
    from datetime import timezone

    # NOTE(review): the contact address in the User-Agent looks like a
    # placeholder left by e-mail obfuscation -- confirm the real value.
    headers = {
        'User-Agent': '(Weather Data Viewer, [email protected])',
        'Accept': 'application/json'
    }

    # Aware UTC "now", truncated to whole seconds so the query timestamps are
    # plain ISO-8601 (YYYY-MM-DDTHH:MM:SSZ) without a fractional part.
    end_time = datetime.now(timezone.utc).replace(microsecond=0)
    start_time = end_time - timedelta(hours=hours)
    stamp_format = '%Y-%m-%dT%H:%M:%SZ'
    params = {
        'start': start_time.strftime(stamp_format),
        'end': end_time.strftime(stamp_format)
    }
    url = f"https://api.weather.gov/stations/{station_id}/observations"

    try:
        print("\nFetching observations...")
        print(f"URL: {url}")
        print(f"Time range: {start_time} to {end_time}")
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        print(f"Response status: {response.status_code}")
        if response.status_code != 200:
            # Show the body before raise_for_status() discards the details.
            print(f"Response content: {response.text}")
        response.raise_for_status()
        data = response.json()

        # Diagnostic output only; nothing below alters the returned payload.
        if 'features' in data:
            features = data['features']
            print(f"\nNumber of observations: {len(features)}")
            if features:
                print("\nFirst observation properties:")
                print(json.dumps(features[0]['properties'], indent=2))
                keys = set()
                for feature in features:
                    keys.update(feature['properties'].keys())
                print("\nAll available property keys:")
                print(sorted(keys))
        return data
    except Exception as e:
        # Top-level boundary for this fetch: report and signal failure via None.
        print(f"Error fetching data: {e}")
        traceback.print_exc()
        return None
def _load_timeseries_page(url, wait_seconds=30, settle_seconds=5):
    """Render *url* in headless Chrome and return the resulting page source."""
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Tell Selenium where Chromium is located
    chrome_options.binary_location = "/usr/bin/chromium-browser"
    # Initialize Chrome using the Service object (Selenium 4+)
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        driver.get(url)
        try:
            # Wait for any element containing the text "Snow Depth" to appear
            WebDriverWait(driver, wait_seconds).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//*[contains(text(), 'Snow Depth')]")
                )
            )
        except Exception as e:
            # Best effort: still try to parse whatever has been rendered.
            print("Timeout waiting for 'Snow Depth' element to appear:", e)
        # Allow extra time for dynamic content to load
        time.sleep(settle_seconds)
        return driver.page_source
    finally:
        # Always release the browser, even when loading the page fails.
        driver.quit()


def _parse_snow_depth_table(page_source):
    """Extract (timestamp, snowDepth) rows from the first 'Snow Depth' table."""
    soup = BeautifulSoup(page_source, 'html.parser')

    # Look through all tables for one that contains "Snow Depth" in its text
    target_table = None
    for table in soup.find_all("table"):
        table_text = table.get_text()
        print("Found table text snippet:", table_text[:100])
        if "Snow Depth" in table_text:
            target_table = table
            break
    if target_table is None:
        print("No table with 'Snow Depth' found in the page.")
        return pd.DataFrame()

    header_row = target_table.find("tr")
    if not header_row:
        print("No header row found in the table.")
        return pd.DataFrame()
    headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
    print("Table headers found:", headers)

    # Identify column indices (case-insensitive match).
    lowered = [header.lower() for header in headers]
    time_index = None
    snow_index = None
    for i, header in enumerate(lowered):
        if "time" in header:
            time_index = i
        if "snow" in header:
            snow_index = i
    # Prefer a header that actually says "snow depth" over any other column
    # that merely mentions snow.
    for i, header in enumerate(lowered):
        if "snow depth" in header:
            snow_index = i
            break
    if time_index is None or snow_index is None:
        print("Required columns ('Time' and 'Snow Depth') not found in the table headers.")
        return pd.DataFrame()

    # Extract rows (skip header); rows too short to hold both cells are ignored.
    needed_cells = max(time_index, snow_index)
    data = []
    for row in target_table.find_all("tr")[1:]:
        cells = row.find_all("td")
        if len(cells) <= needed_cells:
            continue
        data.append((cells[time_index].get_text(strip=True),
                     cells[snow_index].get_text(strip=True)))

    df = pd.DataFrame(data, columns=["Time", "Snow Depth"])
    # Unparseable values become NaT / NaN instead of raising.
    df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
    df["Snow Depth"] = pd.to_numeric(df["Snow Depth"], errors="coerce")
    # Rows without a usable timestamp cannot be placed on a time axis and
    # would make a later time-based merge fail, so drop them here.
    df = df.dropna(subset=["Time"]).reset_index(drop=True)
    print("Scraped snow depth data:")
    print(df.head())
    # Rename columns to match API data
    return df.rename(columns={"Time": "timestamp", "Snow Depth": "snowDepth"})


def scrape_snow_depth(site="YCTIM", hours=720):
    """
    Scrape snow depth readings from the weather.gov timeseries page.

    Uses Selenium with a headless browser to load the page, waits until an
    element containing 'Snow Depth' is present, then extracts the table data.

    Args:
        site: Value for the page's ``site`` query parameter. Defaults to
            "YCTIM", the site this function previously hard-coded.
        hours: Value for the page's ``hours`` query parameter (default 720).

    Returns:
        A DataFrame with columns "timestamp" and "snowDepth". Rows whose time
        could not be parsed are dropped; snow depth values that are not
        numeric are NaN. An empty DataFrame is returned when no suitable
        table or columns are found.
        NOTE(review): the time zone of the scraped timestamps is not
        established here; they are returned exactly as pandas parses them.
    """
    from urllib.parse import urlencode  # local import: only needed here

    query = urlencode({
        "site": site,
        "hours": hours,
        "units": "english",
        "chart": "on",
        "headers": "on",
        "obs": "tabular",
        "hourly": "false",
        "pview": "standard",
        "font": 12,
        "plot": "",
    })
    url = "https://www.weather.gov/wrh/timeseries?" + query
    return _parse_snow_depth_table(_load_timeseries_page(url))
def _measurement_value(props, key):
    """Return ``props[key]['value']``, or None if the entry is missing or not a dict."""
    measurement = props.get(key)
    if isinstance(measurement, dict):
        return measurement.get('value')
    return None


def parse_raw_data(data):
    """
    Parse the raw JSON API data into a DataFrame.

    Args:
        data: Decoded API response; expected to hold a 'features' list whose
            items carry a 'properties' dict.

    Returns:
        None when *data* is empty or has no 'features' key. Otherwise a
        DataFrame with 'timestamp', 'temperature', 'wind_speed' and
        'wind_direction' columns, plus one column per property whose name
        contains "snow". Measurements that are absent or null become None/NaN.
        Features without properties or without a timestamp are skipped.
    """
    if not data or 'features' not in data:
        return None

    records = []
    for feature in data['features']:
        props = feature.get('properties') if isinstance(feature, dict) else None
        if not isinstance(props, dict) or 'timestamp' not in props:
            # An observation without a timestamp cannot be plotted.
            continue

        # Extract any snow-related fields if present
        snow_fields = {k: v for k, v in props.items() if 'snow' in k.lower()}
        if snow_fields:
            print("\nFound snow-related fields:")
            for k, v in snow_fields.items():
                print(f"{k}: {v}")

        record = {
            'timestamp': props['timestamp'],
            # A measurement may be present but null, so do not chain .get().
            'temperature': _measurement_value(props, 'temperature'),
            'wind_speed': _measurement_value(props, 'windSpeed'),
            'wind_direction': _measurement_value(props, 'windDirection')
        }
        # Snow fields keep their original key; unwrap {'value': ...} entries.
        for k, v in snow_fields.items():
            if isinstance(v, dict) and 'value' in v:
                record[k] = v['value']
            else:
                record[k] = v
        records.append(record)

    df = pd.DataFrame(records)
    print("\nDataFrame columns from API:")
    print(df.columns.tolist())
    print("\nSample of raw API data:")
    print(df.head())
    return df
def process_weather_data(df):
    """
    Process the weather DataFrame.

    Parses the 'timestamp' column, adds a 'date' column, and converts
    temperature from Celsius to Fahrenheit and wind speed from km/h to mph.

    Args:
        df: DataFrame with a 'timestamp' column and, optionally,
            'temperature' and 'wind_speed' columns.

    Returns:
        A converted copy of *df* (the input is left untouched), or None when
        *df* is None or empty. Missing readings stay NaN after conversion.
    """
    if df is None or df.empty:
        return None

    df = df.copy()  # do not mutate the caller's frame

    # Convert timestamp column to datetime
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['date'] = df['timestamp'].dt.date

    # Convert unconditionally: NaN propagates through the arithmetic, so a
    # single missing reading must not leave the whole column unconverted.
    # to_numeric also turns None in object-dtype columns into NaN.
    if 'temperature' in df.columns:
        celsius = pd.to_numeric(df['temperature'], errors='coerce')
        df['temperature'] = (celsius * 9 / 5) + 32
    if 'wind_speed' in df.columns:
        kmh = pd.to_numeric(df['wind_speed'], errors='coerce')
        df['wind_speed'] = kmh * 0.621371
    return df
def create_wind_rose(ax, data, title):
    """
    Create a wind rose subplot.

    Draws the mean wind speed per 45-degree compass sector as bars on *ax*.
    Sectors are centred on the compass points, so "N" covers 337.5-22.5
    degrees, "NE" covers 22.5-67.5 degrees, and so on.

    Args:
        ax: Axes to draw on; the callers in this file pass a polar axes.
        data: DataFrame with 'wind_direction' (degrees) and 'wind_speed'.
        title: Title for the subplot.

    Returns:
        None. When there is nothing to draw, a short message is written in
        the middle of the axes instead of bars.
    """
    def show_message(text):
        ax.text(0.5, 0.5, text,
                horizontalalignment='center',
                verticalalignment='center',
                transform=ax.transAxes)

    directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
    sector_width = 360 / len(directions)

    ax.set_title(title)

    if data.empty:
        show_message('No wind data available')
        return

    bearing = pd.to_numeric(data['wind_direction'], errors='coerce')
    speed = pd.to_numeric(data['wind_speed'], errors='coerce')
    if bearing.isna().all() or speed.isna().all():
        show_message('No wind data available')
        return

    # Keep only observations that have both a direction and a speed.
    valid = bearing.notna() & speed.notna()
    if not valid.any():
        show_message('No valid wind data')
        return
    bearing = bearing[valid]
    speed = speed[valid]

    # Shift by half a sector before dividing so each sector is centred on its
    # compass point; the final modulo folds 337.5-360 degrees back into "N".
    sector = np.floor((bearing % 360 + sector_width / 2) / sector_width).astype(int)
    sector = sector % len(directions)
    mean_speed = speed.groupby(sector).mean()
    values = [float(mean_speed.get(i, 0.0)) for i in range(len(directions))]

    if any(v > 0 for v in values):
        # Compass orientation: north at the top, angles increasing clockwise.
        # Guarded so a non-polar axes does not raise.
        if hasattr(ax, 'set_theta_zero_location'):
            ax.set_theta_zero_location('N')
            ax.set_theta_direction(-1)
        angles = np.linspace(0, 2 * np.pi, len(directions), endpoint=False)
        ax.bar(angles, values, width=0.5, alpha=0.6)
        ax.set_xticks(angles)
        ax.set_xticklabels(directions)
    else:
        show_message('No significant wind')
def create_visualizations(df):
    """
    Build the static matplotlib figure for one data set.

    The figure has three full-width time-series panels (temperature, wind
    speed, snow depth) and, on the bottom row, a wind rose for each of the
    first two dates found in the 'date' column.

    Args:
        df: DataFrame with 'timestamp', 'temperature', 'wind_speed', 'date'
            and the wind columns used by create_wind_rose; 'snowDepth' is
            optional.

    Returns:
        The matplotlib Figure.
    """
    # NOTE(review): the figure is created through pyplot and is not closed
    # here; whoever consumes it owns its lifetime.
    fig = plt.figure(figsize=(20, 24))
    grid = GridSpec(5, 2, figure=fig)
    temp_ax, wind_ax, snow_ax = (fig.add_subplot(grid[row, :]) for row in range(3))

    # Temperature and wind speed share the same panel recipe.
    line_panels = (
        (temp_ax, 'temperature', 'Temperature Over Time', 'Temperature (°F)'),
        (wind_ax, 'wind_speed', 'Wind Speed Over Time', 'Wind Speed (mph)'),
    )
    for panel, column, heading, y_label in line_panels:
        if df[column].notna().any():
            panel.plot(df['timestamp'], df[column], linewidth=2)
        panel.set_title(heading)
        panel.set_ylabel(y_label)
        panel.set_xlabel('')
        panel.grid(True)

    # Snow depth is optional: the column may be missing or entirely empty.
    has_snow = 'snowDepth' in df.columns and df['snowDepth'].notna().any()
    if has_snow:
        snow_ax.plot(df['timestamp'], df['snowDepth'], linewidth=2)
        snow_ax.set_ylim(0, 80)
    else:
        snow_ax.text(0.5, 0.5, 'No snow depth data available',
                     horizontalalignment='center',
                     verticalalignment='center',
                     transform=snow_ax.transAxes)
    snow_ax.set_title('Snow Depth')
    snow_ax.set_ylabel('Snow Depth (inches)')
    snow_ax.set_xlabel('')
    snow_ax.grid(True)

    # Shared x-axis styling; each axis gets its own formatter instance.
    for panel in (temp_ax, wind_ax, snow_ax):
        panel.tick_params(axis='x', rotation=45)
        date_format = plt.matplotlib.dates.DateFormatter('%Y-%m-%d %H:%M')
        panel.xaxis.set_major_formatter(date_format)

    # Bottom row: one polar wind rose for each of the two earliest dates.
    earliest_days = sorted(df['date'].unique())[:2]
    for column_index, day in enumerate(earliest_days):
        rose_ax = fig.add_subplot(grid[4, column_index], projection='polar')
        day_rows = df[df['date'] == day].copy()
        create_wind_rose(rose_ax, day_rows, pd.to_datetime(day).strftime('%Y-%m-%d'))

    plt.tight_layout()
    return fig
def _attach_snow_depth(df):
    """Best-effort merge of scraped snow depth onto *df*; returns *df* unchanged on any failure."""
    try:
        snow_df = scrape_snow_depth()
        if snow_df is None or snow_df.empty:
            return df
        # merge_asof needs sorted keys without nulls on both sides.
        snow_df = snow_df.dropna(subset=['timestamp']).sort_values('timestamp')
        if snow_df.empty:
            return df
        return pd.merge_asof(df.sort_values('timestamp'), snow_df,
                             on='timestamp',
                             tolerance=pd.Timedelta('30min'),
                             direction='nearest')
    except Exception as e:
        # Snow depth is an optional extra: a scraper or merge failure (for
        # example timestamp columns whose dtypes do not match) must not
        # discard the API data that was already fetched.
        print(f"Snow depth unavailable, continuing with API data only: {e}")
        return df


def get_weather_data(station_id, hours):
    """
    Main function to get and process weather data.
    Combines API data and scraped snow depth data.

    Args:
        station_id: Station identifier passed to get_raw_data.
        hours: Keep only observations from the last *hours* hours, counted
            back from the newest observation. A window larger than what
            get_raw_data fetched has no effect; a falsy value disables the
            filter.

    Returns:
        A ``(df, error)`` tuple: the combined DataFrame and None on success,
        or None and an error message on failure. If snow depth cannot be
        scraped or merged, the API data is still returned without it.
    """
    try:
        raw_data = get_raw_data(station_id)
        if raw_data is None:
            return None, "Failed to fetch data from API"
        df = parse_raw_data(raw_data)
        if df is None:
            return None, "Failed to parse API data"
        df = process_weather_data(df)
        if df is None:
            return None, "Failed to process API data"

        df = df.sort_values('timestamp')

        # Honour the requested window relative to the newest observation,
        # which avoids comparing against a clock in a different time zone.
        newest = df['timestamp'].max()
        if hours and pd.notna(newest):
            cutoff = newest - pd.Timedelta(hours=float(hours))
            df = df[df['timestamp'] >= cutoff]

        # Attempt to add scraped snow depth data (best effort).
        df = _attach_snow_depth(df)

        print("\nProcessed combined data sample:")
        print(df.head())
        return df, None
    except Exception as e:
        return None, f"Error: {str(e)}"
def fetch_and_display(station_id, hours):
    """
    Fetch weather data and turn it into a figure plus a status message.

    Returns:
        ``(figure, message)``; the figure is None whenever an error was
        reported or no data came back.
    """
    frame, problem = get_weather_data(station_id, hours)

    # Guard clauses: report the failure cases first.
    if problem:
        return None, problem
    if frame is None or frame.empty:
        return None, "No data available for the specified parameters."

    return create_visualizations(frame), "Data fetched successfully!"
# Build the Gradio UI.
# NOTE(review): the original indentation was lost; the nesting below (two
# inputs side by side in a row, everything else stacked under it) is the
# assumed layout -- confirm against the deployed app.
with gr.Blocks() as demo:
    gr.Markdown("# Weather Data Viewer")
    gr.Markdown("Displays temperature, wind speed, and snow depth from NWS stations.")

    with gr.Row():
        station_id = gr.Textbox(value="YCTIM", label="Station ID")
        hours = gr.Slider(
            label="Hours of Data",
            minimum=24,
            maximum=168,
            step=24,
            value=72,
        )

    fetch_btn = gr.Button("Fetch Data")
    plot_output = gr.Plot()
    message = gr.Textbox(label="Status")

    # Clicking the button runs the fetch and fills the plot and status box.
    fetch_btn.click(fn=fetch_and_display, inputs=[station_id, hours], outputs=[plot_output, message])

# Start the app as soon as the module runs.
demo.launch()