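# Source: mcp-hf / server.py (Hugging Face file-viewer export)
# Author avatar: elanuk's picture
# Commit: "Update server.py" (410b4b5, verified)
# Viewer links: raw / history / blame
# Size: 39 kB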
import os
import logging
import random
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from dotenv import dotenv_values
import asyncio
from datetime import datetime, timedelta
import csv
from io import StringIO
from tools import open_meteo, tomorrow_io, google_weather, openweathermap, accuweather, openai_llm, geographic_tools, crop_calendar_tools, alert_generation_tools
from a2a_agents import sms_agent, whatsapp_agent, ussd_agent, ivr_agent, telegram_agent
from utils.weather_utils import get_tool_config
# API keys are read from the local .env file; dotenv_values() returns them as
# a plain dict and does not modify os.environ.
config = dotenv_values(".env")

# The logging module only recognises upper-case level names, so normalise the
# environment value: LOG_LEVEL=debug (or an empty value) would otherwise make
# basicConfig() raise ValueError while the module is being imported.
LOG_LEVEL = (os.getenv("LOG_LEVEL") or "INFO").strip().upper()
logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger(__name__)

app = FastAPI()

# CORS middleware for frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://mcp-ui.vercel.app"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["*"],
)
class MCPRequest(BaseModel):
    """Request body accepted by the ``mcp_endpoint`` tool dispatcher."""

    # Name of the tool to run; looked up with get_tool_config() and then
    # resolved as an attribute of the configured tool module.
    tool: str
    # Keyword arguments unpacked into the tool call.
    parameters: dict
class AlertRequest(BaseModel):
    """Request body accepted by the ``/a2a/*`` channel endpoints."""

    # Alert payload handed unchanged to the channel agent's create_* function.
    alert_json: dict
class WorkflowRequest(BaseModel):
    """Request body accepted by ``run_workflow``."""

    # State and district are passed to generate_dynamic_alert() to pick a
    # village and fetch weather for it.
    state: str
    district: str
def get_regional_crop_for_area(district: str, state: str):
    """Return the typical crop for a district from a static table.

    Only Bihar has a district table. Any other state, and any Bihar district
    that is not listed, maps to 'rice'. Both arguments are matched
    case-insensitively.

    NOTE(review): an async function with the same name is defined later in
    this file and rebinds the name at import time - confirm whether this
    synchronous version is still needed.
    """
    default_crop = 'rice'
    if state.lower() != 'bihar':
        return default_crop

    bihar_crops = dict(
        patna='rice',
        gaya='wheat',
        bhagalpur='rice',
        muzaffarpur='sugarcane',
        darbhanga='rice',
        siwan='rice',
        begusarai='rice',
        katihar='maize',
    )
    return bihar_crops.get(district.lower(), default_crop)
def get_current_crop_stage(crop: str):
    """Return a coarse growth stage for *crop* based on the current month.

    Known crops are 'rice', 'wheat', 'sugarcane' and 'maize'. For those, a
    month outside every listed window means 'harvesting'. Any other crop
    name yields 'growing'.
    """
    month_now = datetime.now().month

    monsoon_calendar = (
        ((6, 7), 'planting'),
        ((8, 9), 'growing'),
        ((10, 11), 'flowering'),
    )
    # Per crop: (months, stage) windows checked in order.
    calendars = {
        'rice': monsoon_calendar,
        'wheat': (
            ((11, 12), 'planting'),
            ((1, 2), 'growing'),
            ((3, 4), 'flowering'),
        ),
        'sugarcane': (
            ((2, 3, 4), 'planting'),
            ((5, 6, 7, 8), 'growing'),
            ((9, 10, 11), 'maturing'),
        ),
        'maize': monsoon_calendar,
    }

    windows = calendars.get(crop)
    if windows is None:
        return 'growing'
    for months, stage in windows:
        if month_now in months:
            return stage
    return 'harvesting'
async def _geocode_place(place: str, kind: str):
    """Return ``([lat, lng], source_tag)`` for *place*, or None if lookup fails.

    *kind* is 'village' or 'district'; it only affects log text and the tag.
    NOTE(review): the lookup goes through geographic_tools.reverse_geocode
    with a place name - confirm that is the intended helper.
    """
    try:
        location = await geographic_tools.reverse_geocode(place)
        if "error" not in location and "lat" in location:
            coords = [location["lat"], location["lng"]]
            logger.info(f"Using {kind} coordinates for {place}: {coords}")
            return coords, f"{kind}_{place}"
    except Exception as e:
        # Best effort: a failed lookup just moves on to the next fallback.
        logger.warning(f"{kind.capitalize()} geocoding failed for {place}: {e}")
    return None


def _classify_weather(rain: float, temp: float, wind: float, village: str, district: str):
    """Map weather readings to ``(alert_type, urgency, message, action_items)``.

    Rules are checked in priority order: heavy rain, moderate rain,
    heat with little rain, cold, high wind, otherwise a routine update.
    """
    if rain > 25:
        return (
            "heavy_rain_warning",
            "high",
            f"Heavy rainfall ({rain:.1f}mm) expected in next 3 days near {village}, {district}. Delay fertilizer application. Ensure proper drainage.",
            ["delay_fertilizer", "check_drainage", "monitor_crops", "prepare_harvest_protection"],
        )
    if rain > 10:
        return (
            "moderate_rain_warning",
            "medium",
            f"Moderate rainfall ({rain:.1f}mm) expected in next 3 days near {village}, {district}. Monitor soil moisture levels.",
            ["monitor_soil", "check_drainage", "adjust_irrigation"],
        )
    if rain < 2 and temp > 35:
        return (
            "heat_drought_warning",
            "high",
            f"High temperature ({temp:.1f}°C) with minimal rainfall expected near {village}, {district}. Increase irrigation frequency.",
            ["increase_irrigation", "mulch_crops", "monitor_plant_stress"],
        )
    if temp < 10:
        return (
            "cold_warning",
            "medium",
            f"Low temperature ({temp:.1f}°C) expected near {village}, {district}. Protect crops from cold damage.",
            ["protect_crops", "cover_seedlings", "adjust_irrigation_timing"],
        )
    if wind > 30:
        return (
            "high_wind_warning",
            "medium",
            f"High winds ({wind:.1f} km/h) expected near {village}, {district}. Secure crop supports and structures.",
            ["secure_supports", "check_structures", "monitor_damage"],
        )
    return (
        "weather_update",
        "low",
        f"Normal weather conditions expected near {village}, {district}. Temperature {temp:.1f}°C, rainfall {rain:.1f}mm.",
        ["routine_monitoring", "maintain_irrigation"],
    )


async def generate_dynamic_alert(district: str, state: str):
    """Build an alert dict for a random village of *district* from live weather.

    Steps: list the district's villages, pick one at random, resolve
    coordinates (village, then district, then a fixed Patna fallback), choose
    a crop and stage, fetch current weather plus a 7-day forecast from
    Open-Meteo, and classify the conditions into an alert.

    Returns:
        dict with keys ``alert_id``, ``timestamp``, ``location``, ``crop``,
        ``alert``, ``weather`` and ``data_source``. ``crop.planted_estimate``
        is a fixed placeholder date. All timestamps are UTC.

    Raises:
        Exception: for any failure (unknown district, no villages, weather
            fetch error); the original error is attached as ``__cause__``.
    """
    from datetime import timezone

    try:
        # Step 1: villages for the district
        villages_data = await geographic_tools.list_villages(state, district)
        if "error" in villages_data:
            raise Exception(f"District '{district}' not found in {state}")

        # Step 2: pick a random village from the actual list
        available_villages = villages_data.get("villages", [])
        if not available_villages:
            raise Exception(f"No villages found for {district}")
        selected_village = random.choice(available_villages)
        logger.info(f"Selected village: {selected_village} from {len(available_villages)} villages")

        # Step 3: coordinates - village first, then district, then fallback
        located = (
            await _geocode_place(selected_village, "village")
            or await _geocode_place(district, "district")
        )
        if located:
            location_coords, location_source = located
        else:
            logger.warning(f"No coordinates found for {selected_village} or {district}, using default")
            location_coords = [25.5941, 85.1376]  # Patna fallback
            location_source = "fallback_patna"

        # Step 4: regional crop and its stage
        regional_crop = await get_regional_crop_for_area(district, state)
        crop_stage = await get_current_crop_stage_dynamic(regional_crop, district)

        # Step 5: real weather for the resolved coordinates
        try:
            logger.info(f"Fetching weather for coordinates: {location_coords} (source: {location_source})")
            current_weather_data = await open_meteo.get_current_weather(
                latitude=location_coords[0],
                longitude=location_coords[1],
            )
            forecast_data = await open_meteo.get_weather_forecast(
                latitude=location_coords[0],
                longitude=location_coords[1],
                days=7,
            )
            current_weather = current_weather_data.get('current_weather', {})
            daily_forecast = forecast_data.get('daily', {})
            current_temp = current_weather.get('temperature', 25)
            current_windspeed = current_weather.get('windspeed', 10)

            precipitation_list = daily_forecast.get('precipitation_sum', [0, 0, 0])
            # Skip missing (None) daily values instead of letting sum() fail.
            next_3_days_rain = sum(
                amount for amount in (precipitation_list or [])[:3] if amount is not None
            )
            rain_probability = min(90, max(10, int(next_3_days_rain * 10))) if next_3_days_rain > 0 else 10
            # Higher precipitation = higher humidity estimate
            estimated_humidity = min(95, max(40, 60 + int(next_3_days_rain * 2)))

            real_weather = {
                "forecast_days": 3,
                "rain_probability": rain_probability,
                "expected_rainfall": f"{next_3_days_rain:.1f}mm",
                "temperature": f"{current_temp:.1f}°C",
                "humidity": f"{estimated_humidity}%",
                "wind_speed": f"{current_windspeed:.1f} km/h",
                "coordinates_source": location_source,  # Track where coords came from
            }

            # Step 6: alert derived from the actual conditions
            alert_type, urgency, alert_message, action_items = _classify_weather(
                next_3_days_rain, current_temp, current_windspeed, selected_village, district
            )
            logger.info(f"Real weather data retrieved for {selected_village}, {district}: {current_temp}°C, {next_3_days_rain:.1f}mm rain (coords: {location_coords})")
        except Exception as weather_error:
            logger.error(f"Failed to get real weather data for {selected_village}, {district}: {weather_error}")
            raise Exception(
                f"Unable to retrieve current weather conditions for {selected_village}, {district}"
            ) from weather_error

        # One UTC instant for every time field; the values carry a "Z"
        # suffix, so they must not be taken from the server's local clock.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return {
            "alert_id": f"{state.upper()[:2]}_{district.upper()[:3]}_{selected_village.upper()[:3]}_{now_utc.strftime('%Y%m%d_%H%M')}",
            "timestamp": now_utc.isoformat() + "Z",
            "location": {
                "village": selected_village,
                "district": district,
                "state": state.capitalize(),
                "coordinates": location_coords,
                "coordinates_source": location_source,
                "total_villages_in_district": len(available_villages),
            },
            "crop": {
                "name": regional_crop,
                "stage": crop_stage,
                "planted_estimate": "2025-06-15",  # You could make this dynamic too
            },
            "alert": {
                "type": alert_type,
                "urgency": urgency,
                "message": alert_message,
                "action_items": action_items,
                "valid_until": (now_utc + timedelta(days=3)).isoformat() + "Z",
            },
            "weather": real_weather,
            "data_source": "open_meteo_api_with_dynamic_location",
        }
    except Exception as e:
        logger.error(f"Error generating dynamic alert for {district}, {state}: {e}")
        raise Exception(f"Failed to generate weather alert for {district}: {str(e)}") from e
import random
from datetime import datetime, date
# Enhanced crop selection function using your crop calendar data
async def get_regional_crop_for_area(district: str, state: str):
    """Pick a crop for *district*, weighted by season and local preference.

    Non-Bihar states always get 'rice'. For Bihar, the district's crop list
    is intersected with the crops the crop calendar reports for the current
    season. If that is empty, a built-in per-season list is used, and after
    that the district's primary crops. The final pick is random, with primary
    crops weighted 5x, secondary 3x and specialty 1x.
    """
    if state.lower() != 'bihar':
        return 'rice'  # fallback for other states

    season_now = get_current_season(datetime.now().month)

    # Crops the calendar tool reports as in season; empty on any failure.
    try:
        calendar_reply = await crop_calendar_tools.get_prominent_crops('bihar', season_now)
        in_season = [] if "error" in calendar_reply else calendar_reply.get('crops', [])
    except Exception as e:
        logger.warning(f"Failed to get seasonal crops: {e}")
        in_season = []

    # district -> (primary, secondary, specialty) crops commonly grown there
    district_table = {
        'patna': (['rice', 'wheat', 'potato'], ['mustard', 'gram', 'barley'], ['sugarcane']),
        'gaya': (['wheat', 'rice', 'gram'], ['barley', 'lentil', 'mustard'], ['arhar']),
        'bhagalpur': (['rice', 'maize', 'wheat'], ['jute', 'urd', 'moong'], ['groundnut']),
        'muzaffarpur': (['sugarcane', 'rice', 'wheat'], ['potato', 'mustard'], ['lentil']),
        'darbhanga': (['rice', 'wheat', 'maize'], ['gram', 'arhar'], ['bajra']),
        'siwan': (['rice', 'wheat'], ['gram', 'lentil', 'pea'], ['mustard']),
        'begusarai': (['rice', 'wheat'], ['jute', 'mustard'], ['moong', 'urd']),
        'katihar': (['maize', 'rice'], ['jute', 'urd', 'moong'], ['jowar', 'bajra']),
        'vaishali': (['rice', 'wheat', 'sugarcane'], ['potato', 'gram'], ['mustard']),
        'madhubani': (['rice', 'wheat', 'maize'], ['gram', 'lentil'], ['arhar']),
    }
    unlisted_district = (['rice', 'wheat'], ['gram', 'mustard'], ['maize'])
    primary, secondary, specialty = district_table.get(district.lower(), unlisted_district)
    grown_here = primary + secondary + specialty

    # Crops that are both seasonal AND grown in this district
    candidates = [crop for crop in grown_here if crop in in_season] if in_season else []

    # No calendar match: fall back to a built-in list for the season
    if not candidates:
        season_defaults = {
            'kharif': ['rice', 'maize', 'arhar', 'moong', 'urd', 'jowar', 'bajra', 'groundnut', 'soybean'],
            'rabi': ['wheat', 'barley', 'gram', 'lentil', 'pea', 'mustard', 'linseed', 'potato'],
            'zaid': ['maize', 'moong', 'urd', 'watermelon', 'cucumber'],
        }
        typical = season_defaults.get(season_now, [])
        candidates = [crop for crop in grown_here if crop in typical]

    # Still nothing: use the district's primary crops
    if not candidates:
        candidates = primary

    def weight_of(crop):
        if crop in primary:
            return 5
        if crop in secondary:
            return 3
        return 1

    # Repeat each crop by its weight so a uniform choice is a weighted one.
    pool = [crop for crop in candidates for _ in range(weight_of(crop))]
    chosen = random.choice(pool) if pool else 'rice'
    logger.info(f"Selected crop: {chosen} for {district} in {season_now} season from options: {candidates}")
    return chosen
async def get_current_crop_stage_dynamic(crop: str, district: str = None):
    """Estimate the current growth stage of *crop* from the crop calendar.

    The Bihar calendar entry for the crop supplies the planting period and
    stage list. A planting date is estimated from it and passed to
    crop_calendar_tools.estimate_crop_stage. If that yields nothing, the
    stage is estimated from the current month. *district* is accepted for
    interface compatibility but is not used.

    Returns:
        The stage name. On a calendar error or any exception the static
        month-based table (get_current_crop_stage_static) is used instead.
    """
    try:
        crop_info = await crop_calendar_tools.get_crop_calendar('bihar', crop)
        if "error" in crop_info:
            # Fallback to the old static method
            return get_current_crop_stage_static(crop)

        planting_period = crop_info.get('planting', '')
        season = crop_info.get('season', '')
        stages = crop_info.get('stages', [])
        current_month = datetime.now().month
        current_date = date.today()

        estimated_plant_date = estimate_planting_date(crop, season, planting_period, current_month)

        # estimate_planting_date can return an upcoming planting date (for
        # example "this November" when asked in June). A crop that is not in
        # the ground yet has no elapsed growth, so only consult the stage
        # estimator for dates that are not in the future.
        if estimated_plant_date and estimated_plant_date <= current_date:
            try:
                stage_data = await crop_calendar_tools.estimate_crop_stage(
                    crop,
                    estimated_plant_date.isoformat(),
                    current_date.isoformat(),
                )
                if "error" not in stage_data:
                    stage = stage_data.get('stage', stages[0] if stages else 'Growing')
                    logger.info(f"Dynamic stage calculation for {crop}: {stage} (planted ~{estimated_plant_date})")
                    return stage
            except Exception as e:
                logger.warning(f"Error in dynamic stage calculation: {e}")

        # Fallback to month-based estimation
        return estimate_stage_by_month(crop, current_month, stages)
    except Exception as e:
        logger.error(f"Error in dynamic crop stage calculation: {e}")
        return get_current_crop_stage_static(crop)
def get_current_season(month: int):
    """Return the agricultural season name for a calendar month number.

    June-September is 'kharif', October-March is 'rabi', and anything else
    (April and May) is 'zaid'.
    """
    season_months = (
        ('kharif', (6, 7, 8, 9)),
        ('rabi', (10, 11, 12, 1, 2, 3)),
    )
    for season, months in season_months:
        if month in months:
            return season
    return 'zaid'
def estimate_planting_date(crop: str, season: str, planting_period: str, current_month: int):
    """Estimate the planting date that applies to the current crop cycle.

    The month name found in *planting_period* (checked in the order June,
    November, October, March) selects the 15th of that month. *crop* and
    *season* are accepted for interface compatibility but are not used.

    Returns:
        A ``date``, or None when the period names none of those months or is
        not a string. For October/November plantings queried between May and
        the planting month, the date is this year's upcoming planting and so
        lies in the future.
    """
    from datetime import date

    current_year = datetime.now().year
    try:
        period = planting_period.lower()
    except AttributeError as e:
        # Calendar entry had no usable planting text (e.g. None).
        logger.warning(f"Error estimating planting date: {e}")
        return None

    if 'june' in period:
        # Before June the crop in question was planted last June.
        return date(current_year if current_month >= 6 else current_year - 1, 6, 15)
    if 'november' in period:
        # Jan-Apr: last November's crop is still in the field.
        # May onwards: this year's (possibly upcoming) November planting.
        return date(current_year - 1 if current_month <= 4 else current_year, 11, 15)
    if 'october' in period:
        return date(current_year - 1 if current_month <= 4 else current_year, 10, 15)
    if 'march' in period:
        # From March onwards the latest planting is this year's. Previously
        # Sept-Dec fell back to the year before, dating the crop 18+ months old.
        return date(current_year if current_month >= 3 else current_year - 1, 3, 15)
    return None
def estimate_stage_by_month(crop: str, current_month: int, stages: list):
    """Pick an entry of *stages* for *crop* from the calendar month alone.

    Rice, wheat and maize have a month -> stage-index table. Any other crop,
    or a month missing from the table, uses index 2. The index is clamped to
    the last available stage. An empty *stages* list yields 'Growing'.
    """
    if not stages:
        return 'Growing'

    # Months are listed in crop-cycle order alongside their stage index.
    month_to_index = {
        'rice': dict(zip(
            (6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5),
            (0, 1, 2, 3, 4, 5, 6, 7, 8, 8, 8, 8),
        )),
        'wheat': dict(zip(
            (11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
            (0, 1, 2, 3, 4, 5, 6, 7, 8, 8, 8, 8),
        )),
        # Dual season: the cycle restarts in March
        'maize': dict(zip(
            (6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5),
            (0, 1, 2, 3, 4, 5, 6, 7, 7, 0, 1, 2),
        )),
    }

    middle_stage = 2  # default when crop or month is not tabulated
    wanted = month_to_index.get(crop, {}).get(current_month, middle_stage)
    last = len(stages) - 1
    return stages[wanted if wanted < last else last]
def get_current_crop_stage_static(crop: str):
    """Return a month-based stage name for *crop* (static fallback table).

    Covers 'rice', 'wheat', 'sugarcane' and 'maize'. Each has a stage for
    the listed months and a closing stage for every other month. Unknown
    crops yield 'Growing'.
    """
    month_now = datetime.now().month

    # crop -> (month -> stage, stage for all remaining months)
    tables = {
        'rice': (
            {6: 'Transplanting', 7: 'Transplanting',
             8: 'Vegetative', 9: 'Vegetative',
             10: 'Flowering', 11: 'Flowering'},
            'Maturity',
        ),
        'wheat': (
            {11: 'Sowing', 12: 'Sowing',
             1: 'Tillering', 2: 'Tillering',
             3: 'Flowering', 4: 'Flowering'},
            'Harvesting',
        ),
        'sugarcane': (
            {2: 'Planting', 3: 'Planting', 4: 'Planting',
             5: 'Vegetative', 6: 'Vegetative', 7: 'Vegetative', 8: 'Vegetative',
             9: 'Maturity', 10: 'Maturity', 11: 'Maturity'},
            'Harvesting',
        ),
        'maize': (
            {6: 'Sowing', 7: 'Sowing',
             8: 'Vegetative', 9: 'Vegetative',
             10: 'Grain Filling', 11: 'Grain Filling'},
            'Harvesting',
        ),
    }

    entry = tables.get(crop)
    if entry is None:
        return 'Growing'
    by_month, otherwise = entry
    return by_month.get(month_now, otherwise)
@app.get("/")
async def root():
    """Return a fixed message confirming that the server is running."""
    return {"message": "MCP Weather Server is running"}
@app.get("/api/health")
async def health_check():
    """Return a static health payload; no dependencies are probed."""
    return {"status": "healthy", "message": "API is working"}
def _alert_summary_lines(alert: dict) -> list:
    """Return the indented key facts of *alert*, one line per fact."""
    return [
        f" - Data Source: {alert.get('data_source', 'API')}",
        f" - Alert Type: {alert['alert']['type']}",
        f" - Severity: {alert['alert']['urgency']}",
        f" - Village: {alert['location']['village']}",
        f" - Coordinates: {alert['location']['coordinates']}",
        f" - Crop: {alert['crop']['name']} ({alert['crop']['stage']})",
        f" - Temperature: {alert['weather']['temperature']}",
        f" - Humidity: {alert['weather']['humidity']}",
        f" - Expected Rainfall: {alert['weather']['expected_rainfall']}",
        f" - Rain Probability: {alert['weather']['rain_probability']}%",
    ]


def _run_channel_agents(alert: dict):
    """Invoke every channel agent once for *alert*.

    Returns ``(report_lines, csv_columns)``. ``csv_columns`` holds one list
    of cells per channel in the order whatsapp, sms, ussd, ivr, telegram.
    An agent failure is recorded in both outputs and never propagates.
    """
    report = []
    columns = []

    # WhatsApp returns a dict (text + optional buttons), so it is handled
    # separately from the agents whose result is simply stringified.
    report += ["\n📱 WhatsApp Agent Response", "-" * 30]
    try:
        message = whatsapp_agent.create_whatsapp_message(alert)
        text = message.get('text', 'N/A')
        lines = ["✅ Message created successfully", f"Text: {text}"]
        cells = [text]
        if 'buttons' in message:
            lines.append(f"Buttons: {len(message['buttons'])} button(s)")
            if message['buttons']:
                cells.append(f"Buttons: {message['buttons']}")
        report += lines
        columns.append(cells)
    except Exception as e:
        report.append(f"❌ Error: {str(e)}")
        columns.append([f"Error: {str(e)}"])

    simple_channels = (
        ("\n📱 SMS Agent Response", "✅ SMS created successfully", "Content", sms_agent.create_sms_message),
        ("\n📞 USSD Agent Response", "✅ USSD menu created successfully", "Menu", ussd_agent.create_ussd_menu),
        ("\n🎙️ IVR Agent Response", "✅ IVR script created successfully", "Script", ivr_agent.create_ivr_script),
        ("\n🤖 Telegram Agent Response", "✅ Telegram message created successfully", "Content", telegram_agent.create_telegram_message),
    )
    for heading, ok_line, label, build in simple_channels:
        report += [heading, "-" * 30]
        try:
            rendered = str(build(alert))
            report += [ok_line, f"{label}: {rendered}"]
            columns.append([rendered])
        except Exception as e:
            report.append(f"❌ Error: {str(e)}")
            columns.append([f"Error: {str(e)}"])

    return report, columns


def _workflow_csv(weather_info: str, channel_columns: list) -> str:
    """Render the weather summary and channel outputs as CSV text.

    Each column is written top-down; shorter columns are padded with "".
    """
    columns = [[weather_info], *channel_columns]
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["weather data", "whatsapp", "sms", "ussd", "ivr", "telegram"])
    for i in range(max(len(column) for column in columns)):
        writer.writerow([column[i] if i < len(column) else "" for column in columns])
    return buffer.getvalue()


# workflow endpoint for frontend
@app.post("/api/run-workflow")
async def run_workflow(request: WorkflowRequest):
    """Run the alert workflow for a district and report every channel output.

    Generates an alert from live weather, renders it through the WhatsApp,
    SMS, USSD, IVR and Telegram agents (each invoked once, so the text report
    and the CSV always show the same output), and returns a text report, the
    same data as CSV, and the raw alert.

    Returns:
        dict with ``message``, ``status`` ('success' or 'error') and ``csv``;
        plus ``raw_data`` on success or ``error`` on failure. Failures are
        reported in the payload rather than raised.
    """
    logger.info(f"Received workflow request: {request.state}, {request.district}")
    try:
        workflow_results = [
            f"Workflow for {request.district}, {request.state}",
            "=" * 50,
            "\n🌤️ Weather Data Collection",
            "-" * 30,
            "📡 Fetching real-time weather data...",
        ]

        try:
            sample_alert = await generate_dynamic_alert(request.district, request.state)
        except Exception as weather_error:
            logger.error(f"Weather data error: {weather_error}")
            workflow_results.append(f"❌ Weather data collection failed: {str(weather_error)}")
            return {
                "message": "\n".join(workflow_results),
                "status": "error",
                "csv": "",
                "error": f"Unable to retrieve weather data: {str(weather_error)}"
            }
        workflow_results += [
            "✅ Current weather data retrieved from Open-Meteo API",
            "✅ 7-day forecast collected",
            "✅ Agricultural indices calculated",
        ]

        if not sample_alert:
            return {
                "message": "Failed to generate alert data",
                "status": "error",
                "csv": "",
                "error": "Alert generation failed"
            }

        # Alert generation
        summary_lines = _alert_summary_lines(sample_alert)
        workflow_results += ["\n🚨 Alert Generation", "-" * 30, "✅ Weather alerts generated"]
        workflow_results += summary_lines

        # Channel agents (each runs exactly once)
        agent_report, agent_columns = _run_channel_agents(sample_alert)
        workflow_results += agent_report

        # Summary
        workflow_results += [
            "\n✅ Workflow Summary",
            "-" * 30,
            "Workflow execution completed with REAL weather data",
            f"Location: {request.district}, {request.state}",
            "Weather Source: Open-Meteo API",
            f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        ]
        formatted_output = "\n".join(workflow_results)

        # CSV export; a failure here is reported inside the csv field only.
        try:
            csv_content = _workflow_csv("\n".join(summary_lines), agent_columns)
            logger.info("CSV content generated successfully")
        except Exception as csv_error:
            logger.error(f"Error generating CSV: {csv_error}")
            csv_content = f"Error generating CSV: {str(csv_error)}"

        logger.info(f"Successfully completed workflow for {request.district}, {request.state}")
        return {
            "message": formatted_output,
            "status": "success",
            "csv": csv_content,
            "raw_data": {
                "state": request.state,
                "district": request.district,
                "alert_data": sample_alert
            }
        }
    except Exception as e:
        logger.exception(f"Error in workflow for {request.district}, {request.state}")
        return {
            "message": f"Error running workflow: {str(e)}",
            "status": "error",
            "csv": "",
            "error": str(e)
        }
# Tool module name (as reported by get_tool_config) -> (module object, name of
# the .env entry holding its API key, or None when the module needs no key).
_TOOL_MODULES = {
    "open_meteo": (open_meteo, None),
    "tomorrow_io": (tomorrow_io, "TOMORROW_IO_API_KEY"),
    "google_weather": (google_weather, "GOOGLE_WEATHER_API_KEY"),
    "openweathermap": (openweathermap, "OPENWEATHERMAP_API_KEY"),
    "accuweather": (accuweather, "ACCUWEATHER_API_KEY"),
    "openai_llm": (openai_llm, "OPENAI_API_KEY"),
    "geographic_tools": (geographic_tools, None),
    "crop_calendar_tools": (crop_calendar_tools, None),
    "alert_generation_tools": (alert_generation_tools, "OPENAI_API_KEY"),
}


@app.post("/mcp")
async def mcp_endpoint(request: MCPRequest):
    """Execute the tool named in *request* and return its result.

    The tool's module is taken from get_tool_config(). Modules that need an
    API key receive it as the ``api_key`` keyword, read from the .env config.

    Raises:
        HTTPException: 404 when the tool is unknown; 500 when its module is
            not recognised or the tool call itself fails.

    NOTE(review): another handler is registered on POST /mcp later in this
    file - confirm which of the two is meant to serve that path.
    """
    logger.info(f"Received request for tool: {request.tool}")
    tool_config = get_tool_config(request.tool)
    if not tool_config:
        logger.error(f"Tool not found: {request.tool}")
        raise HTTPException(status_code=404, detail="Tool not found")

    try:
        entry = _TOOL_MODULES.get(tool_config["module"])
        if entry is None:
            raise HTTPException(status_code=500, detail="Invalid tool module")
        module, key_name = entry
        tool = getattr(module, request.tool)
        if key_name is None:
            result = await tool(**request.parameters)
        else:
            result = await tool(**request.parameters, api_key=config.get(key_name))
        logger.info(f"Successfully executed tool: {request.tool}")
        return result
    except HTTPException:
        # Already a proper HTTP error; do not re-wrap it and garble its detail.
        raise
    except Exception as e:
        logger.exception(f"Error executing tool: {request.tool}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/a2a/sms")
async def a2a_sms_endpoint(request: AlertRequest):
    """Return the SMS agent's rendering of the posted alert under 'message'."""
    return {"message": sms_agent.create_sms_message(request.alert_json)}
@app.post("/a2a/whatsapp")
async def a2a_whatsapp_endpoint(request: AlertRequest):
    """Return the WhatsApp agent's message for the posted alert, unwrapped."""
    return whatsapp_agent.create_whatsapp_message(request.alert_json)
@app.post("/a2a/ussd")
async def a2a_ussd_endpoint(request: AlertRequest):
    """Return the USSD agent's menu for the posted alert under 'menu'."""
    return {"menu": ussd_agent.create_ussd_menu(request.alert_json)}
@app.post("/a2a/ivr")
async def a2a_ivr_endpoint(request: AlertRequest):
    """Return the IVR agent's script for the posted alert under 'script'."""
    return {"script": ivr_agent.create_ivr_script(request.alert_json)}
@app.post("/a2a/telegram")
async def a2a_telegram_endpoint(request: AlertRequest):
    """Return the Telegram agent's message for the posted alert, unwrapped."""
    return telegram_agent.create_telegram_message(request.alert_json)
# for smithery + context7
@app.post("/mcp")
async def mcp_rpc_handler(request: dict):
    """Handle a JSON-RPC 2.0 style ``call_tool`` request.

    ``params.tool_name`` selects the tool: 'run_workflow' is routed to
    run_workflow(), anything else to mcp_endpoint(). Every outcome, including
    invalid arguments, is returned as a JSON-RPC envelope echoing the
    request ``id``.

    Error codes: -32601 for a method other than 'call_tool', -32000 when the
    tool call (or building its request model) fails.

    NOTE(review): mcp_endpoint is already registered on POST /mcp earlier in
    this file, and the router tries routes in registration order, so this
    handler is presumably never reached over HTTP - confirm and give one of
    the two a distinct path.
    """
    method = request.get("method")
    # `or {}` also covers an explicit JSON null, which .get's default does not.
    params = request.get("params") or {}
    tool_name = params.get("tool_name")
    arguments = params.get("arguments") or {}
    req_id = request.get("id")

    if method != "call_tool":
        return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Unknown method"}, "id": req_id}

    try:
        if tool_name == "run_workflow":
            # Building the model can fail (missing state/district); keep that
            # inside the try so the client gets a JSON-RPC error, not a 500.
            workflow_request = WorkflowRequest(
                state=arguments.get("state"),
                district=arguments.get("district"),
            )
            result = await run_workflow(workflow_request)
        else:
            # Handle other tools dynamically via the tool config
            result = await mcp_endpoint(MCPRequest(tool=tool_name, parameters=arguments))
    except Exception as e:
        return {"jsonrpc": "2.0", "error": {"code": -32000, "message": str(e)}, "id": req_id}
    return {"jsonrpc": "2.0", "result": result, "id": req_id}
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)