# MichAngel-558 / app.py
# Fred808's picture
# Update app.py
# 6effb06 verified
# raw
# history blame
# 15.8 kB
import asyncio
import base64
import os
import re
import ssl
import time
from datetime import datetime, timedelta
from urllib.parse import urlencode

import openai
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
# For sentiment analysis using TextBlob
from textblob import TextBlob
# SQLAlchemy Imports (Async)
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
# Environment Variables
# Every setting is read once at import time. Each lookup supplies a placeholder
# default, so a missing variable does not raise here; the placeholder string is
# used as the value instead.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
# Passed unchanged to create_async_engine() further down this module.
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
# Sets the module-level key attribute on the imported openai package.
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")
# WhatsApp Business API credentials (Cloud API)
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
# Recipient of the payment-update notifications sent by the payment callback.
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")
# --- Database Setup ---
# Declarative base class shared by the ORM models below; init_db() creates the
# tables registered on Base.metadata.
Base = declarative_base()
class ChatHistory(Base):
    """ORM model for a single logged chat message (table ``chat_history``)."""

    __tablename__ = "chat_history"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Identifier of the user the message belongs to; indexed for lookups.
    user_id = Column(String, index=True)
    # Filled in with datetime.utcnow() at insert time when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    direction = Column(String)  # 'inbound' or 'outbound'
    # The message text itself.
    message = Column(Text)
class Order(Base):
    """ORM model for a delivery order (table ``orders``)."""

    __tablename__ = "orders"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # External order reference; unique, and the key the endpoints look up by.
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    pickup_location = Column(String)
    dropoff_location = Column(String)
    package_details = Column(String)
    # New rows start as "Pending"; the payment callback overwrites this value.
    status = Column(String, default="Pending")
    # Filled in with datetime.utcnow() at insert time when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
    """ORM model for a user's profile (table ``user_profiles``)."""

    __tablename__ = "user_profiles"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # One profile per user_id (unique constraint).
    user_id = Column(String, unique=True, index=True)
    # Optional, but unique when present.
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    email = Column(String, default="[email protected]")
    # Free-form text; empty string by default.
    preferences = Column(Text, default="")
    # Filled in with datetime.utcnow() at insert time when not supplied.
    last_interaction = Column(DateTime, default=datetime.utcnow)
class SentimentLog(Base):
    """ORM model for one scored message (table ``sentiment_logs``)."""

    __tablename__ = "sentiment_logs"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # Filled in with datetime.utcnow() at insert time when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Score stored by log_sentiment(); averaged by the /analytics endpoint.
    sentiment_score = Column(Float)
    # The text that was scored.
    message = Column(Text)
