# Page-header residue from the hosting site ("Spaces: Sleeping"); not part of the program.
import asyncio
import base64
import json
import os
import re
import time
from datetime import datetime, timedelta

import openai
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
# SQLAlchemy Imports (Async)
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
# For sentiment analysis using TextBlob
from textblob import TextBlob
# --- Environment Variables and API Keys ---
# Each setting is read once, at import time. The second argument to os.getenv is the
# placeholder string used when the variable is unset, so none of these is ever None.
# NOTE(review): the placeholders are not usable credentials, and the DATABASE_URL
# placeholder does not look like a database URL -- confirm every variable is always set.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")  # Example using SQLite
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")

# WhatsApp Business API credentials (Cloud API)
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")

# --- Database Setup ---
# Declarative base class shared by every ORM model in this file.
Base = declarative_base()
class ChatHistory(Base):
    """One logged chat message, stored in the ``chat_history`` table."""

    __tablename__ = "chat_history"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # datetime.utcnow is passed as a callable, so it is evaluated per inserted row.
    timestamp = Column(DateTime, default=datetime.utcnow)
    direction = Column(String)  # 'inbound' or 'outbound'
    message = Column(Text)
class Order(Base):
    """A customer order, stored in the ``orders`` table."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    # Public order identifier; unique across the table.
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    dish = Column(String)
    # Quantity and price are stored as strings, not as numeric columns.
    quantity = Column(String)
    price = Column(String, default="0")
    status = Column(String, default="Pending Payment")
    payment_reference = Column(String, nullable=True)
    delivery_address = Column(String, default="")  # New field for address
    timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
    """Per-user profile data, stored in the ``user_profiles`` table."""

    __tablename__ = "user_profiles"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, unique=True, index=True)
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    # NOTE(review): this default looks like an obfuscated e-mail placeholder -- confirm
    # the intended value.
    email = Column(String, default="[email protected]")
    preferences = Column(Text, default="")
    last_interaction = Column(DateTime, default=datetime.utcnow)
    loyalty_points = Column(Integer, default=0)  # New field for loyalty points
    preferred_language = Column(String, default="English")  # New field for language preference
class SentimentLog(Base):
    """A sentiment score recorded for one message, stored in ``sentiment_logs``."""

    __tablename__ = "sentiment_logs"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Score and the text it was computed for are stored side by side.
    sentiment_score = Column(Float)
    message = Column(Text)
class OrderTracking(Base):
    """One status update for an order, stored in the ``order_tracking`` table."""

    __tablename__ = "order_tracking"

    id = Column(Integer, primary_key=True, index=True)
    # Not unique: an order can accumulate several tracking rows.
    order_id = Column(String, index=True)
    status = Column(String)  # e.g., "Order Placed", "Payment Confirmed", etc.
    message = Column(Text, nullable=True)  # Optional additional details
    timestamp = Column(DateTime, default=datetime.utcnow)
class Feedback(Base):
    """A rating with an optional comment, stored in the ``feedback`` table."""

    __tablename__ = "feedback"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    rating = Column(Integer)
    comment = Column(Text, nullable=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
# --- Create Engine and Session ---
# The engine is built from DATABASE_URL at import time, with echo=True passed through.
engine = create_async_engine(DATABASE_URL, echo=True)
# Calling async_session() yields an AsyncSession bound to the engine above.
async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Run ``Base.metadata.create_all`` on the engine inside one ``engine.begin()`` block."""
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it is dispatched through run_sync.
        await connection.run_sync(Base.metadata.create_all)
