mcp-hf / a2a_agents /telegram_agent.py
elanuk
t
d2a1db5
raw
history blame
2.06 kB
import json
def create_telegram_message(alert_json: dict) -> dict:
"""
Creates a Telegram message with an inline keyboard from a structured alert JSON.
"""
message = (
f"🚨 *Weather Alert* 🚨\n\n"
f"πŸ“ *Location:* {alert_json['location']['village']}, {alert_json['location']['district']}\n"
f"🌾 *Crop:* {alert_json['crop']['name'].capitalize()} ({alert_json['crop']['stage']})\n"
f"⚠️ *Urgency:* {alert_json['alert']['urgency'].upper()}\n\n"
f"πŸ“ *Details:* {alert_json['alert']['message']}\n\n"
f"βœ… *Recommended Actions:*\n"
)
for action in alert_json['alert']['action_items']:
message += f"- {action.replace('_', ' ').capitalize()}\n"
keyboard = {
"inline_keyboard": [
[
{"text": "Acknowledge", "callback_data": f"ack_{alert_json['alert_id']}"},
{"text": "More Info", "callback_data": f"info_{alert_json['alert_id']}"}
]
]
}
return {
"text": message,
"reply_markup": keyboard
}
if __name__ == '__main__':
sample_alert = {
"alert_id": "BH_PAT_001_20250723",
"timestamp": "2025-07-23T06:00:00Z",
"location": {
"village": "Kumhrar",
"district": "Patna",
"state": "Bihar",
"coordinates": [25.5941, 85.1376]
},
"crop": {
"name": "rice",
"stage": "flowering",
"planted_estimate": "2025-06-15"
},
"alert": {
"type": "weather_warning",
"urgency": "high",
"message": "Heavy rainfall (40-60mm) expected in next 2 days. Delay fertilizer application. Ensure proper drainage.",
"action_items": ["delay_fertilizer", "check_drainage"],
"valid_until": "2025-07-25T18:00:00Z"
},
"weather": {
"forecast_days": 3,
"rain_probability": 85,
"expected_rainfall": "45mm"
}
}
telegram_message = create_telegram_message(sample_alert)
print(json.dumps(telegram_message, indent=2))