# Initialize Database
# The TLS context has to exist before the engine is built: it is passed to the
# driver through connect_args. (Previously it was assigned only after the
# create_async_engine() call that referenced it, which raises NameError.)
# The CA bundle path is relative, so it is resolved against the process's
# current working directory.
ssl_context = ssl.create_default_context(cafile="rds-combined-ca-bundle.pem")
engine = create_async_engine(
    DATABASE_URL,
    echo=True,
    connect_args={"ssl": ssl_context, "timeout": 30},  # increase timeout as needed
)
# Session factory used by every database helper and endpoint in this module.
# expire_on_commit=False keeps loaded attributes readable after commit().
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Create the tables registered on ``Base.metadata`` using the module engine."""
    create_tables = Base.metadata.create_all
    # create_all is a synchronous API, so it is run through run_sync on the
    # async connection.
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
# --- Global In-Memory Stores ---
# Both stores are plain module-level dicts: they live only in this process's
# memory and nothing in this module removes entries from them.
user_state = {} # { user_id: ConversationState }
conversation_context = {} # { user_id: [ { "timestamp": ..., "role": "user"/"bot", "message": ... }, ... ] }
# --- Conversation State Management ---
# Idle time after which a ConversationState reports itself as expired.
SESSION_TIMEOUT = timedelta(minutes=5)


class ConversationState:
    """Per-user progress through a multi-step conversation flow.

    Attributes:
        flow: Name of the active flow, or ``None`` when no flow is active.
        step: Position inside the active flow (0 when idle).
        data: Values collected so far in the active flow.
        last_active: Naive UTC time of the last activity or reset.
    """

    def __init__(self):
        # A new state is identical to a freshly reset one.
        self.reset()

    def update_last_active(self):
        """Record the current UTC time as the last activity time."""
        self.last_active = datetime.utcnow()

    def is_expired(self):
        """Return True when the idle time is strictly greater than SESSION_TIMEOUT."""
        idle_for = datetime.utcnow() - self.last_active
        return idle_for > SESSION_TIMEOUT

    def reset(self):
        """Clear flow, step and collected data, and restart the idle clock."""
        self.flow = None  # e.g., "track_order", "schedule_delivery"
        self.step = 0
        self.data = {}
        self.last_active = datetime.utcnow()
# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Insert one ChatHistory row and commit it.

    Args:
        user_id: Identifier of the user the message belongs to.
        direction: Direction label stored with the row ('inbound' or 'outbound').
        message: The message text to store.
    """
    record = ChatHistory(user_id=user_id, direction=direction, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
async def log_sentiment(user_id: str, message: str, score: float):
    """Insert one SentimentLog row and commit it.

    Args:
        user_id: Identifier of the user the message belongs to.
        message: The text that was scored.
        score: Sentiment score to store alongside the text.
    """
    record = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
def analyze_sentiment(text: str) -> float:
    """Return the TextBlob sentiment polarity of *text*."""
    return TextBlob(text).sentiment.polarity
# --- Delivery Service UX Functions ---
def generate_main_menu() -> str:
    """Return the greeting followed by the four numbered main-menu options."""
    pieces = (
        "Hi there! 👋 Welcome to [Delivery Service Co.]. I’m here to help with your deliveries. What would you like to do today?\n\n",
        "1. Track an Order\n",
        "2. Schedule a Delivery\n",
        "3. FAQs & Support\n",
        "4. Talk to an Agent\n",
        "\nPlease reply with the number of your choice.",
    )
    return "".join(pieces)
def generate_faq_menu() -> str:
    """Return the FAQ prompt followed by the four numbered FAQ categories."""
    pieces = (
        "What do you need help with? Choose a category below:\n\n",
        "1. Pricing & Fees\n",
        "2. Delivery Times\n",
        "3. Order Cancellations\n",
        "4. Other Questions\n",
        "\nPlease reply with the number of your choice.",
    )
    return "".join(pieces)
def handle_faq_response(choice: str) -> str:
    """Return the canned answer for an FAQ category.

    Args:
        choice: The category number as text ("1" to "4").

    Returns:
        The answer for that category, or a retry prompt for any other value.
    """
    answers = {
        "1": "Our pricing is based on distance, package size, and weight. For more details, visit [Link] or let me know if you have a specific question.",
        "2": "Standard delivery times are between 9 AM and 6 PM. Express delivery is available for an additional fee.",
        "3": "You can cancel your order up to 1 hour before the scheduled pickup time. Refunds are processed within 3-5 business days.",
        "4": "Please type your question, and I’ll do my best to assist. If you’d like to speak with a live agent, just type 'agent'.",
    }
    retry_prompt = "I didn’t quite catch that. Please choose a valid option from the list."
    return answers.get(choice, retry_prompt)
async def track_order_flow(user_id: str, order_id: str) -> str:
    """Look up an order by its order_id and describe its status.

    Args:
        user_id: Accepted for interface symmetry; not used in the lookup.
        order_id: Value matched against ``Order.order_id``.

    Returns:
        A status sentence whose arrival estimate is the order's stored
        timestamp plus two hours, or a retry prompt when no order matches.
    """
    lookup = select(Order).where(Order.order_id == order_id)
    async with async_session() as session:
        order = (await session.execute(lookup)).scalars().first()
        if not order:
            return "Hmm, that order ID doesn’t seem right. Please check and try again or type 'help' for assistance."
        return f"Your order (ID: {order_id}) is currently {order.status} and is expected to arrive by {order.timestamp + timedelta(hours=2)}."
async def schedule_delivery_flow(user_id: str, step: int, user_input: str = None) -> str:
    """Advance the delivery-scheduling conversation by one step.

    Steps:
        1. Start the flow and ask for pickup/drop-off locations.
        2. Store the locations and ask for package details.
        3. Store the package details and ask about WhatsApp updates.
        4. Record the yes/no answer and end the flow.

    Args:
        user_id: Key into the module-level ``user_state`` store.
        step: Which step (1-4) to execute.
        user_input: The user's reply for steps 2-4; ignored by step 1.

    Returns:
        The next prompt. When a step's input is missing or empty the same
        question is asked again and the state is left unchanged. An
        unrecognised ``step`` yields a generic retry prompt.
    """
    # setdefault (not get-with-default) so that a state created here is really
    # stored; otherwise updates made in steps 2-4 for an unknown user were
    # applied to a throwaway object.
    state = user_state.setdefault(user_id, ConversationState())

    if step == 1:
        state.flow = "schedule_delivery"
        state.step = 1
        state.data = {}
        return "Great! Let’s schedule your delivery. Please share your pickup and drop-off locations. You can type in the addresses or share your location."

    if step == 2:
        if not user_input:
            return "I didn’t catch that. Please share your pickup and drop-off locations."
        state.data["locations"] = user_input
        state.step = 2
        return "Please provide details about your package (size, weight, and any special instructions)."

    if step == 3:
        if not user_input:
            return "Please provide package details (size, weight, and any special instructions)."
        state.data["package_details"] = user_input
        state.step = 3
        return "Thank you! Your delivery is scheduled. We will confirm the pickup time shortly. Would you like to receive updates via WhatsApp? (Yes/No)"

    if step == 4:
        # Missing input used to crash on None.lower(); ask again instead, the
        # same way steps 2 and 3 do.
        if not user_input:
            return "Would you like to receive updates via WhatsApp? (Yes/No)"
        wants_updates = user_input.strip().lower() in ("yes", "y")
        state.data["updates"] = wants_updates
        # NOTE(review): reset() clears state.data, so the locations, package
        # details and this answer are discarded here; nothing in this function
        # persists them (e.g. as an Order row). Confirm whether that is intended.
        state.reset()
        if wants_updates:
            return "Awesome! You’ll receive updates on your delivery. Type 'menu' to return to the main menu."
        return "Got it. You won’t receive updates. Type 'menu' to return to the main menu."

    # Unknown step: previously fell through and returned None despite `-> str`.
    return "I didn’t quite catch that. Please choose a valid option or type 'menu' to see the main menu."
# --- FastAPI Setup & Endpoints ---
# Application object that every route and event handler below is registered on;
# also the object handed to uvicorn in the __main__ guard.
app = FastAPI()
@app.on_event("startup")
async def on_startup():
    """Startup hook: make sure the database tables exist via init_db()."""
    await init_db()
@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Handle one inbound chat message and return the bot's reply.

    Expects a JSON object with ``user_id`` (required) and ``message``
    (optional; a missing or null value is treated as an empty string).

    Routing order:
        1. Global keywords: ``menu``/``hi``/``hello`` (main menu), ``help``
           (FAQ menu) and ``agent`` (agent hand-off). These also leave any
           flow in progress.
        2. The user's active flow, tracked in ``user_state``: an FAQ category
           choice, an order ID requested by option 1, or the next step of the
           delivery-scheduling flow.
        3. Main-menu numbers ``1``-``4``.
        4. Any message containing "track" (case-insensitive): the rest of the
           message is used as the order ID.
        5. A generic retry prompt.

    Both the inbound message and the reply are appended to
    ``conversation_context`` and logged to the database in background tasks.

    Raises:
        HTTPException: 400 when the body is not a JSON object or ``user_id``
            is missing.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError; report it as a client error.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    user_id = data.get("user_id")
    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")
    # An explicit JSON null (or a non-string value) must not crash on .strip().
    raw_message = data.get("message")
    user_message = "" if raw_message is None else str(raw_message).strip()
    lowered = user_message.lower()

    # Fetch (or create) this user's flow state; an idle session starts over.
    state = user_state.get(user_id)
    if state is None:
        state = ConversationState()
        user_state[user_id] = state
    elif state.is_expired():
        state.reset()
    state.update_last_active()

    # Record the inbound message in the in-memory context and in the database.
    conversation_context.setdefault(user_id, []).append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "user",
        "message": user_message
    })
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    agent_reply = "Please hold on while I connect you to one of our agents."
    faq_choices = ("1", "2", "3", "4")

    if lowered in ("menu", "hi", "hello"):
        state.reset()
        response_text = generate_main_menu()
    elif lowered == "help":
        # Advertised by the tracking prompts; routed to the FAQ/support menu.
        state.reset()
        state.flow = "faq"
        response_text = generate_faq_menu()
    elif lowered == "agent":
        # Advertised by FAQ answer 4.
        state.reset()
        response_text = agent_reply
    elif state.flow == "faq":
        # A digit sent after the FAQ menu is an FAQ category, not a main-menu
        # option. (This branch used to sit below the main-menu checks and
        # could never run.)
        response_text = handle_faq_response(user_message)
        if user_message in faq_choices:
            state.reset()
    elif state.flow == "track_order":
        # The message following option 1 is the order ID itself.
        state.reset()
        response_text = await track_order_flow(user_id, user_message)
    elif state.flow == "schedule_delivery":
        # state.step holds the last completed step, so the next one is step + 1.
        response_text = await schedule_delivery_flow(
            user_id, step=state.step + 1, user_input=user_message
        )
    elif user_message == "1":  # Track an Order
        state.flow = "track_order"
        response_text = "Please enter your Order ID, or type 'help' if you need assistance."
    elif user_message == "2":  # Schedule a Delivery
        response_text = await schedule_delivery_flow(user_id, step=1)
    elif user_message == "3":  # FAQs & Support
        state.flow = "faq"
        response_text = generate_faq_menu()
    elif user_message == "4":  # Talk to an Agent
        response_text = agent_reply
    elif "track" in lowered:  # Order Tracking
        # Strip the keyword case-insensitively so it matches the check above
        # ("Track 123" used to keep its "Track" prefix and never match).
        order_id = re.sub("track", "", user_message, flags=re.IGNORECASE).strip()
        response_text = await track_order_flow(user_id, order_id)
    else:
        response_text = "I didn’t quite catch that. Please choose a valid option or type 'menu' to see the main menu."

    # Log the outbound response
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
    conversation_context[user_id].append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "bot",
        "message": response_text
    })
    return JSONResponse(content={"response": response_text})