# --- Global In-Memory Stores ---
# Plain module-level dictionaries; all three start out empty.
user_state = dict()  # e.g., { user_id: ConversationState }
conversation_context = dict()  # { user_id: [ { "timestamp": ..., "role": "user"/"bot", "message": ... }, ... ] }
proactive_timer = dict()
# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Insert one ``ChatHistory`` row and commit it.

    Args:
        user_id: Identifier stored in the row's ``user_id`` column.
        direction: Stored as-is; the model comment lists 'inbound' and 'outbound'.
        message: Text stored in the row's ``message`` column.
    """
    record = ChatHistory(user_id=user_id, direction=direction, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
async def log_sentiment(user_id: str, message: str, score: float):
    """Insert one ``SentimentLog`` row holding *score* for *message* and commit it."""
    record = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
def analyze_sentiment(text: str) -> float:
    """Return the TextBlob sentiment polarity of *text*."""
    return TextBlob(text).sentiment.polarity
# --- New Features Implementation ---
async def send_main_menu(user_id: str):
    """Log the welcome message as outbound and return it with the main-menu quick replies.

    Returns:
        A dict with ``response`` (the message text) and ``quick_replies``
        (a list of ``{"title", "payload"}`` dicts).
    """
    menu_message = (
        "Hi there! 👋 Welcome to [Delivery Service Co.]. I’m here to help with your deliveries. "
        "What would you like to do today?"
    )
    options = (
        ("Track an Order", "track_order"),
        ("Schedule a Delivery", "schedule_delivery"),
        ("FAQs & Support", "faqs"),
        ("Loyalty Points", "loyalty_points"),
        ("Talk to an Agent", "live_agent"),
    )
    quick_replies = [{"title": title, "payload": payload} for title, payload in options]
    await log_chat_to_db(user_id, "outbound", menu_message)
    return {"response": menu_message, "quick_replies": quick_replies}
async def track_order(user_id: str, order_id: str):
    """Log and return a tracking message built from hard-coded sample data.

    NOTE(review): a second module-level ``track_order(order_id)`` is defined later in
    this file; being defined later, it replaces this function under that name.

    Returns:
        A dict with ``response`` (the message text) and ``quick_replies``.
    """
    # Simulate fetching real-time tracking data
    status = "On the way"
    estimated_time = "30 minutes"
    driver_location = "https://maps.google.com/?q=6.5244,3.3792"  # Example location

    tracking_message = (
        f"🚚 Your order ({order_id}) is currently {status} and is expected to arrive in {estimated_time}. "
        "Tap below to track your package in real-time."
    )
    await log_chat_to_db(user_id, "outbound", tracking_message)
    return {
        "response": tracking_message,
        "quick_replies": [
            {"title": "Track on Map", "url": driver_location},
            {"title": "Back to Menu", "payload": "main_menu"},
        ],
    }
async def recommend_package(user_id: str, package_description: str):
    """Log and return a package recommendation.

    The size and price are fixed values; *package_description* is accepted but not
    read by this function.
    """
    # Simulate AI analysis
    package_size, price = "Medium", 2500
    recommendation_message = (
        f"Based on your description, we recommend a {package_size} package for ₦{price}. "
        "Does this sound right?"
    )
    confirm_payload = f"confirm_package:{package_size}:{price}"
    await log_chat_to_db(user_id, "outbound", recommendation_message)
    return {
        "response": recommendation_message,
        "quick_replies": [
            {"title": "Yes, proceed", "payload": confirm_payload},
            {"title": "No, adjust size", "payload": "adjust_package"},
        ],
    }
async def check_loyalty_points(user_id: str):
    """Log and return a loyalty-points message built from fixed sample values."""
    # Simulate fetching loyalty points
    points, discount = 200, 500
    loyalty_message = (
        f"🎉 You’ve earned 50 points for this delivery! You now have {points} points. "
        f"Redeem them for a ₦{discount} discount on your next order."
    )
    await log_chat_to_db(user_id, "outbound", loyalty_message)
    return {
        "response": loyalty_message,
        "quick_replies": [
            {"title": "Redeem Points", "payload": "redeem_points"},
            {"title": "Back to Menu", "payload": "main_menu"},
        ],
    }
async def send_proactive_update(user_id: str, order_id: str, status: str):
    """Log and return the proactive notification for a delivery status.

    Args:
        user_id: User the outbound message is logged for.
        order_id: Order identifier; interpolated into the "picked_up" message.
        status: Either "picked_up" or "nearby".

    Returns:
        A dict with a single ``response`` key holding the message text.

    Raises:
        ValueError: If *status* is not one of the supported values. Previously this
            case fell through both branches and failed on an unbound ``message``.
    """
    messages = {
        "picked_up": f"🚚 Your order ({order_id}) has been picked up and is on the way!",
        "nearby": "🚚 Your driver is 10 minutes away! Please ensure someone is available to receive the package.",
    }
    if status not in messages:
        # Fail before anything is logged so no partial side effect is left behind.
        raise ValueError(
            f"Unsupported proactive update status {status!r}; expected one of {sorted(messages)}."
        )
    message = messages[status]
    await log_chat_to_db(user_id, "outbound", message)
    return {"response": message}
async def set_language(user_id: str, language: str):
    """Record the user's language choice in ``user_state`` and return a confirmation.

    Args:
        user_id: Key into the in-memory ``user_state`` store.
        language: Requested language; must be one of the supported names.

    Returns:
        A dict with ``response`` (confirmation or rejection text) and
        ``quick_replies`` (one ``set_language:<name>`` option per supported language).
    """
    supported_languages = ["English", "Français", "Español"]
    if language in supported_languages:
        state = user_state.get(user_id)
        if state is None:
            # The original indexed user_state[user_id] directly and raised KeyError
            # for users without a state. Elsewhere in this file the store holds
            # ConversationState objects, so a missing entry is created as one.
            state = ConversationState()
            user_state[user_id] = state
        # ConversationState keeps its values in a `data` dict (see the order flow);
        # a plain dict state, if one is ever stored, is written to directly.
        # NOTE(review): confirm ConversationState is the only shape stored here.
        settings = state if isinstance(state, dict) else state.data
        settings["language"] = language
        message = f"Language set to {language}. How can I assist you today?"
    else:
        message = "Sorry, that language is not supported. Please choose from: English, Français, Español."
    quick_replies = [{"title": lang, "payload": f"set_language:{lang}"} for lang in supported_languages]
    await log_chat_to_db(user_id, "outbound", message)
    return {"response": message, "quick_replies": quick_replies}
async def request_feedback(user_id: str):
    """Log the rating prompt as outbound and return it with five star-rating replies.

    The replies are listed from five stars down to one; each payload is ``rate:<n>``.
    """
    feedback_message = "How was your delivery experience? Tap to rate:"
    star_options = (
        ("⭐️⭐️⭐️⭐️⭐️", 5),
        ("⭐️⭐️⭐️⭐️", 4),
        ("⭐️⭐️⭐️", 3),
        ("⭐️⭐️", 2),
        ("⭐️", 1),
    )
    quick_replies = [
        {"title": stars, "payload": f"rate:{score}"} for stars, score in star_options
    ]
    await log_chat_to_db(user_id, "outbound", feedback_message)
    return {"response": feedback_message, "quick_replies": quick_replies}
async def show_environmental_impact(user_id: str):
    """Log a fixed environmental-impact message as outbound and return it."""
    impact_message = "🌍 Your delivery saved 2kg of CO2 emissions! Thank you for choosing eco-friendly shipping."
    reply = {"response": impact_message}
    await log_chat_to_db(user_id, "outbound", impact_message)
    return reply
async def start_onboarding(user_id: str):
    """Log the first tutorial prompt as outbound and return it with Next/Skip replies."""
    tutorial_message = "Let me guide you through how to schedule a delivery. Tap ‘Next’ to continue."
    await log_chat_to_db(user_id, "outbound", tutorial_message)
    return {
        "response": tutorial_message,
        "quick_replies": [
            {"title": "Next", "payload": "tutorial_step_1"},
            {"title": "Skip Tutorial", "payload": "main_menu"},
        ],
    }
async def suggest_faqs(user_id: str, user_input: str):
    """Log and return a fixed FAQ suggestion message with one quick reply per FAQ.

    *user_input* is accepted but not read; the suggestions are a fixed list.
    """
    # Simulate AI-powered FAQ suggestions
    suggested_faqs = (
        "How long does delivery take?",
        "Can I change my delivery time?",
        "What are your pricing options?",
    )
    faq_message = "It looks like you’re asking about delivery times. Here are some related FAQs:"
    quick_replies = []
    for question in suggested_faqs:
        quick_replies.append({"title": question, "payload": f"faq:{question}"})
    await log_chat_to_db(user_id, "outbound", faq_message)
    return {"response": faq_message, "quick_replies": quick_replies}
async def schedule_offline(user_id: str):
    """Log a fixed "scheduled while offline" message as outbound and return it."""
    offline_message = "You’re offline. Your delivery has been scheduled and will be confirmed once you’re back online."
    reply = {"response": offline_message}
    await log_chat_to_db(user_id, "outbound", offline_message)
    return reply
# --- FastAPI Setup & Endpoints ---
app = FastAPI()


# Registered as a startup handler so the tables are created before requests are
# served; without the registration nothing in this file ever calls init_db().
@app.on_event("startup")
async def on_startup():
    """Create the database tables when the application starts."""
    await init_db()
def _reply_with_text(user_id: str, text: str, prefix: str, background_tasks: BackgroundTasks) -> JSONResponse:
    """Log *text* as outbound, add it to the conversation context, and return it as JSON.

    The logged and remembered text is *text* alone; *prefix* (the sentiment
    modifier) is only prepended to what is sent back to the caller.
    """
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", text)
    conversation_context[user_id].append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "bot",
        "message": text,
    })
    return JSONResponse(content={"response": prefix + text})


async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Handle one inbound chat message and return the bot's reply.

    The JSON body is read for ``user_id`` (required), ``phone_number``, ``message``,
    ``is_image`` and ``image_base64``. The message is dispatched to the first
    matching handler, in this order: image query, order tracking (``ord-<digits>``),
    active order flow, menu display, dish selection, nutritional facts, and
    finally a streamed LLM fallback.

    NOTE(review): no route decorator is visible on this function -- confirm how it
    is registered with ``app``.

    Raises:
        HTTPException: 400 when the body is not a JSON object, ``user_id`` is
            missing, or the image payload is too large.
    """
    data = await request.json()
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); reject it instead of crashing.
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    user_id = data.get("user_id")
    phone_number = data.get("phone_number")
    # An explicit `"message": null` used to crash on None.strip().
    raw_message = data.get("message")
    user_message = "" if raw_message is None else str(raw_message).strip()
    is_image = data.get("is_image", False)
    image_b64 = data.get("image_base64", None)

    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")

    # Initialize conversation context for the user if not present, then record the
    # inbound message.
    conversation_context.setdefault(user_id, []).append({
        "timestamp": datetime.utcnow().isoformat(),
        "role": "user",
        "message": user_message,
    })
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)
    await update_user_last_interaction(user_id)
    await get_or_create_user_profile(user_id, phone_number)

    # Handle image queries
    if is_image and image_b64:
        if len(image_b64) >= 180_000:
            raise HTTPException(status_code=400, detail="Image too large.")
        return StreamingResponse(stream_image_completion(image_b64), media_type="text/plain")

    sentiment_score = analyze_sentiment(user_message)
    background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
    sentiment_modifier = ""
    if sentiment_score < -0.3:
        sentiment_modifier = "I'm sorry if you're having a tough time. "
    elif sentiment_score > 0.3:
        sentiment_modifier = "Great to hear from you! "

    lowered_message = user_message.lower()

    # --- Order Tracking Handling ---
    order_id_match = re.search(r"ord-\d+", lowered_message)
    if order_id_match:
        order_id = order_id_match.group(0)
        try:
            tracking_response = await track_order(order_id)
        except HTTPException as e:
            return JSONResponse(content={"response": f"⚠️ {e.detail}"})
        if isinstance(tracking_response, JSONResponse):
            # track_order returns a ready-made JSONResponse, which cannot itself be
            # serialized as JSON content; unwrap its body before re-wrapping.
            tracking_response = json.loads(tracking_response.body)
        return JSONResponse(content={"response": tracking_response})

    # --- Order Flow Handling ---
    order_response = process_order_flow(user_id, user_message)
    if order_response:
        return _reply_with_text(user_id, order_response, sentiment_modifier, background_tasks)

    # --- Menu Display ---
    if "menu" in lowered_message:
        # Showing the menu discards any conversation state the user had.
        if user_id in user_state:
            del user_state[user_id]
        menu_with_images = []
        for index, item in enumerate(menu_items, start=1):
            image_url = google_image_scrape(item["name"])
            menu_with_images.append({
                "number": index,
                "name": item["name"],
                "description": item["description"],
                "price": item["price"],
                "image_url": image_url,
            })
        response_payload = {
            "response": sentiment_modifier + "Here’s our delicious menu:",
            "menu": menu_with_images,
            "follow_up": (
                "To order, type the *number* or *name* of the dish you'd like. "
                "For example, type '1' or 'Jollof Rice' to order Jollof Rice.\n\n"
                "You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Jollof Rice'."
            ),
        }
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
        conversation_context[user_id].append({
            "timestamp": datetime.utcnow().isoformat(),
            "role": "bot",
            "message": response_payload["response"],
        })
        return JSONResponse(content=response_payload)

    # --- Dish Selection via Menu ---
    names_dish = any(item["name"].lower() in lowered_message for item in menu_items)
    names_number = any(
        str(index) == user_message.strip() for index, _ in enumerate(menu_items, start=1)
    )
    if names_dish or names_number:
        selected_dish = None
        if user_message.strip().isdigit():
            dish_number = int(user_message.strip())
            if 1 <= dish_number <= len(menu_items):
                selected_dish = menu_items[dish_number - 1]["name"]
        else:
            for item in menu_items:
                if item["name"].lower() in lowered_message:
                    selected_dish = item["name"]
                    break
        if selected_dish:
            state = ConversationState()
            state.flow = "order"
            # Set step to 2 since the dish is already selected
            state.step = 2
            state.data["dish"] = selected_dish
            state.update_last_active()
            user_state[user_id] = state
            response_text = f"You selected {selected_dish}. How many servings would you like?"
        else:
            response_text = "Sorry, I couldn't find that dish in the menu. Please try again."
        return _reply_with_text(user_id, response_text, sentiment_modifier, background_tasks)

    # --- Nutritional Facts ---
    if "nutritional facts for" in lowered_message:
        dish_name = lowered_message.replace("nutritional facts for", "").strip().title()
        dish = next((item for item in menu_items if item["name"].lower() == dish_name.lower()), None)
        if dish:
            response_text = f"Nutritional facts for {dish['name']}:\n{dish['nutrition']}"
        else:
            response_text = f"Sorry, I couldn't find nutritional facts for {dish_name}."
        return _reply_with_text(user_id, response_text, sentiment_modifier, background_tasks)

    # --- Fallback: LLM Response Streaming with Conversation Context ---
    recent_context = conversation_context.get(user_id, [])[-5:]
    context_str = "\n".join([f"{entry['role'].capitalize()}: {entry['message']}" for entry in recent_context])
    prompt = f"Conversation context:\n{context_str}\nUser query: {user_message}\nGenerate a helpful, personalized response for a restaurant chatbot."

    def stream_response():
        yield from stream_text_completion(prompt)

    # Only the prompt is logged here; the streamed answer itself is not captured.
    fallback_log = f"LLM fallback response for prompt: {prompt}"
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", fallback_log)
    return StreamingResponse(stream_response(), media_type="text/plain")
