# NOTE: the following two lines are Hugging Face Spaces page residue captured
# with the source ("Spaces:" header and a "Runtime error" status), not code.
| # app.py | |
# Standard library
import json
import time
from datetime import datetime, timedelta, timezone

# Third-party
import gradio as gr
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from bs4 import BeautifulSoup

# Selenium-related imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
def get_raw_data(station_id, hours=72):
    """
    Fetch raw observation JSON from the NWS API for a station.

    Parameters:
        station_id: NWS station identifier (e.g. "YCTIM").
        hours: look-back window size in hours (default 72, matching the
               previous hard-coded behavior).

    Returns:
        The decoded JSON payload (dict) on success, or None on any error.
    """
    headers = {
        'User-Agent': '(Weather Data Viewer, [email protected])',
        'Accept': 'application/json'
    }
    # Timezone-aware UTC timestamps; datetime.utcnow() is deprecated and
    # produced naive datetimes that needed a manual 'Z' suffix.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=hours)
    params = {
        # isoformat() on an aware datetime already carries the +00:00 offset,
        # which is valid ISO 8601 for the API's start/end parameters.
        'start': start_time.isoformat(),
        'end': end_time.isoformat()
    }
    url = f"https://api.weather.gov/stations/{station_id}/observations"
    try:
        print("\nFetching observations...")
        print(f"URL: {url}")
        print(f"Time range: {start_time} to {end_time}")
        # timeout prevents the request from hanging indefinitely on a
        # stalled connection (the original had no timeout at all).
        response = requests.get(url, headers=headers, params=params, timeout=30)
        print(f"Response status: {response.status_code}")
        if response.status_code != 200:
            print(f"Response content: {response.text}")
        response.raise_for_status()
        data = response.json()
        # Diagnostic dump of the first observation and all property keys,
        # useful for discovering snow-related fields.
        if 'features' in data:
            print(f"\nNumber of observations: {len(data['features'])}")
            if len(data['features']) > 0:
                print("\nFirst observation properties:")
                print(json.dumps(data['features'][0]['properties'], indent=2))
                keys = set()
                for feature in data['features']:
                    keys.update(feature['properties'].keys())
                print("\nAll available property keys:")
                print(sorted(list(keys)))
        return data
    except Exception as e:
        # Best-effort API: log the failure and signal it with None so the
        # caller can produce a user-facing error message.
        print(f"Error fetching data: {e}")
        import traceback
        traceback.print_exc()
        return None
def _parse_timeseries_tables(page_source):
    """
    Extract snow-depth rows from rendered weather.gov timeseries HTML.

    Scans every <table> for one whose text mentions "Snow Depth", locates
    the time and snow-depth columns by (case-insensitive) header text, and
    returns a DataFrame with columns "timestamp" (datetime) and "snowDepth"
    (numeric, inches). Returns an empty DataFrame when no usable table or
    required columns are found.
    """
    soup = BeautifulSoup(page_source, 'html.parser')
    tables = soup.find_all("table")
    target_table = None
    for table in tables:
        table_text = table.get_text()
        print("Found table text snippet:", table_text[:100])
        if "Snow Depth" in table_text:
            target_table = table
            break
    if target_table is None:
        print("No table with 'Snow Depth' found in the page.")
        return pd.DataFrame()

    # First row is assumed to hold the <th> headers.
    header_row = target_table.find("tr")
    if not header_row:
        print("No header row found in the table.")
        return pd.DataFrame()
    headers = [th.get_text(strip=True) for th in header_row.find_all("th")]
    print("Table headers found:", headers)

    # Identify column indices (case-insensitive match).
    time_index = None
    snow_index = None
    for i, header in enumerate(headers):
        if "time" in header.lower():
            time_index = i
        if "snow" in header.lower():
            snow_index = i
    if time_index is None or snow_index is None:
        print("Required columns ('Time' and 'Snow Depth') not found in the table headers.")
        return pd.DataFrame()

    # Extract data rows (skip the header row); tolerate short/malformed rows.
    data = []
    for row in target_table.find_all("tr")[1:]:
        cells = row.find_all("td")
        if len(cells) <= max(time_index, snow_index):
            continue
        data.append((cells[time_index].get_text(strip=True),
                     cells[snow_index].get_text(strip=True)))

    df = pd.DataFrame(data, columns=["Time", "Snow Depth"])
    # Unparseable entries become NaT/NaN rather than raising.
    df["Time"] = pd.to_datetime(df["Time"], errors="coerce")
    df["Snow Depth"] = pd.to_numeric(df["Snow Depth"], errors="coerce")
    print("Scraped snow depth data:")
    print(df.head())
    # Rename columns to match the API DataFrame for merging.
    return df.rename(columns={"Time": "timestamp", "Snow Depth": "snowDepth"})