# --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) ---
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
    """Return every logged chat message for *user_id*, oldest first.

    Returns:
        A list of dicts keyed by the ``chat_history`` column names; empty when
        the user has no logged messages.
    """
    history_table = ChatHistory.__table__
    query = (
        history_table.select()
        .where(ChatHistory.user_id == user_id)
        # Explicit ordering so the history is returned chronologically.
        .order_by(ChatHistory.timestamp, ChatHistory.id)
    )
    async with async_session() as session:
        result = await session.execute(query)
        # Rows returned by a Session are tuple-like, so dict(row) fails;
        # .mappings() yields dict-like rows keyed by column name.
        rows = result.mappings().all()
    return [dict(row) for row in rows]
@app.get("/order/{order_id}")
async def get_order(order_id: str):
    """Return the order whose ``order_id`` matches, as a dict of its columns.

    Raises:
        HTTPException: 404 when no order has that ``order_id``.
    """
    query = Order.__table__.select().where(Order.order_id == order_id)
    async with async_session() as session:
        result = await session.execute(query)
        # Rows returned by a Session are tuple-like, so dict(row) fails;
        # .mappings() yields a dict-like row keyed by column name.
        order = result.mappings().first()
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(order)
@app.get("/user_profile/{user_id}")
async def get_user_profile(user_id: str):
    """Return the profile for *user_id* as a JSON-ready dict.

    ``last_interaction`` is rendered with ``isoformat()``; the other fields
    are copied through unchanged.
    """
    # NOTE(review): get_or_create_user_profile is not defined in the visible
    # part of this file; confirm it is provided elsewhere.
    profile = await get_or_create_user_profile(user_id)
    payload = {
        field: getattr(profile, field)
        for field in ("user_id", "phone_number", "name", "email", "preferences")
    }
    payload["last_interaction"] = profile.last_interaction.isoformat()
    return payload