# --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) ---
async def get_chat_history(user_id: str):
    """Return every ``chat_history`` row for *user_id* as a list of plain dicts.

    Each dict is keyed by column name. An empty list is returned when the user
    has no logged messages.
    """
    async with async_session() as session:
        result = await session.execute(
            ChatHistory.__table__.select().where(ChatHistory.user_id == user_id)
        )
        # Rows are tuple-like in SQLAlchemy 1.4+/2.0, so dict(row) no longer works;
        # mappings() exposes each row as a column-name -> value mapping.
        return [dict(row) for row in result.mappings()]
async def get_order(order_id: str):
    """Return the ``orders`` row matching *order_id* as a plain dict.

    Raises:
        HTTPException: 404 when no order has that ``order_id``.
    """
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        # mappings() gives a column-name -> value view; dict(row) on the raw
        # tuple-like Row is not supported by SQLAlchemy 1.4+/2.0.
        order = result.mappings().first()
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(order)
async def get_user_profile(user_id: str):
    """Fetch (or create) the profile for *user_id* and return selected fields as a dict.

    ``last_interaction`` is returned as an ISO-8601 string; the other fields are
    returned as stored.
    """
    profile = await get_or_create_user_profile(user_id)
    exposed_fields = ("user_id", "phone_number", "name", "email", "preferences")
    summary = {field: getattr(profile, field) for field in exposed_fields}
    summary["last_interaction"] = profile.last_interaction.isoformat()
    return summary
