Spaces:
Sleeping
Sleeping
import asyncio
import base64
import os
import re
import ssl
import time
from datetime import datetime, timedelta
from urllib.parse import urlencode

import openai
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
from sqlalchemy import select
from sqlalchemy import func
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
# SQLAlchemy Imports (Async)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
# For sentiment analysis using TextBlob
from textblob import TextBlob
# --- Configuration read from the process environment ---
# Each setting falls back to a placeholder string when its variable is unset.
# NOTE(review): the placeholders are presumably meant to be overridden in any
# real deployment — confirm.
SPOONACULAR_API_KEY = os.environ.get("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.environ.get("PAYSTACK_SECRET_KEY", "default_fallback_value")
DATABASE_URL = os.environ.get("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.environ.get("OPENAI_API_KEY", "default_fallback_value")

# WhatsApp Business API credentials (Cloud API)
WHATSAPP_PHONE_NUMBER_ID = os.environ.get("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.environ.get("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.environ.get("MANAGEMENT_WHATSAPP_NUMBER", "default_value")

# --- Database Setup ---
# Declarative base class shared by every ORM model below.
Base = declarative_base()
class ChatHistory(Base):
    """ORM model for one chat message stored in the ``chat_history`` table."""

    __tablename__ = "chat_history"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Identifier of the user the message belongs to (indexed for lookups).
    user_id = Column(String, index=True)
    # Filled from datetime.utcnow when no value is supplied on insert.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # 'inbound' or 'outbound'
    direction = Column(String)
    # Message text.
    message = Column(Text)
class Order(Base):
    """ORM model for a delivery order stored in the ``orders`` table."""

    __tablename__ = "orders"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Unique order reference; the lookups in this file filter on this column.
    order_id = Column(String, unique=True, index=True)
    # Identifier of the user who owns the order.
    user_id = Column(String, index=True)
    pickup_location = Column(String)
    dropoff_location = Column(String)
    package_details = Column(String)
    # New rows start as "Pending" unless a status is given.
    status = Column(String, default="Pending")
    # Filled from datetime.utcnow when no value is supplied on insert.
    timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
    """ORM model for a user's profile stored in the ``user_profiles`` table."""

    __tablename__ = "user_profiles"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Unique application-level identifier of the user.
    user_id = Column(String, unique=True, index=True)
    # Optional, but unique when present.
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    # NOTE(review): this default looks like an obfuscated placeholder address
    # rather than a real value — confirm the intended default.
    email = Column(String, default="[email protected]")
    # Free-form text; empty by default.
    preferences = Column(Text, default="")
    # Filled from datetime.utcnow when no value is supplied on insert.
    last_interaction = Column(DateTime, default=datetime.utcnow)
class SentimentLog(Base):
    """ORM model for one scored message stored in the ``sentiment_logs`` table."""

    __tablename__ = "sentiment_logs"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Identifier of the user whose message was scored.
    user_id = Column(String, index=True)
    # Filled from datetime.utcnow when no value is supplied on insert.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Score attached to the message by the caller.
    sentiment_score = Column(Float)
    # The message text that was scored.
    message = Column(Text)
# Initialize Database
# The TLS context has to exist before the engine is built because it is handed
# to the driver through ``connect_args``; building the engine first raised
# NameError at import time.
ssl_context = ssl.create_default_context(cafile="rds-combined-ca-bundle.pem")
engine = create_async_engine(
    DATABASE_URL,
    echo=True,
    connect_args={"ssl": ssl_context, "timeout": 30},  # increase timeout as needed
)
# Factory for AsyncSession objects; expire_on_commit=False keeps loaded
# attributes readable after a commit.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Run ``Base.metadata.create_all`` inside an ``engine.begin()`` block."""
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it is dispatched via run_sync.
        await connection.run_sync(Base.metadata.create_all)
# --- Global In-Memory Stores ---
# Maps user_id -> ConversationState.
user_state = dict()
# Maps user_id -> list of {"timestamp": ..., "role": "user"/"bot", "message": ...}.
conversation_context = dict()

# --- Conversation State Management ---
# Idle time after which ConversationState.is_expired() reports True.
SESSION_TIMEOUT = timedelta(seconds=5 * 60)
class ConversationState:
    """Per-user progress through a multi-step conversation flow.

    Attributes are ``flow`` (e.g. "track_order", "schedule_delivery", or
    None), ``step`` (integer position within the flow), ``data`` (dict of
    values gathered so far) and ``last_active`` (naive UTC datetime).
    """

    def __init__(self):
        # A fresh state is identical to a reset one, so share the logic.
        self.reset()

    def update_last_active(self):
        """Stamp the state with the current UTC time."""
        self.last_active = datetime.utcnow()

    def is_expired(self):
        """Return True once the idle time exceeds ``SESSION_TIMEOUT``."""
        idle_for = datetime.utcnow() - self.last_active
        return idle_for > SESSION_TIMEOUT

    def reset(self):
        """Clear the flow, step and collected data, and refresh the timestamp."""
        self.flow = None  # e.g., "track_order", "schedule_delivery"
        self.step = 0
        self.data = {}
        self.update_last_active()
# --- Utility Functions --- | |
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Insert one ``ChatHistory`` row for the message and commit it.

    Args:
        user_id: Identifier of the user the message belongs to.
        direction: 'inbound' or 'outbound'.
        message: Message text to store.
    """
    async with async_session() as db:
        db.add(ChatHistory(user_id=user_id, direction=direction, message=message))
        await db.commit()
async def log_sentiment(user_id: str, message: str, score: float):
    """Insert one ``SentimentLog`` row for the scored message and commit it.

    Args:
        user_id: Identifier of the user who sent the message.
        message: The text that was scored.
        score: Sentiment score to record.
    """
    async with async_session() as db:
        db.add(SentimentLog(user_id=user_id, sentiment_score=score, message=message))
        await db.commit()
def analyze_sentiment(text: str) -> float:
    """Return the polarity TextBlob reports for ``text``."""
    return TextBlob(text).sentiment.polarity
# --- Delivery Service UX Functions --- | |
def generate_main_menu() -> str:
    """Build the greeting plus the four numbered main-menu options."""
    lines = (
        "Hi there! 👋 Welcome to [Delivery Service Co.]. I’m here to help with your deliveries. What would you like to do today?",
        "",
        "1. Track an Order",
        "2. Schedule a Delivery",
        "3. FAQs & Support",
        "4. Talk to an Agent",
        "",
        "Please reply with the number of your choice.",
    )
    return "\n".join(lines)
def generate_faq_menu() -> str:
    """Build the FAQ prompt plus its four numbered categories."""
    lines = (
        "What do you need help with? Choose a category below:",
        "",
        "1. Pricing & Fees",
        "2. Delivery Times",
        "3. Order Cancellations",
        "4. Other Questions",
        "",
        "Please reply with the number of your choice.",
    )
    return "\n".join(lines)
def handle_faq_response(choice: str) -> str:
    """Return the canned answer for an FAQ category.

    Args:
        choice: The user's reply; "1" to "4" select a category.

    Returns:
        The answer for that category, or a retry prompt for any other value.
    """
    answers = {
        "1": "Our pricing is based on distance, package size, and weight. For more details, visit [Link] or let me know if you have a specific question.",
        "2": "Standard delivery times are between 9 AM and 6 PM. Express delivery is available for an additional fee.",
        "3": "You can cancel your order up to 1 hour before the scheduled pickup time. Refunds are processed within 3-5 business days.",
        "4": "Please type your question, and I’ll do my best to assist. If you’d like to speak with a live agent, just type 'agent'.",
    }
    fallback = "I didn’t quite catch that. Please choose a valid option from the list."
    return answers.get(choice, fallback)
async def track_order_flow(user_id: str, order_id: str) -> str:
    """Look up an order by its ``order_id`` and describe its status.

    Args:
        user_id: Accepted for interface symmetry; not used by the lookup.
        order_id: Order reference to search for.

    Returns:
        A status sentence with an arrival estimate (order timestamp plus two
        hours) when the order exists, otherwise a retry prompt.
    """
    lookup = select(Order).where(Order.order_id == order_id)
    async with async_session() as db:
        found = (await db.execute(lookup)).scalars().first()
        if not found:
            return "Hmm, that order ID doesn’t seem right. Please check and try again or type 'help' for assistance."
        eta = found.timestamp + timedelta(hours=2)
        return f"Your order (ID: {order_id}) is currently {found.status} and is expected to arrive by {eta}."
async def schedule_delivery_flow(user_id: str, step: int, user_input: str = None) -> str:
    """Advance the delivery-scheduling conversation by one step.

    Args:
        user_id: Key into the module-level ``user_state`` store.
        step: Which step to run: 1 starts the flow, 2 records the locations,
            3 records the package details, 4 records the update preference.
        user_input: The user's reply for steps 2-4. ``None`` or blank text is
            treated as "no answer".

    Returns:
        The bot's reply for the step. A step outside 1-4 yields a generic
        retry prompt instead of an implicit ``None``.
    """
    # Store the state as soon as it is created, so data gathered in steps 2+
    # is not lost when the user has no existing entry.
    state = user_state.get(user_id)
    if state is None:
        state = ConversationState()
        user_state[user_id] = state
    state.update_last_active()

    # Normalise once: None and whitespace-only replies both count as missing.
    answer = (user_input or "").strip()

    if step == 1:
        state.flow = "schedule_delivery"
        state.step = 1
        state.data = {}
        return "Great! Let’s schedule your delivery. Please share your pickup and drop-off locations. You can type in the addresses or share your location."

    if step == 2:
        if not answer:
            return "I didn’t catch that. Please share your pickup and drop-off locations."
        state.flow = "schedule_delivery"
        state.data["locations"] = answer
        state.step = 2
        return "Please provide details about your package (size, weight, and any special instructions)."

    if step == 3:
        if not answer:
            return "Please provide package details (size, weight, and any special instructions)."
        state.flow = "schedule_delivery"
        state.data["package_details"] = answer
        state.step = 3
        return "Thank you! Your delivery is scheduled. We will confirm the pickup time shortly. Would you like to receive updates via WhatsApp? (Yes/No)"

    if step == 4:
        wants_updates = answer.lower() in ("yes", "y")
        state.data["updates"] = wants_updates
        # NOTE(review): reset() discards everything collected above and this
        # function writes nothing to the database — confirm whether an Order
        # row should be created before the state is cleared.
        state.reset()
        if wants_updates:
            return "Awesome! You’ll receive updates on your delivery. Type 'menu' to return to the main menu."
        return "Got it. You won’t receive updates. Type 'menu' to return to the main menu."

    return "I didn’t quite catch that. Please choose a valid option or type 'menu' to see the main menu."
# --- FastAPI Setup & Endpoints --- | |
app = FastAPI()


# Register the hook with the application; without the decorator the function
# is never invoked and init_db() never runs.
@app.on_event("startup")
async def on_startup():
    """Initialise the database by awaiting ``init_db()`` at application startup."""
    await init_db()
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Produce the bot's reply to one inbound chat message.

    The JSON body must carry ``user_id`` and may carry ``message``. Both the
    inbound message and the reply are appended to ``conversation_context`` and
    queued for database logging.

    Routing is stateful: after the user picks "1" (track), "2" (schedule) or
    "3" (FAQ), the following message is interpreted inside that flow rather
    than as a new main-menu choice. "menu", "hi" and "hello" always return to
    the main menu, and a state that has expired is reset first.

    NOTE(review): no route decorator is visible on this function — confirm how
    it is registered with ``app``.

    Raises:
        HTTPException: 400 when ``user_id`` is missing.
    """
    data = await request.json()
    user_id = data.get("user_id")
    # ``message`` may be absent, null, or a non-string; normalise before strip().
    user_message = str(data.get("message") or "").strip()
    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")

    # Append the inbound message to the conversation context.
    history = conversation_context.setdefault(user_id, [])
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "user",
        "message": user_message,
    })
    # Log the chat to the database.
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    # Fetch (or create) the user's flow state and drop it if it has timed out.
    state = user_state.get(user_id)
    if state is None:
        state = user_state[user_id] = ConversationState()
    elif state.is_expired():
        state.reset()
    state.update_last_active()

    lowered = user_message.lower()

    if lowered in ("menu", "hi", "hello"):
        # Greeting / menu keywords always abandon any flow in progress.
        state.reset()
        response_text = generate_main_menu()
    elif state.flow == "faq":
        # FAQ sub-menu: stay in the flow until a valid category is chosen.
        response_text = handle_faq_response(user_message)
        if user_message in ("1", "2", "3", "4"):
            state.reset()
    elif state.flow == "track_order":
        # The message following the prompt is taken as the order ID.
        response_text = await track_order_flow(user_id, user_message)
        state.reset()
    elif state.flow == "schedule_delivery":
        # state.step holds the last completed step; run the next one.
        response_text = await schedule_delivery_flow(user_id, state.step + 1, user_message)
    elif user_message == "1":  # Track an Order
        state.flow = "track_order"
        response_text = "Please enter your Order ID, or type 'help' if you need assistance."
    elif user_message == "2":  # Schedule a Delivery
        response_text = await schedule_delivery_flow(user_id, step=1)
    elif user_message == "3":  # FAQs & Support
        state.flow = "faq"
        response_text = generate_faq_menu()
    elif user_message == "4":  # Talk to an Agent
        response_text = "Please hold on while I connect you to one of our agents."
    elif "track" in lowered:  # Order Tracking, e.g. "track ABC123"
        # Strip the keyword case-insensitively to match the check above.
        order_id = re.sub("track", "", user_message, flags=re.IGNORECASE).strip()
        response_text = await track_order_flow(user_id, order_id)
    else:
        response_text = "I didn’t quite catch that. Please choose a valid option or type 'menu' to see the main menu."

    # Log the outbound response.
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "bot",
        "message": response_text,
    })
    return JSONResponse(content={"response": response_text})