@app.get("/analytics")
async def get_analytics():
    """Return aggregate counters for the service.

    Returns:
        A dict with ``total_messages`` (rows in chat_history), ``total_orders``
        (rows in orders) and ``average_sentiment`` (mean of
        ``sentiment_logs.sentiment_score``). Each value is 0 when its table is
        empty.
    """
    # Table.count() is not available in SQLAlchemy releases that provide the
    # asyncio extension, and a plain SQL string is not accepted by
    # Session.execute(); build the aggregates with select()/func instead.
    message_count = select(func.count()).select_from(ChatHistory)
    order_count = select(func.count()).select_from(Order)
    sentiment_mean = select(func.avg(SentimentLog.sentiment_score))

    async with async_session() as session:
        total_messages = (await session.execute(message_count)).scalar() or 0
        total_orders = (await session.execute(order_count)).scalar() or 0
        # AVG over zero rows is NULL, hence the `or 0`.
        avg_sentiment = (await session.execute(sentiment_mean)).scalar() or 0

    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment
    }
@app.post("/voice")
async def process_voice(file: UploadFile = File(...)):
    """Accept an uploaded file and return a placeholder transcription.

    The upload is read in full, but its bytes are not used: the returned
    text is a fixed, simulated result.
    """
    await file.read()
    return {"transcription": "Simulated speech-to-text conversion result."}
# --- Payment Callback Endpoint with Payment Tracking and Redirection ---
async def _apply_payment_status(order_id: str, status: str) -> None:
    """Set an order's status, commit it, then notify management via WhatsApp.

    Raises:
        HTTPException: 404 when no order has that ``order_id``.
    """
    async with async_session() as session:
        # Select the ORM entity. A Core table select combined with
        # scalar_one_or_none() returns only the first column (the integer id),
        # so assigning .status on the result failed.
        result = await session.execute(
            select(Order).where(Order.order_id == order_id)
        )
        order = result.scalar_one_or_none()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = status
        await session.commit()

    # Notify management via WhatsApp about the payment update. The sender is
    # run in a worker thread via asyncio.to_thread.
    # NOTE(review): send_whatsapp_message is not defined in the visible part
    # of this file; confirm it is provided elsewhere.
    await asyncio.to_thread(
        send_whatsapp_message,
        MANAGEMENT_WHATSAPP_NUMBER,
        f"Payment Update:\nOrder ID: {order_id} is now {status}.",
    )


@app.api_route("/payment_callback", methods=["GET", "POST"])
async def payment_callback(request: Request):
    """Update an order's payment status from a payment callback.

    GET (user redirected back after paying): reads ``reference`` and an
    optional ``status`` (default "Paid") from the query string, updates the
    order, then redirects the browser to the chat page.

    POST (server-to-server callback): reads the same two fields from the JSON
    body, updates the order, and returns a JSON confirmation.

    Raises:
        HTTPException: 400 when the reference is missing or the POST body is
            not a JSON object; 404 when the order does not exist.
    """
    # NOTE(review): both paths trust `reference` and `status` exactly as sent
    # by the caller; nothing verifies that the request comes from the payment
    # provider (Paystack signs webhooks with an `x-paystack-signature`
    # header). Confirm whether verification happens upstream.
    if request.method == "GET":
        params = request.query_params
        order_id = params.get("reference")
        status = params.get("status", "Paid")
        if not order_id:
            raise HTTPException(status_code=400, detail="Missing order reference in callback.")
        await _apply_payment_status(order_id, status)
        # Redirect user back to the chat interface (adjust URL as needed).
        # urlencode keeps a caller-supplied reference from breaking the URL or
        # injecting extra query parameters.
        query = urlencode({"order_id": order_id, "status": "success"})
        return RedirectResponse(url=f"https://yourdomain.com/chat?{query}")

    # POST: Server-to-server callback from Paystack
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError; report it as a client error.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")
    order_id = data.get("reference")
    new_status = data.get("status", "Paid")
    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")
    await _apply_payment_status(order_id, new_status)
    return JSONResponse(content={"message": "Order updated successfully."})
if __name__ == "__main__":
    # Executed as a script: serve `app` with uvicorn on 0.0.0.0, port 8000.
    # uvicorn is imported here so that importing this module does not need it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)