File size: 4,337 Bytes
a59d348 2a02fdb d8f62d2 c1d7bb9 f6aff73 1eccd08 a59d348 8bfd5d0 a59d348 d899239 a59d348 2a02fdb d8f62d2 c1d7bb9 1eccd08 c1d7bb9 1eccd08 2a02fdb d8f62d2 f6aff73 ee4bae0 f6aff73 d8f62d2 f6aff73 d8f62d2 c1d7bb9 d8f62d2 8bfd5d0 c1d7bb9 d8f62d2 c1d7bb9 d8f62d2 c1d7bb9 d8f62d2 f6aff73 d899239 f6aff73 a59d348 d899239 a59d348 e598697 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
import logging
from fastapi import FastAPI
from txagent.txagent import TxAgent
from db.mongo import get_mongo_client
import asyncio
from pyfcm import FCMNotification
from datetime import datetime
import httpx
import os
# Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s")
logger = logging.getLogger("TxAgentAPI")
# Initialize agent synchronously with error handling
try:
agent = TxAgent(
model_name="mims-harvard/TxAgent-T1-Llama-3.1-8B",
rag_model_name="mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
enable_finish=True,
enable_rag=False,
force_finish=True,
enable_checker=True,
step_rag_num=4,
seed=42
)
agent.init_model()
agent.chat_prompt = (
"You are a clinical assistant AI. Analyze the patient's data and provide clear clinical recommendations."
)
logger.info("β
TxAgent initialized synchronously with chat_prompt: %s", agent.chat_prompt)
except Exception as e:
logger.error("β Failed to initialize TxAgent: %s", str(e))
raise
# Initialize collections synchronously
db = get_mongo_client()["cps_db"]
users_collection = db["users"]
patients_collection = db["patients"]
analysis_collection = db["patient_analysis_results"]
alerts_collection = db["clinical_alerts"]
notifications_collection = db["notifications"]
logger.info("π‘ Connected to MongoDB synchronously at %s", datetime.utcnow().isoformat())
# Retrieve secrets from environment variables (set in Hugging Face Space Secrets)
FCM_API_KEY = os.getenv("FCM_API_KEY")
SECRET_KEY = os.getenv("SECRET_KEY")
if not FCM_API_KEY or not SECRET_KEY:
logger.error("β Missing FCM_API_KEY or SECRET_KEY in environment variables")
raise ValueError("FCM_API_KEY and SECRET_KEY must be set in Hugging Face Space Secrets")
# FCM settings
push_service = FCMNotification(api_key=FCM_API_KEY)
# Auth Space URL
AUTH_SPACE_URL = "https://rocketfarmstudios-cps-api.hf.space"
async def send_push_notification(recipient_email, message):
try:
# Fetch the user's device token from the auth Space
async with httpx.AsyncClient() as client:
headers = {"Authorization": f"Bearer {create_access_token(recipient_email)}"}
response = await client.get(f"{AUTH_SPACE_URL}/me", headers=headers)
if response.status_code != 200:
logger.warning(f"Failed to fetch user {recipient_email} from auth Space: {response.text}")
return
user_data = response.json()
device_token = user_data.get("device_token")
if not device_token:
logger.warning(f"No device token found for {recipient_email} at {datetime.utcnow().isoformat()}")
return
# Send push notification
result = push_service.notify_single_device(
registration_id=device_token,
message_title="Risk Alert",
message_body=message,
sound="default",
priority="high"
)
logger.info(f"Push notification sent to {recipient_email} at {datetime.utcnow().isoformat()}: {result}")
except Exception as e:
logger.error(f"Failed to send push notification to {recipient_email} at {datetime.utcnow().isoformat()}: {str(e)}")
# JWT settings (minimal implementation for token creation)
from datetime import timedelta
import jwt
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(email: str):
to_encode = {"sub": email, "exp": datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.PyJWTError as e:
logger.error(f"Token decode error: {str(e)}")
return None
def setup_app(app: FastAPI):
@app.on_event("startup")
async def startup_event():
asyncio.create_task(analyze_all_patients())
async def analyze_all_patients():
from analysis import analyze_patient
patients = await patients_collection.find({}).to_list(length=None)
for patient in patients:
await analyze_patient(patient)
await asyncio.sleep(0.1) |