"""Delivery-service chatbot backend built on FastAPI and async SQLAlchemy.

Exposes a ``/chatbot`` endpoint that drives a small menu-based conversation
(order tracking, delivery scheduling, FAQs, agent hand-off), plus read-only
endpoints for chat history, orders, user profiles and analytics, a stub voice
endpoint, and a payment callback that updates an order's status.

Conversation state is kept in process memory (``user_state`` and
``conversation_context``); it is not shared between worker processes and is
lost on restart.
"""
import asyncio
import base64
import logging
import os
import re
import time
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlencode

import openai
import requests
from bs4 import BeautifulSoup
from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile,
)
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse

# For sentiment analysis using TextBlob
from textblob import TextBlob

# SQLAlchemy Imports (Async)
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, func, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

logger = logging.getLogger(__name__)

# Environment Variables
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")

# WhatsApp Business API credentials (Cloud API)
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")
# NOTE(review): Graph API version used for outbound WhatsApp messages;
# confirm the version your Meta app is registered for.
WHATSAPP_API_VERSION = os.getenv("WHATSAPP_API_VERSION", "v17.0")

# --- Database Setup ---
Base = declarative_base()


class ChatHistory(Base):
    """One inbound or outbound chat message for a user."""

    __tablename__ = "chat_history"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    direction = Column(String)  # 'inbound' or 'outbound'
    message = Column(Text)


class Order(Base):
    """A delivery order, looked up by its unique ``order_id``."""

    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    pickup_location = Column(String)
    dropoff_location = Column(String)
    package_details = Column(String)
    status = Column(String, default="Pending")
    timestamp = Column(DateTime, default=datetime.utcnow)


class UserProfile(Base):
    """Profile record for a chat user, keyed by unique ``user_id``."""

    __tablename__ = "user_profiles"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, unique=True, index=True)
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    email = Column(String, default="unknown@example.com")
    preferences = Column(Text, default="")
    last_interaction = Column(DateTime, default=datetime.utcnow)


class SentimentLog(Base):
    """A sentiment score recorded for a user's message."""

    __tablename__ = "sentiment_logs"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    sentiment_score = Column(Float)
    message = Column(Text)


# Initialize Database
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def init_db():
    """Create every table declared on ``Base`` if it does not exist yet."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


# --- Global In-Memory Stores ---
user_state = {}  # { user_id: ConversationState }
# { user_id: [ { "timestamp": ..., "role": "user"/"bot", "message": ... }, ... ] }
conversation_context = {}

# --- Conversation State Management ---
SESSION_TIMEOUT = timedelta(minutes=5)


class ConversationState:
    """Per-user position inside a multi-step conversation flow."""

    def __init__(self):
        self.flow = None  # e.g., "track_order", "schedule_delivery"
        self.step = 0
        self.data = {}
        self.last_active = datetime.utcnow()

    def update_last_active(self):
        """Mark the state as used right now."""
        self.last_active = datetime.utcnow()

    def is_expired(self):
        """Return True when the state has been idle longer than SESSION_TIMEOUT."""
        return datetime.utcnow() - self.last_active > SESSION_TIMEOUT

    def reset(self):
        """Leave any active flow and discard the data collected so far."""
        self.flow = None
        self.step = 0
        self.data = {}
        self.last_active = datetime.utcnow()


def _get_active_state(user_id: str) -> ConversationState:
    """Return the user's stored state, creating it or resetting it if expired."""
    state = user_state.get(user_id)
    if state is None:
        state = ConversationState()
        user_state[user_id] = state
    elif state.is_expired():
        state.reset()
    state.update_last_active()
    return state


# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Insert one ChatHistory row for the given message."""
    async with async_session() as session:
        session.add(ChatHistory(user_id=user_id, direction=direction, message=message))
        await session.commit()


async def log_sentiment(user_id: str, message: str, score: float):
    """Insert one SentimentLog row for the given message and score."""
    async with async_session() as session:
        session.add(SentimentLog(user_id=user_id, sentiment_score=score, message=message))
        await session.commit()


def analyze_sentiment(text: str) -> float:
    """Return TextBlob's polarity score for *text*."""
    return TextBlob(text).sentiment.polarity


async def get_or_create_user_profile(user_id: str) -> UserProfile:
    """Return the UserProfile for *user_id*, inserting a default one if missing."""
    async with async_session() as session:
        result = await session.execute(
            select(UserProfile).where(UserProfile.user_id == user_id)
        )
        profile = result.scalars().first()
        if profile is None:
            profile = UserProfile(user_id=user_id)
            session.add(profile)
            await session.commit()
            # Reload so column defaults applied at insert are present on the object.
            await session.refresh(profile)
        return profile


