Spaces:
Sleeping
Sleeping
import re | |
import os | |
import time | |
import requests | |
import base64 | |
import asyncio | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Optional | |
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form | |
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse | |
from fastapi.encoders import jsonable_encoder | |
from sqlalchemy import select, update, func | |
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession | |
from sqlalchemy.orm import sessionmaker, declarative_base | |
from sqlalchemy import Column, Integer, String, DateTime, Text, Float, Boolean | |
import logging | |
import hmac | |
import hashlib | |
import json | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger("delivery_service") | |
# -------------------- | |
# DATABASE SETUP | |
# -------------------- | |
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./test.db") | |
engine = create_async_engine(DATABASE_URL, echo=True) | |
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) | |
Base = declarative_base() | |
# -------------------- | |
# DATABASE MODELS | |
# -------------------- | |
class UserProfile(Base): | |
__tablename__ = "user_profiles" | |
id = Column(Integer, primary_key=True, index=True) | |
user_id = Column(String(36), unique=True, index=True) | |
phone_number = Column(String(20), nullable=True) | |
name = Column(String(100), nullable=True) | |
email = Column(String(100), nullable=True) | |
preferences = Column(Text, default="{}") | |
last_interaction = Column(DateTime, default=datetime.utcnow) | |
ux_mode = Column(String(20), default="default") | |
accessibility_prefs = Column(Text, default="{}") | |
current_order_status = Column(String(50), default="None") | |
class UXPreferences(Base): | |
__tablename__ = "ux_preferences" | |
user_id = Column(String(36), primary_key=True) | |
interaction_speed = Column(String(20), default="normal") | |
color_scheme = Column(String(20), default="light") | |
input_preference = Column(String(20), default="buttons") | |
class ChatHistory(Base): | |
__tablename__ = "chat_history" | |
id = Column(Integer, primary_key=True, index=True) | |
user_id = Column(String(36), index=True) | |
message = Column(Text) | |
timestamp = Column(DateTime, default=datetime.utcnow) | |
class Order(Base): | |
__tablename__ = "orders" | |
order_id = Column(String(36), primary_key=True, index=True) | |
user_id = Column(String(36), index=True) | |
status = Column(String(20), default="pending") | |
payment_reference = Column(String(100), nullable=True) | |
last_location = Column(Text, nullable=True) # Stored as a JSON string | |
class OrderTracking(Base): | |
__tablename__ = "order_tracking" | |
id = Column(Integer, primary_key=True, index=True) | |
order_id = Column(String(36), index=True) | |
status = Column(String(50)) | |
message = Column(Text) | |
timestamp = Column(DateTime, default=datetime.utcnow) | |
location = Column(Text, nullable=True) # Stored as a JSON string | |
# -------------------- | |
# INITIALIZATION | |
# -------------------- | |
async def init_db(): | |
async with engine.begin() as conn: | |
await conn.run_sync(Base.metadata.create_all) | |
logger.info("Database tables created.") | |
# -------------------- | |
# STATE MANAGEMENT | |
# -------------------- | |
class ConversationState: | |
def __init__(self): | |
self.flow: Optional[str] = None | |
self.step: int = 0 | |
self.data: Dict = {} | |
self.context: List[Dict] = [] | |
self.last_active: datetime = datetime.utcnow() | |
def update_last_active(self): | |
self.last_active = datetime.utcnow() | |
def add_context(self, role: str, message: str): | |
self.context.append({ | |
"timestamp": datetime.utcnow(), | |
"role": role, | |
"message": message | |
}) | |
# Keep only the last 5 messages | |
if len(self.context) > 5: | |
self.context = self.context[-5:] | |
# Global state dictionary | |
user_state: Dict[str, ConversationState] = {} | |
# -------------------- | |
# CORE UTILITIES | |
# -------------------- | |
async def update_user_last_interaction(user_id: str): | |
async with async_session() as session: | |
await session.execute( | |
update(UserProfile) | |
.where(UserProfile.user_id == user_id) | |
.values(last_interaction=datetime.utcnow()) | |
) | |
await session.commit() | |
async def get_or_create_user_profile(user_id: str, phone: str = None) -> UserProfile: | |
async with async_session() as session: | |
result = await session.execute( | |
select(UserProfile).where(UserProfile.user_id == user_id) | |
) | |
profile = result.scalar_one_or_none() | |
if not profile: | |
profile = UserProfile( | |
user_id=user_id, | |
phone_number=phone, | |
last_interaction=datetime.utcnow() | |
) | |
session.add(profile) | |
await session.commit() | |
await session.refresh(profile) | |
return profile | |
# -------------------- | |
# ENHANCED UX FEATURES | |
# -------------------- | |
class DeliveryUXManager: | |
async def generate_response_template(user_id: str) -> Dict: | |
"""Generate a personalized response structure.""" | |
profile = await get_or_create_user_profile(user_id) | |
return { | |
"meta": { | |
"ux_mode": profile.ux_mode, | |
"color_scheme": "light", | |
"interaction_mode": "text" | |
}, | |
"content": { | |
"text": "", | |
"quick_replies": [], | |
"carousel": None, | |
"status_overlay": None | |
} | |
} | |
async def handle_ux_preferences(user_id: str, preference: str): | |
"""Update UX preferences.""" | |
async with async_session() as session: | |
prefs = await session.get(UXPreferences, user_id) | |
if not prefs: | |
prefs = UXPreferences(user_id=user_id) | |
session.add(prefs) | |
# Update specific preference based on input | |
if preference.startswith("color_"): | |
prefs.color_scheme = preference.split("_")[1] | |
elif preference.startswith("speed_"): | |
prefs.interaction_speed = preference.split("_")[1] | |
await session.commit() | |
# -------------------- | |
# HELPER FUNCTIONS FOR PAYMENT CALLBACK & TRACKING | |
# -------------------- | |
async def log_order_tracking(order_id: str, status: str, message: str): | |
async with async_session() as session: | |
tracking_entry = OrderTracking( | |
order_id=order_id, | |
status=status, | |
message=message | |
) | |
session.add(tracking_entry) | |
await session.commit() | |
async def send_whatsapp_message(number: str, message: str): | |
# Dummy implementation for sending WhatsApp messages | |
logger.info(f"Sending WhatsApp message to {number}: {message}") | |
async def update_user_order_status(user_id: str, order_id: str, status: str): | |
async with async_session() as session: | |
await session.execute( | |
update(UserProfile) | |
.where(UserProfile.user_id == user_id) | |
.values(current_order_status=status) | |
) | |
await session.commit() | |
async def get_order_details(order_id: str) -> Optional[Order]: | |
async with async_session() as session: | |
result = await session.execute( | |
select(Order).where(Order.order_id == order_id) | |
) | |
return result.scalar_one_or_none() | |
def calculate_eta(last_location: dict) -> str: | |
# Implement actual ETA calculation logic here. | |
return "30 minutes" | |
# -------------------- | |
# MAIN APPLICATION | |
# -------------------- | |
app = FastAPI(title="Delivery Service Chatbot") | |
async def on_startup(): | |
await init_db() | |
async def enhanced_chatbot_handler(request: Request, bg: BackgroundTasks): | |
data = await request.json() | |
user_id = data["user_id"] | |
message = data.get("message", "") | |
# Initialize conversation state if it doesn't exist | |
if user_id not in user_state: | |
user_state[user_id] = ConversationState() | |
state = user_state[user_id] | |
state.update_last_active() | |
# Dummy conversation handling logic | |
response = await DeliveryUXManager.generate_response_template(user_id) | |
response["content"]["text"] = f"Received your message: {message}" | |
state.add_context("user", message) | |
# Background task to update user's last interaction in the DB | |
bg.add_task(update_user_last_interaction, user_id) | |
return JSONResponse(response) | |
async def update_ux_preferences(request: Request): | |
data = await request.json() | |
await DeliveryUXManager.handle_ux_preferences( | |
data["user_id"], | |
data["preference"] | |
) | |
return {"status": "preferences updated"} | |
async def get_chat_history(user_id: str): | |
async with async_session() as session: | |
result = await session.execute( | |
select(ChatHistory).where(ChatHistory.user_id == user_id) | |
) | |
history = result.scalars().all() | |
return [jsonable_encoder({ | |
"user_id": entry.user_id, | |
"message": entry.message, | |
"timestamp": entry.timestamp.isoformat() | |
}) for entry in history] | |
async def get_order(order_id: str): | |
async with async_session() as session: | |
result = await session.execute( | |
select(Order).where(Order.order_id == order_id) | |
) | |
order = result.scalar_one_or_none() | |
if order: | |
return jsonable_encoder({ | |
"order_id": order.order_id, | |
"user_id": order.user_id, | |
"status": order.status, | |
"payment_reference": order.payment_reference, | |
"last_location": order.last_location | |
}) | |
else: | |
raise HTTPException(status_code=404, detail="Order not found.") | |
async def get_user_profile(user_id: str): | |
profile = await get_or_create_user_profile(user_id) | |
return { | |
"user_id": profile.user_id, | |
"phone_number": profile.phone_number, | |
"name": profile.name, | |
"email": profile.email, | |
"preferences": profile.preferences, | |
"last_interaction": profile.last_interaction.isoformat(), | |
"ux_mode": profile.ux_mode, | |
"current_order_status": profile.current_order_status | |
} | |
async def get_analytics(): | |
async with async_session() as session: | |
msg_result = await session.execute( | |
select(func.count()).select_from(ChatHistory) | |
) | |
total_messages = msg_result.scalar() or 0 | |
order_result = await session.execute( | |
select(func.count()).select_from(Order) | |
) | |
total_orders = order_result.scalar() or 0 | |
# This query assumes a sentiment_logs table exists with a sentiment_score column. | |
sentiment_result = await session.execute("SELECT AVG(sentiment_score) FROM sentiment_logs") | |
avg_sentiment = sentiment_result.scalar() or 0 | |
return { | |
"total_messages": total_messages, | |
"total_orders": total_orders, | |
"average_sentiment": avg_sentiment | |
} | |
async def process_voice(file: UploadFile = File(...)): | |
contents = await file.read() | |
simulated_text = "Simulated speech-to-text conversion result." | |
return {"transcription": simulated_text} | |
async def payment_callback(request: Request): | |
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "dummy_secret") | |
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "+1234567890") | |
try: | |
if request.method == "POST": | |
# Verify Paystack signature | |
signature = request.headers.get("x-paystack-signature") | |
if not signature: | |
raise HTTPException(status_code=403, detail="Missing signature") | |
body = await request.body() | |
computed_sha = hmac.new( | |
PAYSTACK_SECRET_KEY.encode(), | |
body, | |
digestmod=hashlib.sha512 | |
).hexdigest() | |
if not hmac.compare_digest(computed_sha, signature): | |
raise HTTPException(status_code=403, detail="Invalid signature") | |
data = await request.json() | |
event = data.get("event") | |
order_id = data.get("data", {}).get("reference") | |
status = "Paid" if event == "charge.success" else "Failed" | |
elif request.method == "GET": | |
data = request.query_params | |
order_id = data.get("reference") | |
status = data.get("status", "Paid") | |
if not order_id: | |
raise HTTPException(status_code=400, detail="Missing order reference") | |
async with async_session() as session: | |
result = await session.execute( | |
select(Order) | |
.where(Order.order_id == order_id) | |
.with_for_update() | |
) | |
order = result.scalar_one_or_none() | |
if not order: | |
raise HTTPException(status_code=404, detail="Order not found") | |
valid_statuses = ["Paid", "Failed", "Pending"] | |
if status not in valid_statuses: | |
raise HTTPException(status_code=400, detail="Invalid status") | |
order.status = status | |
order.payment_reference = order_id | |
# Add tracking update | |
tracking_entry = OrderTracking( | |
order_id=order_id, | |
status="Payment Updated", | |
message=f"Payment status changed to {status}" | |
) | |
session.add(tracking_entry) | |
await session.commit() | |
# Send notifications concurrently | |
await asyncio.gather( | |
log_order_tracking(order_id, "Payment Updated", f"Payment status changed to {status}"), | |
send_whatsapp_message( | |
MANAGEMENT_WHATSAPP_NUMBER, | |
f"Payment Update: Order {order_id} - {status}" | |
), | |
update_user_order_status(order.user_id, order_id, status) | |
) | |
if request.method == "GET": | |
redirect_url = os.getenv("PAYMENT_REDIRECT_URL", "https://default-redirect.com") | |
return RedirectResponse( | |
url=redirect_url, | |
status_code=303 | |
) | |
return JSONResponse(content={"status": "success", "order_id": order_id}) | |
except HTTPException as he: | |
raise he | |
except Exception as e: | |
logger.error(f"Payment callback error: {str(e)}") | |
raise HTTPException(status_code=500, detail="Internal server error") | |
from fastapi_cache.decorator import cache | |
# Cache for 1 minute | |
async def get_order_tracking(order_id: str, page: int = 1, limit: int = 10): | |
""" | |
Get order tracking history with pagination. | |
""" | |
try: | |
async with async_session() as session: | |
order_result = await session.execute( | |
select(Order).where(Order.order_id == order_id) | |
) | |
if not order_result.scalar_one_or_none(): | |
raise HTTPException(status_code=404, detail="Order not found") | |
tracking_result = await session.execute( | |
select(OrderTracking) | |
.where(OrderTracking.order_id == order_id) | |
.order_by(OrderTracking.timestamp.desc()) | |
.offset((page-1)*limit) | |
.limit(limit) | |
) | |
tracking_updates = tracking_result.scalars().all() | |
if not tracking_updates: | |
return JSONResponse(content=[]) | |
response = [ | |
{ | |
"status": update.status, | |
"message": update.message, | |
"timestamp": update.timestamp.isoformat(), | |
"location": json.loads(update.location) if update.location else None | |
} | |
for update in tracking_updates | |
] | |
# Add estimated delivery time if order is shipped | |
order = await get_order_details(order_id) | |
if order and order.status.lower() == "shipped": | |
loc = json.loads(order.last_location) if order.last_location else {} | |
response[0]["estimated_delivery"] = calculate_eta(loc) | |
return JSONResponse(content=response) | |
except HTTPException as he: | |
raise he | |
except Exception as e: | |
logger.error(f"Tracking error: {str(e)}") | |
raise HTTPException(status_code=500, detail="Internal server error") | |
# -------------------- | |
# PROACTIVE FEATURES | |
# -------------------- | |
async def send_proactive_update(user_id: str, update_type: str): | |
"""Send unsolicited updates to users.""" | |
state = user_state.get(user_id) | |
if not state: | |
return # User not active | |
response = await DeliveryUXManager.generate_response_template(user_id) | |
if update_type == "delivery_eta": | |
response["content"]["text"] = "📦 Your package is arriving in 15 minutes!" | |
response["content"]["quick_replies"] = [ | |
{"title": "Track Live", "action": "track_live"}, | |
{"title": "Delay Delivery", "action": "delay_delivery"} | |
] | |
return response | |
# -------------------- | |
# ERROR HANDLING | |
# -------------------- | |
async def ux_error_handler(request: Request, exc: HTTPException): | |
return JSONResponse({ | |
"meta": {"error": True}, | |
"content": { | |
"text": f"⚠️ Error: {exc.detail}", | |
"quick_replies": [{"title": "Main Menu", "action": "reset"}] | |
} | |
}, status_code=exc.status_code) | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8000) | |