# --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) --- | |
async def get_chat_history(user_id: str):
    """Return every stored chat message for ``user_id`` as a list of dicts.

    Each dict is keyed by the ``chat_history`` column names. Rows are ordered
    by timestamp, then id, so the result order is deterministic.

    NOTE(review): no route decorator is visible on this function — confirm how
    it is registered with ``app``.
    """
    query = (
        ChatHistory.__table__.select()
        .where(ChatHistory.user_id == user_id)
        .order_by(ChatHistory.timestamp, ChatHistory.id)
    )
    async with async_session() as session:
        result = await session.execute(query)
        # Rows from a 2.0-style execute are tuple-like, so dict(row) fails;
        # mappings() yields dict-like rows keyed by column name.
        return [dict(row) for row in result.mappings().all()]
async def get_order(order_id: str):
    """Return the order whose ``order_id`` matches, as a dict of its columns.

    NOTE(review): no route decorator is visible on this function — confirm how
    it is registered with ``app``.

    Raises:
        HTTPException: 404 when no such order exists.
    """
    query = Order.__table__.select().where(Order.order_id == order_id)
    async with async_session() as session:
        result = await session.execute(query)
        # mappings() yields a dict-like row; a plain Row is tuple-like and
        # cannot be passed to dict().
        row = result.mappings().first()
    if row is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(row)