def scrape_snow_depth():
    """
    Load the weather.gov timeseries page in headless Chrome and scrape
    snow-depth observations.

    Returns a DataFrame with columns "timestamp" and "snowDepth"
    (empty DataFrame when nothing usable was scraped).
    """
    url = ("https://www.weather.gov/wrh/timeseries?"
           "site=YCTIM&hours=720&units=english&chart=on&headers=on&"
           "obs=tabular&hourly=false&pview=standard&font=12&plot=")

    # Headless Chrome configuration suitable for containerized environments.
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Point Selenium at the system Chromium binary.
    chrome_options.binary_location = "/usr/bin/chromium-browser"

    # Initialize Chrome using the Service object (Selenium 4+).
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        driver.get(url)
        try:
            # Wait up to 30 seconds for any element containing "Snow Depth".
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//*[contains(text(), 'Snow Depth')]"))
            )
        except Exception as e:
            print("Timeout waiting for 'Snow Depth' element to appear:", e)
        # Allow extra time for dynamic content to finish rendering.
        time.sleep(5)
        page_source = driver.page_source
    finally:
        # Always release the browser process; the original leaked the driver
        # whenever an exception escaped after startup.
        driver.quit()
    return _parse_timeseries_tables(page_source)
def parse_raw_data(data):
    """
    Flatten raw NWS GeoJSON observation data into a DataFrame.

    Parameters:
        data: decoded JSON payload from the NWS observations endpoint
              (expects a 'features' list of GeoJSON features).

    Returns:
        DataFrame with columns timestamp, temperature, wind_speed,
        wind_direction, plus any snow-related properties found;
        None when the payload is missing or malformed.
    """
    if not data or 'features' not in data:
        return None

    def _value(prop):
        # API quantities are normally {'value': ..., 'unitCode': ...} but the
        # API can return an explicit null; the original
        # `props.get(k, {}).get('value')` raised AttributeError on null.
        if isinstance(prop, dict):
            return prop.get('value')
        return None

    records = []
    for feature in data['features']:
        props = feature['properties']
        # Collect any snow-related fields so new API keys are not missed.
        snow_fields = {k: v for k, v in props.items() if 'snow' in k.lower()}
        if snow_fields:
            print("\nFound snow-related fields:")
            for k, v in snow_fields.items():
                print(f"{k}: {v}")
        record = {
            'timestamp': props['timestamp'],
            'temperature': _value(props.get('temperature')),
            'wind_speed': _value(props.get('windSpeed')),
            'wind_direction': _value(props.get('windDirection')),
        }
        # Preserve original semantics for snow fields: unwrap quantity dicts,
        # pass anything else through unchanged.
        for k, v in snow_fields.items():
            record[k] = v['value'] if isinstance(v, dict) and 'value' in v else v
        records.append(record)

    df = pd.DataFrame(records)
    print("\nDataFrame columns from API:")
    print(df.columns.tolist())
    print("\nSample of raw API data:")
    print(df.head())
    return df
def process_weather_data(df):
    """
    Post-process the parsed weather DataFrame.

    - Parses 'timestamp' strings into datetimes and derives a 'date' column.
    - Converts temperature from Celsius to Fahrenheit.
    - Converts wind speed from km/h to mph.

    Returns the processed DataFrame, or None when there is nothing to process.
    """
    if df is None or df.empty:
        return None
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['date'] = df['timestamp'].dt.date
    # Convert unconditionally: NaN values stay NaN under arithmetic.
    # The previous `.notna().all()` guard skipped conversion for the whole
    # column whenever even one observation was missing, leaving the data in
    # mixed/ambiguous units.
    df['temperature'] = (df['temperature'] * 9 / 5) + 32
    df['wind_speed'] = df['wind_speed'] * 0.621371
    return df
| def create_wind_rose(ax, data, title): | |
| """ | |
| Create a wind rose subplot. | |
| """ | |
| if data.empty or data['wind_direction'].isna().all() or data['wind_speed'].isna().all(): | |
| ax.text(0.5, 0.5, 'No wind data available', | |
| horizontalalignment='center', | |
| verticalalignment='center', | |
| transform=ax.transAxes) | |
| ax.set_title(title) | |
| return | |
| plot_data = data.copy() | |
| direction_bins = np.arange(0, 361, 45) | |
| directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'] | |
| mask = plot_data['wind_direction'].notna() & plot_data['wind_speed'].notna() | |
| plot_data = plot_data[mask] | |
| if plot_data.empty: | |
| ax.text(0.5, 0.5, 'No valid wind data', | |
| horizontalalignment='center', | |
| verticalalignment='center', | |
| transform=ax.transAxes) | |
| ax.set_title(title) | |
| return | |
| plot_data.loc[:, 'direction_bin'] = pd.cut(plot_data['wind_direction'], | |
| bins=direction_bins, | |
| labels=directions, | |
| include_lowest=True) | |
| wind_stats = plot_data.groupby('direction_bin', observed=True)['wind_speed'].mean() | |
| all_directions = pd.Series(0.0, index=directions) | |
| wind_stats = wind_stats.combine_first(all_directions) | |
| angles = np.linspace(0, 2*np.pi, len(directions), endpoint=False) | |
| values = [wind_stats[d] for d in directions] | |
| if any(v > 0 for v in values): | |
| ax.bar(angles, values, width=0.5, alpha=0.6) | |
| ax.set_xticks(angles) | |
| ax.set_xticklabels(directions) | |
| else: | |
| ax.text(0.5, 0.5, 'No significant wind', | |
| horizontalalignment='center', | |
| verticalalignment='center', | |
| transform=ax.transAxes) | |
| ax.set_title(title) | |
def create_visualizations(df):
    """
    Build the dashboard figure from the combined weather DataFrame.

    Layout (5x2 grid): full-width temperature, wind-speed, and snow-depth
    time series, plus per-day polar wind roses for the first two dates.
    Returns the matplotlib Figure.
    """
    fig = plt.figure(figsize=(20, 24))
    grid = GridSpec(5, 2, figure=fig)

    temp_ax = fig.add_subplot(grid[0, :])
    wind_ax = fig.add_subplot(grid[1, :])
    snow_ax = fig.add_subplot(grid[2, :])

    # Temperature series (skipped entirely when the column is all-NaN).
    if not df['temperature'].isna().all():
        temp_ax.plot(df['timestamp'], df['temperature'], linewidth=2)
        temp_ax.set_title('Temperature Over Time')
        temp_ax.set_ylabel('Temperature (°F)')
        temp_ax.set_xlabel('')
        temp_ax.grid(True)

    # Wind-speed series (same all-NaN guard).
    if not df['wind_speed'].isna().all():
        wind_ax.plot(df['timestamp'], df['wind_speed'], linewidth=2)
        wind_ax.set_title('Wind Speed Over Time')
        wind_ax.set_ylabel('Wind Speed (mph)')
        wind_ax.set_xlabel('')
        wind_ax.grid(True)

    # Snow depth: plot when available, otherwise show a placeholder note.
    if 'snowDepth' in df.columns and not df['snowDepth'].isna().all():
        snow_ax.plot(df['timestamp'], df['snowDepth'], linewidth=2)
        snow_ax.set_ylim(0, 80)
    else:
        snow_ax.text(0.5, 0.5, 'No snow depth data available',
                     horizontalalignment='center',
                     verticalalignment='center',
                     transform=snow_ax.transAxes)
    snow_ax.set_title('Snow Depth')
    snow_ax.set_ylabel('Snow Depth (inches)')
    snow_ax.set_xlabel('')
    snow_ax.grid(True)

    # Shared x-axis formatting for all three time-series panels.
    for axis in (temp_ax, wind_ax, snow_ax):
        axis.tick_params(axis='x', rotation=45)
        axis.xaxis.set_major_formatter(
            plt.matplotlib.dates.DateFormatter('%Y-%m-%d %H:%M'))

    # Wind roses: one polar subplot per day, limited to the first two days
    # (the bottom grid row only has two slots).
    for day_number, day in enumerate(sorted(df['date'].unique())):
        if day_number >= 2:
            break
        rose_ax = fig.add_subplot(grid[4, day_number], projection='polar')
        day_rows = df[df['date'] == day].copy()
        create_wind_rose(rose_ax, day_rows,
                         pd.to_datetime(day).strftime('%Y-%m-%d'))

    plt.tight_layout()
    return fig