def send_whatsapp_message(to: str, message: str) -> bool:
    """Send a plain-text WhatsApp message through the Cloud API.

    This is a blocking call (it uses ``requests``); async callers run it via
    ``asyncio.to_thread``. Delivery is best-effort: failures are logged and
    reported through the return value instead of raising, so that a failed
    notification does not turn an already-committed update into an error.

    Returns:
        True when the API accepted the request, False otherwise.
    """
    url = (
        f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/"
        f"{WHATSAPP_PHONE_NUMBER_ID}/messages"
    )
    headers = {
        "Authorization": f"Bearer {WHATSAPP_ACCESS_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {"body": message},
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
    except requests.RequestException:
        logger.exception("Failed to send WhatsApp message to %s", to)
        return False
    return True


# --- Delivery Service UX Functions ---
MENU_KEYWORDS = ("menu", "hi", "hello")
FAQ_ANSWERS = {
    "1": "Our pricing is based on distance, package size, and weight. For more details, visit [Link] or let me know if you have a specific question.",
    "2": "Standard delivery times are between 9 AM and 6 PM. Express delivery is available for an additional fee.",
    "3": "You can cancel your order up to 1 hour before the scheduled pickup time. Refunds are processed within 3-5 business days.",
    "4": "Please type your question, and I’ll do my best to assist. If you’d like to speak with a live agent, just type 'agent'.",
}
AGENT_HANDOFF_TEXT = "Please hold on while I connect you to one of our agents."
TRACK_PROMPT_TEXT = "Please enter your Order ID, or type 'help' if you need assistance."
FALLBACK_TEXT = "I didn’t quite catch that. Please choose a valid option or type 'menu' to see the main menu."


def generate_main_menu() -> str:
    """Generate the main menu with quick reply options."""
    return (
        "Hi there! 👋 Welcome to [Delivery Service Co.]. I’m here to help with your deliveries. What would you like to do today?\n\n"
        "1. Track an Order\n"
        "2. Schedule a Delivery\n"
        "3. FAQs & Support\n"
        "4. Talk to an Agent\n"
        "\nPlease reply with the number of your choice."
    )


def generate_faq_menu() -> str:
    """Generate the FAQ menu with quick reply options."""
    return (
        "What do you need help with? Choose a category below:\n\n"
        "1. Pricing & Fees\n"
        "2. Delivery Times\n"
        "3. Order Cancellations\n"
        "4. Other Questions\n"
        "\nPlease reply with the number of your choice."
    )


def handle_faq_response(choice: str) -> str:
    """Provide detailed answers based on FAQ category ("1" to "4")."""
    return FAQ_ANSWERS.get(
        choice,
        "I didn’t quite catch that. Please choose a valid option from the list.",
    )


async def _fetch_order(order_id: str) -> Optional[Order]:
    """Return the Order whose ``order_id`` matches, or None."""
    async with async_session() as session:
        result = await session.execute(select(Order).where(Order.order_id == order_id))
        return result.scalars().first()


def _describe_order(order: Optional[Order], order_id: str) -> str:
    """Build the user-facing tracking reply for a found or missing order."""
    if order:
        return f"Your order (ID: {order_id}) is currently {order.status} and is expected to arrive by {order.timestamp + timedelta(hours=2)}."
    return "Hmm, that order ID doesn’t seem right. Please check and try again or type 'help' for assistance."


def _extract_order_id(text: str) -> str:
    """Strip the 'track' keyword (any letter case) from a tracking request."""
    return re.sub(r"track", "", text, flags=re.IGNORECASE).strip()


async def track_order_flow(user_id: str, order_id: str) -> str:
    """Handle the order tracking flow.

    Looks the order up by ``order_id`` and returns either its status with an
    arrival estimate (order timestamp + 2 hours) or a "not found" message.
    ``user_id`` is accepted for interface symmetry and is not used.
    """
    return _describe_order(await _fetch_order(order_id), order_id)


async def schedule_delivery_flow(user_id: str, step: int, user_input: Optional[str] = None) -> str:
    """Handle the delivery scheduling flow.

    ``step`` selects what to do with ``user_input``:

    * 1 - start the flow and ask for locations (input ignored);
    * 2 - store the locations and ask for package details;
    * 3 - store the package details and ask about WhatsApp updates;
    * 4 - read the yes/no answer and finish the flow.

    Empty input at steps 2 and 3 re-prompts without advancing.
    """
    # setdefault guarantees the state is stored for every step, not only step 1.
    state = user_state.setdefault(user_id, ConversationState())
    state.update_last_active()

    if step == 1:
        state.flow = "schedule_delivery"
        state.step = 1
        state.data = {}
        return "Great! Let’s schedule your delivery. Please share your pickup and drop-off locations. You can type in the addresses or share your location."

    if step == 2:
        if not user_input:
            return "I didn’t catch that. Please share your pickup and drop-off locations."
        state.data["locations"] = user_input
        state.step = 2
        return "Please provide details about your package (size, weight, and any special instructions)."

    if step == 3:
        if not user_input:
            return "Please provide package details (size, weight, and any special instructions)."
        state.data["package_details"] = user_input
        state.step = 3
        return "Thank you! Your delivery is scheduled. We will confirm the pickup time shortly. Would you like to receive updates via WhatsApp? (Yes/No)"

    if step == 4:
        # A missing answer is treated as "no" rather than crashing on None.
        wants_updates = (user_input or "").strip().lower() in ("yes", "y")
        state.data["updates"] = wants_updates
        # NOTE(review): reset() discards the collected locations, package
        # details and update preference; no Order row is written anywhere in
        # this flow. Persist state.data before resetting if that is intended.
        state.reset()
        if wants_updates:
            return "Awesome! You’ll receive updates on your delivery. Type 'menu' to return to the main menu."
        return "Got it. You won’t receive updates. Type 'menu' to return to the main menu."

    # Unknown step: leave the flow instead of implicitly returning None.
    state.reset()
    return FALLBACK_TEXT


async def _route_message(user_id: str, user_message: str) -> str:
    """Pick the reply for *user_message* based on the user's active flow.

    Global keywords (menu/hi/hello, agent, help) are honoured first, then any
    active multi-step flow receives the message, and only when no flow is
    active are "1" to "4" read as main-menu choices.
    """
    state = _get_active_state(user_id)
    lowered = user_message.lower()

    if lowered in MENU_KEYWORDS:
        state.reset()
        return generate_main_menu()
    if lowered == "agent":
        state.reset()
        return AGENT_HANDOFF_TEXT
    if lowered == "help":
        # Several prompts advertise 'help'; route it to the support menu.
        state.reset()
        state.flow = "faq"
        return generate_faq_menu()

    if state.flow == "schedule_delivery":
        # Step N+1 consumes the answer to the prompt issued at step N.
        return await schedule_delivery_flow(user_id, state.step + 1, user_message)

    if state.flow == "track_order":
        order_id = _extract_order_id(user_message) if lowered.startswith("track") else user_message
        order = await _fetch_order(order_id)
        if order is not None:
            state.reset()  # stay in the flow on a miss so the user can retry
        return _describe_order(order, order_id)

    if state.flow == "faq":
        if user_message in FAQ_ANSWERS:
            state.reset()
        return handle_faq_response(user_message)

    # No active flow: interpret the message as a main-menu choice.
    if user_message == "1":  # Track an Order
        state.flow = "track_order"
        return TRACK_PROMPT_TEXT
    if user_message == "2":  # Schedule a Delivery
        return await schedule_delivery_flow(user_id, step=1)
    if user_message == "3":  # FAQs & Support
        state.flow = "faq"
        return generate_faq_menu()
    if user_message == "4":  # Talk to an Agent
        return AGENT_HANDOFF_TEXT
    if "track" in lowered:  # Order Tracking, e.g. "track ABC123"
        return await track_order_flow(user_id, _extract_order_id(user_message))
    return FALLBACK_TEXT


async def _read_json_object(request: Request) -> dict:
    """Parse the request body as a JSON object, raising HTTP 400 otherwise."""
    try:
        data = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")
    return data


# --- FastAPI Setup & Endpoints ---
app = FastAPI()


@app.on_event("startup")
async def on_startup():
    """Create database tables when the application starts."""
    await init_db()


@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Answer one chat message.

    Expects a JSON object with ``user_id`` (required) and ``message``.
    Records the inbound and outbound messages in ``conversation_context`` and,
    via background tasks, in the chat_history table.

    Raises:
        HTTPException: 400 when the body is not a JSON object or ``user_id``
            is missing.
    """
    data = await _read_json_object(request)
    user_id = data.get("user_id")
    # A null or non-string message is coerced instead of crashing on .strip().
    user_message = str(data.get("message") or "").strip()
    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")

    # Append the inbound message to the conversation context
    history = conversation_context.setdefault(user_id, [])
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "user",
        "message": user_message,
    })
    # Log the chat to the database
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    response_text = await _route_message(user_id, user_message)

    # Log the outbound response
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "bot",
        "message": response_text,
    })
    return JSONResponse(content={"response": response_text})


# --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) ---
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
    """Return every chat_history row for *user_id* as a list of dicts."""
    async with async_session() as session:
        result = await session.execute(
            ChatHistory.__table__.select().where(ChatHistory.user_id == user_id)
        )
        # mappings() yields dict-like rows; plain Row objects are tuple-like.
        return [dict(row) for row in result.mappings().all()]


@app.get("/order/{order_id}")
async def get_order(order_id: str):
    """Return the orders row for *order_id* as a dict, or 404 if absent."""
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        order = result.mappings().first()
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(order)


@app.get("/user_profile/{user_id}")
async def get_user_profile(user_id: str):
    """Return the profile for *user_id*, creating a default one if needed."""
    profile = await get_or_create_user_profile(user_id)
    last_interaction = profile.last_interaction
    return {
        "user_id": profile.user_id,
        "phone_number": profile.phone_number,
        "name": profile.name,
        "email": profile.email,
        "preferences": profile.preferences,
        "last_interaction": last_interaction.isoformat() if last_interaction else None,
    }


@app.get("/analytics")
async def get_analytics():
    """Return total message count, total order count and mean sentiment."""
    async with async_session() as session:
        msg_result = await session.execute(select(func.count()).select_from(ChatHistory))
        total_messages = msg_result.scalar() or 0
        order_result = await session.execute(select(func.count()).select_from(Order))
        total_orders = order_result.scalar() or 0
        sentiment_result = await session.execute(
            select(func.avg(SentimentLog.sentiment_score))
        )
        avg_sentiment = sentiment_result.scalar() or 0  # AVG is NULL with no rows
    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment,
    }


@app.post("/voice")
async def process_voice(file: UploadFile = File(...)):
    """Accept an uploaded file and return a placeholder transcription."""
    await file.read()  # the upload is consumed; its content is not used yet
    simulated_text = "Simulated speech-to-text conversion result."
    return {"transcription": simulated_text}


async def _apply_payment_status(order_id: str, status: str) -> None:
    """Set the order's status, then notify management over WhatsApp.

    Raises:
        HTTPException: 404 when no order has this ``order_id``.
    """
    async with async_session() as session:
        # select(Order) yields ORM instances; selecting from the Table and
        # taking a scalar would return only the integer primary key.
        result = await session.execute(select(Order).where(Order.order_id == order_id))
        order = result.scalars().first()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = status
        await session.commit()
    # Notify management via WhatsApp about the payment update
    await asyncio.to_thread(
        send_whatsapp_message,
        MANAGEMENT_WHATSAPP_NUMBER,
        f"Payment Update:\nOrder ID: {order_id} is now {status}.",
    )


# --- Payment Callback Endpoint with Payment Tracking and Redirection ---
@app.api_route("/payment_callback", methods=["GET", "POST"])
async def payment_callback(request: Request):
    """Update an order's status from a payment callback.

    GET reads ``reference`` and ``status`` from the query string and then
    redirects the user to the chat interface. POST reads the same two keys
    from a JSON body and returns a JSON confirmation. ``status`` defaults to
    "Paid" in both cases.

    NOTE(review): both branches trust caller-supplied ``reference`` and
    ``status`` without authenticating the caller. Consider verifying the
    transaction with the payment provider (PAYSTACK_SECRET_KEY is loaded but
    unused) before changing order state, and confirm the expected payload
    shape of the provider's server-to-server callback.

    Raises:
        HTTPException: 400 when the reference is missing or the POST body is
            not a JSON object; 404 when the order does not exist.
    """
    # GET: User redirection after payment
    if request.method == "GET":
        params = request.query_params
        order_id = params.get("reference")
        status = params.get("status", "Paid")
        if not order_id:
            raise HTTPException(status_code=400, detail="Missing order reference in callback.")
        await _apply_payment_status(order_id, status)
        # Redirect user back to the chat interface (adjust URL as needed).
        # NOTE(review): status=success is sent regardless of the stored status.
        query = urlencode({"order_id": order_id, "status": "success"})
        return RedirectResponse(url=f"https://yourdomain.com/chat?{query}")

    # POST: Server-to-server callback from Paystack
    data = await _read_json_object(request)
    order_id = data.get("reference")
    new_status = data.get("status", "Paid")
    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")
    await _apply_payment_status(order_id, new_status)
    return JSONResponse(content={"message": "Order updated successfully."})


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)