async def get_user_profile(user_id: str):
    """Return the user's profile fields as a plain dict.

    ``last_interaction`` is rendered with ``isoformat()``; every other field
    is copied from the profile object unchanged.
    """
    # NOTE(review): get_or_create_user_profile is not defined in the visible
    # part of this file — confirm it exists elsewhere.
    profile = await get_or_create_user_profile(user_id)
    copied_fields = ("user_id", "phone_number", "name", "email", "preferences")
    payload = {field: getattr(profile, field) for field in copied_fields}
    payload["last_interaction"] = profile.last_interaction.isoformat()
    return payload
async def get_analytics():
    """Return message count, order count and mean sentiment score.

    Returns:
        A dict with ``total_messages``, ``total_orders`` and
        ``average_sentiment``; each value is 0 when its table has no rows.

    NOTE(review): no route decorator is visible on this function — confirm how
    it is registered with ``app``.
    """
    # Table.count() and raw SQL strings are not accepted by 2.0-style
    # execution, so the aggregates are built with select() and func.
    count_messages = select(func.count()).select_from(ChatHistory)
    count_orders = select(func.count()).select_from(Order)
    mean_sentiment = select(func.avg(SentimentLog.sentiment_score))

    async with async_session() as session:
        total_messages = (await session.execute(count_messages)).scalar() or 0
        total_orders = (await session.execute(count_orders)).scalar() or 0
        # AVG over zero rows is NULL; report it as 0.
        avg_sentiment = (await session.execute(mean_sentiment)).scalar() or 0

    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment,
    }