def get_weather_data(station_id, hours):
    """
    Fetch, parse, and enrich weather data for one station.

    Pulls observations from the NWS API, then attempts to merge in scraped
    snow-depth readings (nearest-timestamp join within a 30-minute window).
    NOTE(review): `hours` is currently unused here — the API fetch covers a
    fixed window; confirm whether it should be threaded through.

    Returns:
        (DataFrame, None) on success, or (None, error_message) on failure.
    """
    try:
        raw = get_raw_data(station_id)
        if raw is None:
            return None, "Failed to fetch data from API"

        parsed = parse_raw_data(raw)
        if parsed is None:
            return None, "Failed to parse API data"

        weather = process_weather_data(parsed)
        if weather is None:
            return None, "Failed to process API data"

        # Best-effort enrichment: merge scraped snow depth when available.
        snow = scrape_snow_depth()
        if not snow.empty:
            # merge_asof requires both frames sorted on the join key.
            weather = weather.sort_values('timestamp')
            snow = snow.sort_values('timestamp')
            weather = pd.merge_asof(weather, snow, on='timestamp',
                                    tolerance=pd.Timedelta('30min'),
                                    direction='nearest')

        print("\nProcessed combined data sample:")
        print(weather.head())
        return weather, None
    except Exception as e:
        # Top-level boundary: convert any failure into a user-facing message.
        return None, f"Error: {str(e)}"
def fetch_and_display(station_id, hours):
    """
    Gradio callback: fetch weather data and render the dashboard figure.

    Returns:
        (figure_or_None, status_message) matching the (Plot, Textbox)
        outputs wired up in the UI.
    """
    frame, error = get_weather_data(station_id, hours)
    if error:
        return None, error
    # Guard clauses: anything other than a non-empty frame means no data.
    if frame is None or frame.empty:
        return None, "No data available for the specified parameters."
    figure = create_visualizations(frame)
    return figure, "Data fetched successfully!"
# ---- Gradio interface ----
with gr.Blocks() as demo:
    gr.Markdown("# Weather Data Viewer")
    gr.Markdown("Displays temperature, wind speed, and snow depth from NWS stations.")

    # Input controls: station to query and requested window size.
    with gr.Row():
        station_id = gr.Textbox(label="Station ID", value="YCTIM")
        hours = gr.Slider(minimum=24, maximum=168, value=72,
                          label="Hours of Data", step=24)

    fetch_btn = gr.Button("Fetch Data")
    plot_output = gr.Plot()
    message = gr.Textbox(label="Status")

    # Wire the button to the fetch-and-plot pipeline.
    fetch_btn.click(
        fn=fetch_and_display,
        inputs=[station_id, hours],
        outputs=[plot_output, message],
    )

# Launch the app
demo.launch()