async def get_analytics():
    """Return message count, order count and average sentiment score.

    Returns:
        A dict with ``total_messages``, ``total_orders`` and ``average_sentiment``;
        each falls back to 0 when its table is empty.
    """
    # Table.count() was removed from SQLAlchemy and raw SQL strings are no longer
    # accepted by execute(), so all three aggregates are built with select()/func.
    message_count = select(func.count()).select_from(ChatHistory)
    order_count = select(func.count()).select_from(Order)
    mean_sentiment = select(func.avg(SentimentLog.sentiment_score))

    async with async_session() as session:
        total_messages = (await session.execute(message_count)).scalar() or 0
        total_orders = (await session.execute(order_count)).scalar() or 0
        avg_sentiment = (await session.execute(mean_sentiment)).scalar() or 0
    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment,
    }
async def process_voice(file: UploadFile = File(...)):
    """Read the uploaded file and return a fixed placeholder transcription.

    The uploaded bytes are read but not used; the returned text is always the same.
    """
    await file.read()
    return {"transcription": "Simulated speech-to-text conversion result."}
# --- Payment Callback Endpoint with Payment Tracking and Redirection ---
async def _apply_payment_status(order_id: str, status: str) -> None:
    """Set the order's status, record a tracking entry, and notify management.

    Raises:
        HTTPException: 404 when no order has that ``order_id``.
    """
    async with async_session() as session:
        # select(Order) yields the mapped Order instance. Selecting from
        # Order.__table__ made scalar_one_or_none() return only the first column
        # (the integer id), so assigning .status on it failed.
        result = await session.execute(select(Order).where(Order.order_id == order_id))
        order = result.scalar_one_or_none()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = status
        await session.commit()
    # Record payment confirmation tracking update
    await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {status}.")
    # Notify management via WhatsApp; run in a worker thread via asyncio.to_thread.
    await asyncio.to_thread(
        send_whatsapp_message,
        MANAGEMENT_WHATSAPP_NUMBER,
        f"Payment Update:\nOrder ID: {order_id} is now {status}.",
    )


async def payment_callback(request: Request):
    """Handle the payment callback for both the user redirect (GET) and the server call.

    GET reads ``reference`` and ``status`` from the query string and, after
    updating the order, redirects the user back to the chat link. Any other
    method reads the same two fields from the JSON body and returns a JSON
    acknowledgement. ``status`` defaults to "Paid" when absent.

    NOTE(review): ``reference`` and ``status`` are applied exactly as received,
    with no signature or server-side verification of the payment -- confirm
    whether this callback must be verified before the order is updated.

    Raises:
        HTTPException: 400 when the reference is missing or the body is not a
            JSON object; 404 when the order does not exist.
    """
    is_redirect = request.method == "GET"
    if is_redirect:
        # GET: User redirection after payment
        payload = request.query_params
    else:
        # POST: Server-to-server callback from Paystack
        payload = await request.json()
        if not isinstance(payload, dict):
            raise HTTPException(status_code=400, detail="Callback body must be a JSON object.")

    order_id = payload.get("reference")
    status = payload.get("status", "Paid")
    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")

    await _apply_payment_status(order_id, status)

    if is_redirect:
        # Redirect user back to the chat interface (adjust URL as needed)
        return RedirectResponse(url="https://wa.link/am87s2")
    return JSONResponse(content={"message": "Order updated successfully."})
async def track_order(order_id: str):
    """
    Return the tracking updates recorded for an order, ordered by timestamp.

    The result is a JSONResponse whose content is a list of dicts with
    ``status``, ``message`` and ``timestamp`` (ISO-8601 string).

    Raises:
        HTTPException: 404 when the order has no tracking rows.
    """
    query = (
        select(OrderTracking)
        .where(OrderTracking.order_id == order_id)
        .order_by(OrderTracking.timestamp)
    )
    async with async_session() as session:
        rows = (await session.execute(query)).scalars().all()
        if not rows:
            raise HTTPException(status_code=404, detail="No tracking information found for this order.")
        history = [
            {
                "status": row.status,
                "message": row.message,
                "timestamp": row.timestamp.isoformat(),
            }
            for row in rows
        ]
        return JSONResponse(content=history)
if __name__ == "__main__":
    # Script entry point: serve `app` with uvicorn on port 8000, bound to 0.0.0.0.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")