async def process_voice(file: UploadFile = File(...)):
    """Read the uploaded file and return a placeholder transcription.

    The upload's bytes are read but not used; the returned text is a fixed
    simulated string.
    """
    await file.read()
    return {"transcription": "Simulated speech-to-text conversion result."}
# --- Payment Callback Endpoint with Payment Tracking and Redirection --- | |
async def _apply_payment_status(order_id: str, status: str) -> None:
    """Set an order's status, commit, then notify management via WhatsApp.

    Raises:
        HTTPException: 404 when no order has the given ``order_id``.
    """
    async with async_session() as session:
        # Select the ORM entity, not the Core table: with a table select,
        # scalar_one_or_none() returns the first column's value instead of an
        # Order, so the status assignment could never work.
        result = await session.execute(select(Order).where(Order.order_id == order_id))
        order = result.scalar_one_or_none()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = status
        await session.commit()

    # NOTE(review): send_whatsapp_message is not defined in the visible part
    # of this file — confirm it exists elsewhere. It is run in a worker thread
    # so it does not stall the event loop.
    await asyncio.to_thread(
        send_whatsapp_message,
        MANAGEMENT_WHATSAPP_NUMBER,
        f"Payment Update:\nOrder ID: {order_id} is now {status}.",
    )


async def payment_callback(request: Request):
    """Handle the payment callback for both GET and POST requests.

    GET reads ``reference`` and ``status`` from the query string, updates the
    order, and redirects the user to the chat page. Any other method reads the
    same two keys from the JSON body, updates the order, and returns a JSON
    confirmation. ``status`` defaults to "Paid" in both cases.

    NOTE(review): the new status comes straight from the request and nothing
    here authenticates the caller — confirm whether the payment should be
    verified with the provider before the order is updated.
    NOTE(review): no route decorator is visible on this function — confirm how
    it is registered with ``app``.

    Raises:
        HTTPException: 400 when the reference is missing or the POST body is
            not a JSON object; 404 when the order does not exist.
    """
    # GET: user redirection after payment.
    if request.method == "GET":
        params = request.query_params
        order_id = params.get("reference")
        status = params.get("status", "Paid")
        if not order_id:
            raise HTTPException(status_code=400, detail="Missing order reference in callback.")
        await _apply_payment_status(order_id, status)
        # Redirect the user back to the chat interface (adjust URL as needed).
        # The reference is user-supplied, so it is URL-encoded, not interpolated.
        query = urlencode({"order_id": order_id, "status": "success"})
        return RedirectResponse(url=f"https://yourdomain.com/chat?{query}")

    # POST: server-to-server callback.
    try:
        data = await request.json()
    except ValueError:
        # A malformed body is a client error, not a server fault.
        raise HTTPException(status_code=400, detail="Invalid JSON body in callback.") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON body in callback.")

    order_id = data.get("reference")
    new_status = data.get("status", "Paid")
    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")
    await _apply_payment_status(order_id, new_status)
    return JSONResponse(content={"message": "Order updated successfully."})
if __name__ == "__main__":
    # uvicorn is imported here so it is only required when run as a script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )