import os import logging import random from datetime import datetime, timedelta, date from typing import Optional, Dict, List, Any import csv from io import StringIO from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from dotenv import dotenv_values # Import your tools from tools import ( open_meteo, tomorrow_io, google_weather, openweathermap, accuweather, openai_llm, geographic_tools, crop_calendar_tools, alert_generation_tools ) from a2a_agents import sms_agent, whatsapp_agent, ussd_agent, ivr_agent, telegram_agent from utils.weather_utils import get_tool_config # Configuration config = dotenv_values(".env") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) # Verify API keys openai_key = config.get("OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY") if not openai_key: logger.warning("OpenAI API key not found - AI features will be limited") else: logger.info("OpenAI API key found") app = FastAPI(title="MCP Weather Server", version="1.0.0") # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["https://mcp-ui.vercel.app", "*"], # Add * for development allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], expose_headers=["*"] ) # Pydantic models class MCPRequest(BaseModel): tool: str parameters: dict class AlertRequest(BaseModel): alert_json: dict class WorkflowRequest(BaseModel): state: str district: str # Crop calendar constants CROP_CALENDAR = { "rice": { "season": "Kharif", "planting": "June-July", "harvesting": "October-November", "duration_days": 120, "stages": ["Nursery/Seedling", "Transplanting", "Vegetative", "Tillering", "Panicle Initiation", "Flowering", "Milk/Dough", "Maturity", "Harvesting"] }, "wheat": { "season": "Rabi", "planting": "November-December", "harvesting": "March-April", "duration_days": 120, "stages": ["Sowing", "Germination", "Tillering", "Jointing", "Booting", "Heading", "Flowering", "Grain Filling", "Maturity", "Harvesting"] }, "maize": { "season": "Kharif/Zaid", "planting": "June-July / March-April", "harvesting": "September-October / June", "duration_days": 110, "stages": ["Sowing", "Emergence", "Vegetative", "Tasseling", "Silking", "Grain Filling", "Maturity", "Harvesting"] }, "sugarcane": { "season": "Annual", "planting": "February-March", "harvesting": "December-January", "duration_days": 300, "stages": ["Planting", "Germination", "Tillering", "Grand Growth", "Maturation", "Ripening", "Harvesting"] }, "mustard": { "season": "Rabi", "planting": "October-November", "harvesting": "February-March", "duration_days": 110, "stages": ["Sowing", "Germination", "Rosette", "Stem Elongation", "Flowering", "Pod Formation", "Pod Filling", "Maturity", "Harvesting"] } } # District-specific crop preferences for Bihar DISTRICT_CROPS = { 'patna': {'primary': ['rice', 'wheat', 'potato'], 'secondary': ['mustard', 'gram'], 'specialty': ['sugarcane']}, 'gaya': {'primary': ['wheat', 'rice'], 'secondary': ['barley', 'mustard'], 'specialty': ['gram']}, 'bhagalpur': {'primary': ['rice', 'maize', 'wheat'], 'secondary': ['jute'], 'specialty': ['groundnut']}, 'muzaffarpur': {'primary': ['sugarcane', 'rice', 'wheat'], 'secondary': ['potato', 'mustard'], 'specialty': ['lentil']}, 'darbhanga': {'primary': ['rice', 'wheat', 'maize'], 'secondary': ['gram'], 'specialty': ['bajra']}, 'siwan': {'primary': ['rice', 'wheat'], 'secondary': ['gram', 'lentil'], 'specialty': ['mustard']}, 'begusarai': {'primary': ['rice', 'wheat'], 'secondary': ['jute', 'mustard'], 'specialty': ['moong']}, 'katihar': {'primary': ['maize', 'rice'], 'secondary': ['jute'], 'specialty': ['jowar']} } def get_current_season(month: int) -> str: """Determine current agricultural season""" if month in [6, 7, 8, 9]: # June to September return 'kharif' elif month in [10, 11, 12, 1, 2, 3]: # October to March return 'rabi' else: # April, May return 'zaid' def select_regional_crop(district: str, state: str) -> str: """Select appropriate crop based on district, season, and preferences""" if state.lower() != 'bihar': return 'rice' # fallback current_month = datetime.now().month current_season = get_current_season(current_month) # Get district preferences district_prefs = DISTRICT_CROPS.get(district.lower(), { 'primary': ['rice', 'wheat'], 'secondary': ['gram'], 'specialty': ['maize'] }) # Season-specific crop filtering seasonal_crops = { 'kharif': ['rice', 'maize', 'sugarcane', 'jowar', 'bajra', 'groundnut'], 'rabi': ['wheat', 'barley', 'gram', 'lentil', 'mustard', 'potato'], 'zaid': ['maize', 'moong', 'watermelon', 'cucumber'] } # Combine district and seasonal preferences all_district_crops = (district_prefs.get('primary', []) + district_prefs.get('secondary', []) + district_prefs.get('specialty', [])) suitable_crops = [crop for crop in all_district_crops if crop in seasonal_crops.get(current_season, [])] if not suitable_crops: suitable_crops = district_prefs.get('primary', ['rice']) # Weighted selection (primary crops more likely) weighted_crops = [] for crop in suitable_crops: if crop in district_prefs.get('primary', []): weighted_crops.extend([crop] * 5) elif crop in district_prefs.get('secondary', []): weighted_crops.extend([crop] * 3) else: weighted_crops.extend([crop] * 1) selected_crop = random.choice(weighted_crops) if weighted_crops else 'rice' logger.info(f"Selected crop: {selected_crop} for {district} in {current_season} season") return selected_crop def estimate_crop_stage(crop: str, current_month: int) -> str: """Estimate current crop stage based on crop type and month""" if crop not in CROP_CALENDAR: return 'Growing' crop_data = CROP_CALENDAR[crop] stages = crop_data['stages'] # Month-based stage estimation stage_mappings = { 'rice': {6: 0, 7: 1, 8: 2, 9: 3, 10: 4, 11: 5, 12: 6, 1: 7, 2: 8}, 'wheat': {11: 0, 12: 1, 1: 2, 2: 3, 3: 4, 4: 5}, 'maize': {6: 0, 7: 1, 8: 2, 9: 3, 10: 4, 11: 5, 3: 0, 4: 1, 5: 2}, 'sugarcane': {2: 0, 3: 1, 4: 2, 5: 3, 6: 3, 7: 3, 8: 4, 9: 4, 10: 5, 11: 6, 12: 6, 1: 6}, 'mustard': {10: 0, 11: 1, 12: 2, 1: 3, 2: 4, 3: 5} } crop_mapping = stage_mappings.get(crop, {}) stage_index = crop_mapping.get(current_month, len(stages) // 2) # Default to middle stage stage_index = min(stage_index, len(stages) - 1) return stages[stage_index] if stages else 'Growing' async def get_location_coordinates(village: str, district: str) -> tuple[list, str]: """Get coordinates for village or district with fallback""" location_coords = None location_source = "" # Try village coordinates first try: village_location = await geographic_tools.reverse_geocode(village) if "error" not in village_location and "latitude" in village_location: location_coords = [village_location["latitude"], village_location["longitude"]] location_source = f"village_{village}" logger.info(f"Using village coordinates for {village}: {location_coords}") except Exception as e: logger.warning(f"Village geocoding failed for {village}: {e}") # Fallback to district coordinates if not location_coords: try: district_location = await geographic_tools.reverse_geocode(district) if "error" not in district_location and "latitude" in district_location: location_coords = [district_location["latitude"], district_location["longitude"]] location_source = f"district_{district}" logger.info(f"Using district coordinates for {district}: {location_coords}") except Exception as e: logger.warning(f"District geocoding failed for {district}: {e}") # Final fallback if not location_coords: logger.warning(f"No coordinates found, using Patna fallback") location_coords = [25.5941, 85.1376] # Patna coordinates location_source = "fallback_patna" return location_coords, location_source async def generate_weather_based_alert(weather_data: dict, crop: str, crop_stage: str, village: str, district: str) -> tuple[str, str, str, list]: """Generate alert based on weather conditions""" current_weather = weather_data.get('current_weather', {}) daily_forecast = weather_data.get('daily', {}) current_temp = current_weather.get('temperature', 25) current_windspeed = current_weather.get('windspeed', 10) precipitation_list = daily_forecast.get('precipitation_sum', [0, 0, 0]) next_3_days_rain = sum(precipitation_list[:3]) if precipitation_list else 0 # Generate alert based on conditions if next_3_days_rain > 25: alert_type = "heavy_rain_warning" urgency = "high" alert_message = (f"Heavy rainfall ({next_3_days_rain:.1f}mm) expected in next 3 days " f"near {village}, {district}. {crop} at {crop_stage} stage may be affected. " f"Delay fertilizer application and ensure proper drainage.") action_items = ["delay_fertilizer", "check_drainage", "monitor_crops", "prepare_harvest_protection"] elif next_3_days_rain > 10: alert_type = "moderate_rain_warning" urgency = "medium" alert_message = (f"Moderate rainfall ({next_3_days_rain:.1f}mm) expected in next 3 days " f"near {village}, {district}. Monitor {crop} at {crop_stage} stage carefully.") action_items = ["monitor_soil", "check_drainage", "adjust_irrigation"] elif next_3_days_rain < 2 and current_temp > 35: alert_type = "heat_drought_warning" urgency = "high" alert_message = (f"High temperature ({current_temp:.1f}°C) with minimal rainfall expected " f"near {village}, {district}. {crop} at {crop_stage} stage needs extra care. " f"Increase irrigation frequency.") action_items = ["increase_irrigation", "mulch_crops", "monitor_plant_stress"] elif current_temp < 10: alert_type = "cold_warning" urgency = "medium" alert_message = (f"Low temperature ({current_temp:.1f}°C) expected near {village}, {district}. " f"Protect {crop} crops from cold damage.") action_items = ["protect_crops", "cover_seedlings", "adjust_irrigation_timing"] elif current_windspeed > 30: alert_type = "high_wind_warning" urgency = "medium" alert_message = (f"High winds ({current_windspeed:.1f} km/h) expected near {village}, {district}. " f"Secure {crop} crop supports and structures.") action_items = ["secure_supports", "check_structures", "monitor_damage"] else: alert_type = "weather_update" urgency = "low" alert_message = (f"Normal weather conditions expected near {village}, {district}. " f"{crop} at {crop_stage} stage. Temperature {current_temp:.1f}°C, " f"rainfall {next_3_days_rain:.1f}mm.") action_items = ["routine_monitoring", "maintain_irrigation"] return alert_type, urgency, alert_message, action_items async def generate_ai_alert(latitude: float, longitude: float, crop: str, crop_stage: str, village: str, district: str) -> Optional[dict]: """Generate AI-powered weather alert using available tools""" if not openai_key: logger.warning("No OpenAI API key - skipping AI alert generation") return None try: # First get weather data for the AI context current_weather_data = await open_meteo.get_current_weather( latitude=latitude, longitude=longitude ) forecast_data = await open_meteo.get_weather_forecast( latitude=latitude, longitude=longitude, days=7 ) # Prepare weather context for AI current_weather = current_weather_data.get('current_weather', {}) daily_forecast = forecast_data.get('daily', {}) weather_context = { 'temperature': current_weather.get('temperature', 25), 'windspeed': current_weather.get('windspeed', 10), 'precipitation_forecast': daily_forecast.get('precipitation_sum', [0, 0, 0])[:3] } # Use the generate_weather_alert function from your tools ai_alert = await alert_generation_tools.generate_weather_alert( crop=crop, weather_data=weather_context, growth_stage=crop_stage, api_key=openai_key, latitude=latitude, longitude=longitude ) # Extract AI response (adjust based on your actual function return format) if isinstance(ai_alert, dict): alert_description = ai_alert.get('alert', 'Weather update for agricultural activities') impact_description = ai_alert.get('impact', 'Monitor crops regularly') recommendations = ai_alert.get('recommendations', 'Continue routine farming activities') else: # If it returns a string or other format alert_description = str(ai_alert) impact_description = 'Monitor crops regularly' recommendations = 'Follow weather updates and maintain good practices' # Create enhanced alert message alert_message = f"🤖 AI Weather Alert for {village}, {district}: {alert_description}" if impact_description and impact_description.lower() not in ['none', 'n/a', '']: alert_message += f" 🌾 Crop Impact ({crop} - {crop_stage}): {impact_description}" return { 'alert': alert_description, 'impact': impact_description, 'recommendations': recommendations, 'enhanced_message': alert_message } except Exception as e: logger.error(f"AI alert generation failed: {e}") return None async def generate_dynamic_alert(district: str, state: str) -> dict: """Main function to generate comprehensive weather alert""" try: # Step 1: Get villages for the district villages_data = await geographic_tools.list_villages(state, district) if "error" in villages_data: raise Exception(f"District '{district}' not found in {state}") available_villages = villages_data.get("villages", []) if not available_villages: raise Exception(f"No villages found for {district}") # Step 2: Select random village selected_village = random.choice(available_villages) logger.info(f"Selected village: {selected_village} from {len(available_villages)} villages") # Step 3: Get coordinates location_coords, location_source = await get_location_coordinates(selected_village, district) # Step 4: Generate crop selection and stage regional_crop = select_regional_crop(district, state) crop_stage = estimate_crop_stage(regional_crop, datetime.now().month) # Step 5: Get weather data try: current_weather_data = await open_meteo.get_current_weather( latitude=location_coords[0], longitude=location_coords[1] ) forecast_data = await open_meteo.get_weather_forecast( latitude=location_coords[0], longitude=location_coords[1], days=7 ) logger.info(f"Weather data retrieved for {selected_village}, {district}") except Exception as weather_error: logger.error(f"Failed to get weather data: {weather_error}") raise Exception(f"Unable to retrieve weather conditions for {selected_village}, {district}") # Step 6: Generate alerts alert_type, urgency, alert_message, action_items = await generate_weather_based_alert( {'current_weather': current_weather_data.get('current_weather', {}), 'daily': forecast_data.get('daily', {})}, regional_crop, crop_stage, selected_village, district ) # Step 7: Try to enhance with AI if available ai_analysis = await generate_ai_alert( location_coords[0], location_coords[1], regional_crop, crop_stage, selected_village, district ) # Step 8: Prepare weather context current_weather = current_weather_data.get('current_weather', {}) daily_forecast = forecast_data.get('daily', {}) current_temp = current_weather.get('temperature', 25) current_windspeed = current_weather.get('windspeed', 10) precipitation_list = daily_forecast.get('precipitation_sum', [0, 0, 0]) next_3_days_rain = sum(precipitation_list[:3]) if precipitation_list else 0 rain_probability = min(90, max(10, int(next_3_days_rain * 10))) if next_3_days_rain > 0 else 10 estimated_humidity = min(95, max(40, 60 + int(next_3_days_rain * 2))) weather_context = { "forecast_days": 7, "rain_probability": rain_probability, "expected_rainfall": f"{next_3_days_rain:.1f}mm", "temperature": f"{current_temp:.1f}°C", "humidity": f"{estimated_humidity}%", "wind_speed": f"{current_windspeed:.1f} km/h", "coordinates_source": location_source } # Generate unique alert ID alert_id = f"{state.upper()[:2]}_{district.upper()[:3]}_{selected_village.upper()[:3]}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Build final response result = { "alert_id": alert_id, "timestamp": datetime.now().isoformat() + "Z", "location": { "village": selected_village, "district": district, "state": state.capitalize(), "coordinates": location_coords, "coordinates_source": location_source, "total_villages_in_district": len(available_villages) }, "crop": { "name": regional_crop, "stage": crop_stage, "season": get_current_season(datetime.now().month), "planted_estimate": "2025-06-15" }, "alert": { "type": alert_type, "urgency": urgency, "message": ai_analysis['enhanced_message'] if ai_analysis else alert_message, "action_items": action_items, "valid_until": (datetime.now() + timedelta(days=3)).isoformat() + "Z", "ai_generated": ai_analysis is not None }, "weather": weather_context, "data_source": "open_meteo_api_with_ai_enhancement" if ai_analysis else "open_meteo_api" } if ai_analysis: result["ai_analysis"] = { "alert": ai_analysis['alert'], "impact": ai_analysis['impact'], "recommendations": ai_analysis['recommendations'] } return result except Exception as e: logger.error(f"Error generating alert for {district}, {state}: {e}") raise Exception(f"Failed to generate weather alert for {district}: {str(e)}") # API Routes @app.get("/") async def root(): return {"message": "MCP Weather Server v1.0 - AI-Powered Agricultural Alerts", "status": "running"} @app.get("/api/health") async def health_check(): return { "status": "healthy", "message": "API is working", "openai_available": openai_key is not None, "timestamp": datetime.now().isoformat() } @app.post("/api/run-workflow") async def run_workflow(request: WorkflowRequest): """Main workflow endpoint for generating comprehensive agricultural alerts""" logger.info(f"Received workflow request: {request.state}, {request.district}") workflow_results = [] try: # Workflow header workflow_results.extend([ f"🌾 Agricultural Alert Workflow for {request.district.title()}, {request.state.title()}", "=" * 70, "", "🌤️ Weather Data Collection", "-" * 30 ]) # Generate dynamic alert sample_alert = await generate_dynamic_alert(request.district, request.state) workflow_results.extend([ "✅ Weather data retrieved successfully", f" 📍 Location: {sample_alert['location']['village']}, {sample_alert['location']['district']}", f" 🌡️ Temperature: {sample_alert['weather']['temperature']}", f" 🌧️ Expected Rainfall: {sample_alert['weather']['expected_rainfall']}", f" 💨 Wind Speed: {sample_alert['weather']['wind_speed']}", f" 🌾 Crop: {sample_alert['crop']['name']} ({sample_alert['crop']['stage']})", f" 🚨 Alert Type: {sample_alert['alert']['type']} - {sample_alert['alert']['urgency'].upper()} priority", "" ]) # Generate agent responses agents = [ ("📱 WhatsApp Agent", whatsapp_agent.create_whatsapp_message), ("📱 SMS Agent", sms_agent.create_sms_message), ("📞 USSD Agent", ussd_agent.create_ussd_menu), ("🎙️ IVR Agent", ivr_agent.create_ivr_script), ("🤖 Telegram Agent", telegram_agent.create_telegram_message) ] agent_responses = {} for agent_name, agent_func in agents: workflow_results.extend([agent_name, "-" * 30]) try: response = agent_func(sample_alert) workflow_results.append("✅ Message generated successfully") agent_responses[agent_name] = response # Add preview for some agents if "WhatsApp" in agent_name: text = response.get('text', 'N/A') workflow_results.append(f" Preview: {text[:100]}..." if len(text) > 100 else f" Preview: {text}") elif "SMS" in agent_name: workflow_results.append(f" Preview: {str(response)[:100]}...") except Exception as e: workflow_results.append(f"❌ Error: {str(e)}") agent_responses[agent_name] = f"Error: {str(e)}" workflow_results.append("") # Summary workflow_results.extend([ "✅ Workflow Summary", "-" * 30, f"🎯 Successfully generated alerts for {sample_alert['location']['village']}, {request.district}", f"📊 Data Sources: {sample_alert['data_source']}", f"🤖 AI Enhanced: {'Yes' if sample_alert['alert']['ai_generated'] else 'No'}", f"⏰ Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}", f"📱 Agents Processed: {len([r for r in agent_responses.values() if not str(r).startswith('Error:')])}/{len(agents)}" ]) # Generate CSV csv_content = generate_csv_export(sample_alert, agent_responses) return { "message": "\n".join(workflow_results), "status": "success", "csv": csv_content, "raw_data": { "state": request.state, "district": request.district, "alert_data": sample_alert, "agent_responses": agent_responses } } except Exception as e: error_msg = f"❌ Workflow failed: {str(e)}" workflow_results.append(error_msg) logger.exception(f"Workflow error for {request.district}, {request.state}") return { "message": "\n".join(workflow_results), "status": "error", "csv": "", "error": str(e) } def generate_csv_export(alert_data: dict, agent_responses: dict) -> str: """Generate CSV export of workflow results""" try: csv_buffer = StringIO() writer = csv.writer(csv_buffer) # Headers headers = ["Field", "Value"] writer.writerow(headers) # Alert data writer.writerow(["Alert ID", alert_data["alert_id"]]) writer.writerow(["Village", alert_data["location"]["village"]]) writer.writerow(["District", alert_data["location"]["district"]]) writer.writerow(["State", alert_data["location"]["state"]]) writer.writerow(["Coordinates", str(alert_data["location"]["coordinates"])]) writer.writerow(["Crop", alert_data["crop"]["name"]]) writer.writerow(["Crop Stage", alert_data["crop"]["stage"]]) writer.writerow(["Temperature", alert_data["weather"]["temperature"]]) writer.writerow(["Rainfall", alert_data["weather"]["expected_rainfall"]]) writer.writerow(["Alert Type", alert_data["alert"]["type"]]) writer.writerow(["Urgency", alert_data["alert"]["urgency"]]) writer.writerow(["Alert Message", alert_data["alert"]["message"]]) # Agent responses writer.writerow([]) # Empty row writer.writerow(["Agent", "Response"]) for agent_name, response in agent_responses.items(): clean_agent_name = agent_name.replace("📱 ", "").replace("📞 ", "").replace("🎙️ ", "").replace("🤖 ", "") writer.writerow([clean_agent_name, str(response)[:500]]) # Limit response length return csv_buffer.getvalue() except Exception as e: logger.error(f"CSV generation error: {e}") return f"Error generating CSV: {str(e)}" # Other API endpoints @app.post("/mcp") async def mcp_endpoint(request: MCPRequest): """MCP tool execution endpoint""" logger.info(f"Received request for tool: {request.tool}") tool_config = get_tool_config(request.tool) if not tool_config: logger.error(f"Tool not found: {request.tool}") raise HTTPException(status_code=404, detail="Tool not found") try: # Route to appropriate module module_map = { "open_meteo": open_meteo, "tomorrow_io": tomorrow_io, "google_weather": google_weather, "openweathermap": openweathermap, "accuweather": accuweather, "openai_llm": openai_llm, "geographic_tools": geographic_tools, "crop_calendar_tools": crop_calendar_tools, "alert_generation_tools": alert_generation_tools } module = module_map.get(tool_config["module"]) if not module: raise HTTPException(status_code=500, detail="Invalid tool module") # Add API key if needed params = request.parameters.copy() if tool_config["module"] in ["tomorrow_io", "google_weather", "openweathermap", "accuweather"]: api_key_map = { "tomorrow_io": "TOMORROW_IO_API_KEY", "google_weather": "GOOGLE_WEATHER_API_KEY", "openweathermap": "OPENWEATHERMAP_API_KEY", "accuweather": "ACCUWEATHER_API_KEY" } key_name = api_key_map[tool_config["module"]] params["api_key"] = config.get(key_name) elif tool_config["module"] in ["openai_llm", "alert_generation_tools"]: params["api_key"] = openai_key # Execute tool function result = await getattr(module, request.tool)(**params) logger.info(f"Successfully executed tool: {request.tool}") return result except Exception as e: logger.exception(f"Error executing tool: {request.tool}") raise HTTPException(status_code=500, detail=str(e)) # A2A Agent endpoints @app.post("/a2a/sms") async def a2a_sms_endpoint(request: AlertRequest): """SMS agent endpoint""" return {"message": sms_agent.create_sms_message(request.alert_json)} @app.post("/a2a/whatsapp") async def a2a_whatsapp_endpoint(request: AlertRequest): """WhatsApp agent endpoint""" return whatsapp_agent.create_whatsapp_message(request.alert_json) @app.post("/a2a/ussd") async def a2a_ussd_endpoint(request: AlertRequest): """USSD agent endpoint""" return {"menu": ussd_agent.create_ussd_menu(request.alert_json)} @app.post("/a2a/ivr") async def a2a_ivr_endpoint(request: AlertRequest): """IVR agent endpoint""" return {"script": ivr_agent.create_ivr_script(request.alert_json)} @app.post("/a2a/telegram") async def a2a_telegram_endpoint(request: AlertRequest): """Telegram agent endpoint""" return telegram_agent.create_telegram_message(request.alert_json) # MCP RPC handler for external integration @app.post("/mcp-rpc") async def mcp_rpc_handler(request: dict): """JSON-RPC handler for MCP integration""" method = request.get("method") params = request.get("params", {}) tool_name = params.get("tool_name") arguments = params.get("arguments", {}) req_id = request.get("id") # Handle run_workflow tool if method == "call_tool" and tool_name == "run_workflow": state = arguments.get("state") district = arguments.get("district") result = await run_workflow(WorkflowRequest(state=state, district=district)) return {"jsonrpc": "2.0", "result": result, "id": req_id} # Handle other tools dynamically if method == "call_tool": try: result = await mcp_endpoint(MCPRequest(tool=tool_name, parameters=arguments)) return {"jsonrpc": "2.0", "result": result, "id": req_id} except Exception as e: return {"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id} return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Unknown method"}, "id": req_id} # Additional utility endpoints @app.get("/api/districts/{state}") async def get_districts(state: str): """Get list of districts for a state""" try: result = await geographic_tools.list_villages(state) if "districts" in result: return {"districts": result["districts"]} return {"error": "State not found"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/villages/{state}/{district}") async def get_villages(state: str, district: str): """Get list of villages for a district""" try: result = await geographic_tools.list_villages(state, district) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/crops/{region}") async def get_crops(region: str): """Get list of crops for a region""" try: result = await crop_calendar_tools.get_crop_calendar(region) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/weather/{latitude}/{longitude}") async def get_weather(latitude: float, longitude: float): """Get current weather for coordinates""" try: current_weather = await open_meteo.get_current_weather(latitude=latitude, longitude=longitude) forecast = await open_meteo.get_weather_forecast(latitude=latitude, longitude=longitude, days=7) return { "current": current_weather, "forecast": forecast, "timestamp": datetime.now().isoformat() } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Error handlers @app.exception_handler(HTTPException) async def http_exception_handler(request, exc): logger.error(f"HTTP {exc.status_code}: {exc.detail}") return { "error": exc.detail, "status_code": exc.status_code, "timestamp": datetime.now().isoformat() } @app.exception_handler(Exception) async def general_exception_handler(request, exc): logger.exception("Unhandled exception occurred") return { "error": "Internal server error", "message": str(exc), "timestamp": datetime.now().isoformat() } if __name__ == "__main__": import uvicorn logger.info("Starting MCP Weather Server...") uvicorn.run( app, host="0.0.0.0", port=int(os.getenv("PORT", 8000)), log_level=LOG_LEVEL.lower() )