Fred808 commited on
Commit
1bdd4ff
·
verified ·
1 Parent(s): da7b702

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +322 -453
app.py CHANGED
@@ -1,3 +1,6 @@
 
 
 
1
  import re
2
  import os
3
  import time
@@ -5,256 +8,198 @@ import requests
5
  import base64
6
  import asyncio
7
  from datetime import datetime, timedelta
8
- from bs4 import BeautifulSoup
9
- from sqlalchemy import select
10
-
11
  from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
12
  from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
13
-
14
- import openai
15
-
16
- # For sentiment analysis using TextBlob
17
- from textblob import TextBlob
18
-
19
- # SQLAlchemy Imports (Async)
20
  from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
21
  from sqlalchemy.orm import sessionmaker, declarative_base
22
- from sqlalchemy import Column, Integer, String, DateTime, Text, Float
23
-
24
- # --- Environment Variables and API Keys ---
25
- SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
26
- PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
27
- DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value") # Example using SQLite
28
- NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
29
- openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")
30
 
31
- # WhatsApp Business API credentials (Cloud API)
32
- WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
33
- WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
34
- MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")
35
-
36
- # --- Database Setup ---
37
  Base = declarative_base()
38
 
39
- class ChatHistory(Base):
40
- __tablename__ = "chat_history"
41
- id = Column(Integer, primary_key=True, index=True)
42
- user_id = Column(String, index=True)
43
- timestamp = Column(DateTime, default=datetime.utcnow)
44
- direction = Column(String) # 'inbound' or 'outbound'
45
- message = Column(Text)
46
-
47
- class Order(Base):
48
- __tablename__ = "orders"
49
- id = Column(Integer, primary_key=True, index=True)
50
- order_id = Column(String, unique=True, index=True)
51
- user_id = Column(String, index=True)
52
- dish = Column(String)
53
- quantity = Column(String)
54
- price = Column(String, default="0")
55
- status = Column(String, default="Pending Payment")
56
- payment_reference = Column(String, nullable=True)
57
- delivery_address = Column(String, default="") # New field for address
58
- timestamp = Column(DateTime, default=datetime.utcnow)
59
-
60
  class UserProfile(Base):
61
  __tablename__ = "user_profiles"
62
  id = Column(Integer, primary_key=True, index=True)
63
- user_id = Column(String, unique=True, index=True)
64
- phone_number = Column(String, unique=True, index=True, nullable=True)
65
- name = Column(String, default="Valued Customer")
66
- email = Column(String, default="[email protected]")
67
- preferences = Column(Text, default="")
68
- last_interaction = Column(DateTime, default=datetime.utcnow)
69
- loyalty_points = Column(Integer, default=0) # New field for loyalty points
70
- preferred_language = Column(String, default="English") # New field for language preference
71
-
72
- class SentimentLog(Base):
73
- __tablename__ = "sentiment_logs"
74
- id = Column(Integer, primary_key=True, index=True)
75
- user_id = Column(String, index=True)
76
- timestamp = Column(DateTime, default=datetime.utcnow)
77
- sentiment_score = Column(Float)
78
- message = Column(Text)
79
-
80
- class OrderTracking(Base):
81
- __tablename__ = "order_tracking"
82
- id = Column(Integer, primary_key=True, index=True)
83
- order_id = Column(String, index=True)
84
- status = Column(String) # e.g., "Order Placed", "Payment Confirmed", etc.
85
- message = Column(Text, nullable=True) # Optional additional details
86
- timestamp = Column(DateTime, default=datetime.utcnow)
87
-
88
- class Feedback(Base):
89
- __tablename__ = "feedback"
90
- id = Column(Integer, primary_key=True, index=True)
91
- user_id = Column(String, index=True)
92
- rating = Column(Integer)
93
- comment = Column(Text, nullable=True)
94
- timestamp = Column(DateTime, default=datetime.utcnow)
95
-
96
- # --- Create Engine and Session ---
97
- engine = create_async_engine(DATABASE_URL, echo=True)
98
- async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
99
-
100
- async def init_db():
101
- async with engine.begin() as conn:
102
- await conn.run_sync(Base.metadata.create_all)
103
-
104
- # --- Global In-Memory Stores ---
105
- user_state = {} # e.g., { user_id: ConversationState }
106
- conversation_context = {} # { user_id: [ { "timestamp": ..., "role": "user"/"bot", "message": ... }, ... ] }
107
- proactive_timer = {}
108
-
109
- # --- Utility Functions ---
110
- async def log_chat_to_db(user_id: str, direction: str, message: str):
111
  async with async_session() as session:
112
- entry = ChatHistory(user_id=user_id, direction=direction, message=message)
113
- session.add(entry)
 
 
 
114
  await session.commit()
115
 
116
- async def log_sentiment(user_id: str, message: str, score: float):
117
  async with async_session() as session:
118
- entry = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
119
- session.add(entry)
120
- await session.commit()
121
-
122
- def analyze_sentiment(text: str) -> float:
123
- blob = TextBlob(text)
124
- return blob.sentiment.polarity
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
 
126
- # --- New Features Implementation ---
127
- async def send_main_menu(user_id: str):
128
- menu_message = (
129
- "Hi there! 👋 Welcome to [Delivery Service Co.]. I’m here to help with your deliveries. "
130
- "What would you like to do today?"
131
- )
132
- quick_replies = [
133
- {"title": "Track an Order", "payload": "track_order"},
134
- {"title": "Schedule a Delivery", "payload": "schedule_delivery"},
135
- {"title": "FAQs & Support", "payload": "faqs"},
136
- {"title": "Loyalty Points", "payload": "loyalty_points"},
137
- {"title": "Talk to an Agent", "payload": "live_agent"},
138
- ]
139
- await log_chat_to_db(user_id, "outbound", menu_message)
140
- return {"response": menu_message, "quick_replies": quick_replies}
141
-
142
- async def track_order(user_id: str, order_id: str):
143
- # Simulate fetching real-time tracking data
144
- tracking_data = {
145
- "status": "On the way",
146
- "estimated_time": "30 minutes",
147
- "driver_location": "https://maps.google.com/?q=6.5244,3.3792", # Example location
148
- }
149
- tracking_message = (
150
- f"🚚 Your order ({order_id}) is currently {tracking_data['status']} and is expected to arrive in {tracking_data['estimated_time']}. "
151
- f"Tap below to track your package in real-time."
152
- )
153
- quick_replies = [
154
- {"title": "Track on Map", "url": tracking_data["driver_location"]},
155
- {"title": "Back to Menu", "payload": "main_menu"},
156
- ]
157
- await log_chat_to_db(user_id, "outbound", tracking_message)
158
- return {"response": tracking_message, "quick_replies": quick_replies}
159
-
160
- async def recommend_package(user_id: str, package_description: str):
161
- # Simulate AI analysis
162
- package_size = "Medium"
163
- price = 2500
164
- recommendation_message = (
165
- f"Based on your description, we recommend a {package_size} package for ₦{price}. "
166
- "Does this sound right?"
167
- )
168
- quick_replies = [
169
- {"title": "Yes, proceed", "payload": f"confirm_package:{package_size}:{price}"},
170
- {"title": "No, adjust size", "payload": "adjust_package"},
171
- ]
172
- await log_chat_to_db(user_id, "outbound", recommendation_message)
173
- return {"response": recommendation_message, "quick_replies": quick_replies}
174
-
175
- async def check_loyalty_points(user_id: str):
176
- # Simulate fetching loyalty points
177
- points = 200
178
- discount = 500
179
- loyalty_message = (
180
- f"🎉 You’ve earned 50 points for this delivery! You now have {points} points. "
181
- f"Redeem them for a ₦{discount} discount on your next order."
182
- )
183
- quick_replies = [
184
- {"title": "Redeem Points", "payload": "redeem_points"},
185
- {"title": "Back to Menu", "payload": "main_menu"},
186
- ]
187
- await log_chat_to_db(user_id, "outbound", loyalty_message)
188
- return {"response": loyalty_message, "quick_replies": quick_replies}
189
-
190
- async def send_proactive_update(user_id: str, order_id: str, status: str):
191
- if status == "picked_up":
192
- message = f"🚚 Your order ({order_id}) has been picked up and is on the way!"
193
- elif status == "nearby":
194
- message = f"🚚 Your driver is 10 minutes away! Please ensure someone is available to receive the package."
195
- await log_chat_to_db(user_id, "outbound", message)
196
- return {"response": message}
197
-
198
- async def set_language(user_id: str, language: str):
199
- supported_languages = ["English", "Français", "Español"]
200
- if language in supported_languages:
201
- user_state[user_id]["language"] = language
202
- message = f"Language set to {language}. How can I assist you today?"
203
- else:
204
- message = "Sorry, that language is not supported. Please choose from: English, Français, Español."
205
- quick_replies = [{"title": lang, "payload": f"set_language:{lang}"} for lang in supported_languages]
206
- await log_chat_to_db(user_id, "outbound", message)
207
- return {"response": message, "quick_replies": quick_replies}
208
-
209
- async def request_feedback(user_id: str):
210
- feedback_message = "How was your delivery experience? Tap to rate:"
211
- quick_replies = [
212
- {"title": "⭐️⭐️⭐️⭐️⭐️", "payload": "rate:5"},
213
- {"title": "⭐️⭐️⭐️⭐️", "payload": "rate:4"},
214
- {"title": "⭐️⭐️⭐️", "payload": "rate:3"},
215
- {"title": "⭐️⭐️", "payload": "rate:2"},
216
- {"title": "⭐️", "payload": "rate:1"},
217
- ]
218
- await log_chat_to_db(user_id, "outbound", feedback_message)
219
- return {"response": feedback_message, "quick_replies": quick_replies}
220
-
221
- async def show_environmental_impact(user_id: str):
222
- impact_message = "🌍 Your delivery saved 2kg of CO2 emissions! Thank you for choosing eco-friendly shipping."
223
- await log_chat_to_db(user_id, "outbound", impact_message)
224
- return {"response": impact_message}
225
-
226
- async def start_onboarding(user_id: str):
227
- tutorial_message = (
228
- "Let me guide you through how to schedule a delivery. Tap ‘Next’ to continue."
229
- )
230
- quick_replies = [
231
- {"title": "Next", "payload": "tutorial_step_1"},
232
- {"title": "Skip Tutorial", "payload": "main_menu"},
233
- ]
234
- await log_chat_to_db(user_id, "outbound", tutorial_message)
235
- return {"response": tutorial_message, "quick_replies": quick_replies}
236
-
237
- async def suggest_faqs(user_id: str, user_input: str):
238
- # Simulate AI-powered FAQ suggestions
239
- suggested_faqs = [
240
- "How long does delivery take?",
241
- "Can I change my delivery time?",
242
- "What are your pricing options?",
243
- ]
244
- faq_message = (
245
- f"It looks like you’re asking about delivery times. Here are some related FAQs:"
246
- )
247
- quick_replies = [{"title": faq, "payload": f"faq:{faq}"} for faq in suggested_faqs]
248
- await log_chat_to_db(user_id, "outbound", faq_message)
249
- return {"response": faq_message, "quick_replies": quick_replies}
250
 
251
- async def schedule_offline(user_id: str):
252
- offline_message = (
253
- "You’re offline. Your delivery has been scheduled and will be confirmed once you’re back online."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
  )
255
- await log_chat_to_db(user_id, "outbound", offline_message)
256
- return {"response": offline_message}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
 
 
 
 
258
  # --- FastAPI Setup & Endpoints ---
259
  app = FastAPI()
260
 
@@ -262,164 +207,6 @@ app = FastAPI()
262
  async def on_startup():
263
  await init_db()
264
 
265
- @app.post("/chatbot")
266
- async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
267
- data = await request.json()
268
- user_id = data.get("user_id")
269
- phone_number = data.get("phone_number")
270
- user_message = data.get("message", "").strip()
271
- is_image = data.get("is_image", False)
272
- image_b64 = data.get("image_base64", None)
273
-
274
- if not user_id:
275
- raise HTTPException(status_code=400, detail="Missing user_id in payload.")
276
-
277
- # Initialize conversation context for the user if not present.
278
- if user_id not in conversation_context:
279
- conversation_context[user_id] = []
280
- # Append the inbound message to the conversation context.
281
- conversation_context[user_id].append({
282
- "timestamp": datetime.utcnow().isoformat(),
283
- "role": "user",
284
- "message": user_message
285
- })
286
-
287
- background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)
288
- await update_user_last_interaction(user_id)
289
- await get_or_create_user_profile(user_id, phone_number)
290
-
291
- # Handle image queries
292
- if is_image and image_b64:
293
- if len(image_b64) >= 180_000:
294
- raise HTTPException(status_code=400, detail="Image too large.")
295
- return StreamingResponse(stream_image_completion(image_b64), media_type="text/plain")
296
-
297
- sentiment_score = analyze_sentiment(user_message)
298
- background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
299
- sentiment_modifier = ""
300
- if sentiment_score < -0.3:
301
- sentiment_modifier = "I'm sorry if you're having a tough time. "
302
- elif sentiment_score > 0.3:
303
- sentiment_modifier = "Great to hear from you! "
304
-
305
- # --- Order Tracking Handling ---
306
- order_id_match = re.search(r"ord-\d+", user_message.lower())
307
- if order_id_match:
308
- order_id = order_id_match.group(0)
309
- try:
310
- # Call the /track_order endpoint
311
- tracking_response = await track_order(order_id)
312
- return JSONResponse(content={"response": tracking_response})
313
- except HTTPException as e:
314
- return JSONResponse(content={"response": f"⚠️ {e.detail}"})
315
-
316
- # --- Order Flow Handling ---
317
- order_response = process_order_flow(user_id, user_message)
318
- if order_response:
319
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", order_response)
320
- conversation_context[user_id].append({
321
- "timestamp": datetime.utcnow().isoformat(),
322
- "role": "bot",
323
- "message": order_response
324
- })
325
- return JSONResponse(content={"response": sentiment_modifier + order_response})
326
-
327
- # --- Menu Display ---
328
- if "menu" in user_message.lower():
329
- if user_id in user_state:
330
- del user_state[user_id]
331
- menu_with_images = []
332
- for index, item in enumerate(menu_items, start=1):
333
- image_url = google_image_scrape(item["name"])
334
- menu_with_images.append({
335
- "number": index,
336
- "name": item["name"],
337
- "description": item["description"],
338
- "price": item["price"],
339
- "image_url": image_url
340
- })
341
- response_payload = {
342
- "response": sentiment_modifier + "Here’s our delicious menu:",
343
- "menu": menu_with_images,
344
- "follow_up": (
345
- "To order, type the *number* or *name* of the dish you'd like. "
346
- "For example, type '1' or 'Jollof Rice' to order Jollof Rice.\n\n"
347
- "You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Jollof Rice'."
348
- )
349
- }
350
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
351
- conversation_context[user_id].append({
352
- "timestamp": datetime.utcnow().isoformat(),
353
- "role": "bot",
354
- "message": response_payload["response"]
355
- })
356
- return JSONResponse(content=response_payload)
357
-
358
- # --- Dish Selection via Menu ---
359
- if any(item["name"].lower() in user_message.lower() for item in menu_items) or \
360
- any(str(index) == user_message.strip() for index, item in enumerate(menu_items, start=1)):
361
- selected_dish = None
362
- if user_message.strip().isdigit():
363
- dish_number = int(user_message.strip())
364
- if 1 <= dish_number <= len(menu_items):
365
- selected_dish = menu_items[dish_number - 1]["name"]
366
- else:
367
- for item in menu_items:
368
- if item["name"].lower() in user_message.lower():
369
- selected_dish = item["name"]
370
- break
371
- if selected_dish:
372
- state = ConversationState()
373
- state.flow = "order"
374
- # Set step to 2 since the dish is already selected
375
- state.step = 2
376
- state.data["dish"] = selected_dish
377
- state.update_last_active()
378
- user_state[user_id] = state
379
- response_text = f"You selected {selected_dish}. How many servings would you like?"
380
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
381
- conversation_context[user_id].append({
382
- "timestamp": datetime.utcnow().isoformat(),
383
- "role": "bot",
384
- "message": response_text
385
- })
386
- return JSONResponse(content={"response": sentiment_modifier + response_text})
387
- else:
388
- response_text = "Sorry, I couldn't find that dish in the menu. Please try again."
389
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
390
- conversation_context[user_id].append({
391
- "timestamp": datetime.utcnow().isoformat(),
392
- "role": "bot",
393
- "message": response_text
394
- })
395
- return JSONResponse(content={"response": sentiment_modifier + response_text})
396
-
397
- # --- Nutritional Facts ---
398
- if "nutritional facts for" in user_message.lower():
399
- dish_name = user_message.lower().replace("nutritional facts for", "").strip().title()
400
- dish = next((item for item in menu_items if item["name"].lower() == dish_name.lower()), None)
401
- if dish:
402
- response_text = f"Nutritional facts for {dish['name']}:\n{dish['nutrition']}"
403
- else:
404
- response_text = f"Sorry, I couldn't find nutritional facts for {dish_name}."
405
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
406
- conversation_context[user_id].append({
407
- "timestamp": datetime.utcnow().isoformat(),
408
- "role": "bot",
409
- "message": response_text
410
- })
411
- return JSONResponse(content={"response": sentiment_modifier + response_text})
412
-
413
- # --- Fallback: LLM Response Streaming with Conversation Context ---
414
- recent_context = conversation_context.get(user_id, [])[-5:]
415
- context_str = "\n".join([f"{entry['role'].capitalize()}: {entry['message']}" for entry in recent_context])
416
- prompt = f"Conversation context:\n{context_str}\nUser query: {user_message}\nGenerate a helpful, personalized response for a restaurant chatbot."
417
- def stream_response():
418
- for chunk in stream_text_completion(prompt):
419
- yield chunk
420
- fallback_log = f"LLM fallback response for prompt: {prompt}"
421
- background_tasks.add_task(log_chat_to_db, user_id, "outbound", fallback_log)
422
- return StreamingResponse(stream_response(), media_type="text/plain")
423
 
424
  # --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) ---
425
  @app.get("/chat_history/{user_id}")
@@ -476,82 +263,164 @@ async def process_voice(file: UploadFile = File(...)):
476
  simulated_text = "Simulated speech-to-text conversion result."
477
  return {"transcription": simulated_text}
478
 
479
- # --- Payment Callback Endpoint with Payment Tracking and Redirection ---
480
  @app.api_route("/payment_callback", methods=["GET", "POST"])
481
  async def payment_callback(request: Request):
482
- # GET: User redirection after payment
483
- if request.method == "GET":
484
- params = request.query_params
485
- order_id = params.get("reference")
486
- status = params.get("status", "Paid")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
487
  if not order_id:
488
- raise HTTPException(status_code=400, detail="Missing order reference in callback.")
 
489
  async with async_session() as session:
 
490
  result = await session.execute(
491
- Order.__table__.select().where(Order.order_id == order_id)
 
 
492
  )
493
  order = result.scalar_one_or_none()
494
- if order:
495
- order.status = status
496
- await session.commit()
497
- else:
498
- raise HTTPException(status_code=404, detail="Order not found.")
499
- # Record payment confirmation tracking update
500
- await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {status}.")
501
- # Notify management via WhatsApp about the payment update
502
- await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER,
503
- f"Payment Update:\nOrder ID: {order_id} is now {status}."
504
- )
505
- # Redirect user back to the chat interface (adjust URL as needed)
506
- redirect_url = f"https://wa.link/am87s2"
507
- return RedirectResponse(url=redirect_url)
508
- # POST: Server-to-server callback from Paystack
509
- else:
510
- data = await request.json()
511
- order_id = data.get("reference")
512
- new_status = data.get("status", "Paid")
513
- if not order_id:
514
- raise HTTPException(status_code=400, detail="Missing order reference in callback.")
515
- async with async_session() as session:
516
- result = await session.execute(
517
- Order.__table__.select().where(Order.order_id == order_id)
518
  )
519
- order = result.scalar_one_or_none()
520
- if order:
521
- order.status = new_status
522
- await session.commit()
523
- await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {new_status}.")
524
- await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER,
525
- f"Payment Update:\nOrder ID: {order_id} is now {new_status}."
 
 
 
 
 
 
 
 
 
 
 
526
  )
527
- return JSONResponse(content={"message": "Order updated successfully."})
528
- else:
529
- raise HTTPException(status_code=404, detail="Order not found.")
530
-
531
- @app.get("/track_order/{order_id}")
532
- async def track_order(order_id: str):
 
 
 
 
 
 
 
 
 
 
533
  """
534
- Fetch order tracking details for a given order ID.
535
  """
536
- async with async_session() as session:
537
- result = await session.execute(
538
- select(OrderTracking)
539
- .where(OrderTracking.order_id == order_id)
540
- .order_by(OrderTracking.timestamp)
541
- )
542
- tracking_updates = result.scalars().all()
543
- if tracking_updates:
544
- response = []
545
- for update in tracking_updates:
546
- response.append({
 
 
 
 
 
 
 
 
 
 
 
 
 
547
  "status": update.status,
548
  "message": update.message,
549
  "timestamp": update.timestamp.isoformat(),
550
- })
 
 
 
 
 
 
 
 
 
551
  return JSONResponse(content=response)
552
- else:
553
- raise HTTPException(status_code=404, detail="No tracking information found for this order.")
554
 
555
- if __name__ == "__main__":
556
- import uvicorn
557
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # --------------------
2
+ # IMPORTS & SETUP
3
+ # --------------------
4
  import re
5
  import os
6
  import time
 
8
  import base64
9
  import asyncio
10
  from datetime import datetime, timedelta
11
+ from typing import Dict, List, Optional
 
 
12
  from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
13
  from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
14
+ from sqlalchemy import select, update
 
 
 
 
 
 
15
  from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
16
  from sqlalchemy.orm import sessionmaker, declarative_base
17
+ from sqlalchemy import Column, Integer, String, DateTime, Text, Float, Boolean
 
 
 
 
 
 
 
18
 
19
+ # --------------------
20
+ # DATABASE MODELS
21
+ # --------------------
 
 
 
22
  Base = declarative_base()
23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  class UserProfile(Base):
25
  __tablename__ = "user_profiles"
26
  id = Column(Integer, primary_key=True, index=True)
27
+ user_id = Column(String(36), unique=True, index=True)
28
+ # ... [keep previous fields] ...
29
+ ux_mode = Column(String(20), default="default")
30
+ accessibility_prefs = Column(Text, default="{}")
31
+
32
+ class UXPreferences(Base):
33
+ __tablename__ = "ux_preferences"
34
+ user_id = Column(String(36), primary_key=True)
35
+ interaction_speed = Column(String(20), default="normal")
36
+ color_scheme = Column(String(20), default="light")
37
+ input_preference = Column(String(20), default="buttons")
38
+
39
+ # --------------------
40
+ # STATE MANAGEMENT
41
+ # --------------------
42
+ class ConversationState:
43
+ def __init__(self):
44
+ self.flow: Optional[str] = None
45
+ self.step: int = 0
46
+ self.data: Dict = {}
47
+ self.context: List[Dict] = []
48
+ self.last_active: datetime = datetime.utcnow()
49
+
50
+ def update_last_active(self):
51
+ self.last_active = datetime.utcnow()
52
+
53
+ def add_context(self, role: str, message: str):
54
+ self.context.append({
55
+ "timestamp": datetime.utcnow(),
56
+ "role": role,
57
+ "message": message
58
+ })
59
+ # Keep only last 5 messages
60
+ if len(self.context) > 5:
61
+ self.context = self.context[-5:]
62
+
63
+ # --------------------
64
+ # CORE UTILITIES
65
+ # --------------------
66
+ async def update_user_last_interaction(user_id: str):
 
 
 
 
 
 
 
 
67
  async with async_session() as session:
68
+ await session.execute(
69
+ update(UserProfile)
70
+ .where(UserProfile.user_id == user_id)
71
+ .values(last_interaction=datetime.utcnow())
72
+ )
73
  await session.commit()
74
 
75
+ async def get_or_create_user_profile(user_id: str, phone: str = None) -> UserProfile:
76
  async with async_session() as session:
77
+ result = await session.execute(
78
+ select(UserProfile).where(UserProfile.user_id == user_id)
79
+ )
80
+ profile = result.scalar_one_or_none()
81
+
82
+ if not profile:
83
+ profile = UserProfile(
84
+ user_id=user_id,
85
+ phone_number=phone,
86
+ last_interaction=datetime.utcnow()
87
+ )
88
+ session.add(profile)
89
+ await session.commit()
90
+ await session.refresh(profile)
91
+
92
+ return profile
93
+
94
+ # --------------------
95
+ # ENHANCED UX FEATURES
96
+ # --------------------
97
+ class DeliveryUXManager:
98
+ @staticmethod
99
+ async def generate_response_template(user_id: str) -> Dict:
100
+ """Generate personalized response structure"""
101
+ profile = await get_or_create_user_profile(user_id)
102
+ return {
103
+ "meta": {
104
+ "ux_mode": profile.ux_mode,
105
+ "color_scheme": "light",
106
+ "interaction_mode": "text"
107
+ },
108
+ "content": {
109
+ "text": "",
110
+ "quick_replies": [],
111
+ "carousel": None,
112
+ "status_overlay": None
113
+ }
114
+ }
115
 
116
+ @staticmethod
117
+ async def handle_ux_preferences(user_id: str, preference: str):
118
+ """Update UX preferences"""
119
+ async with async_session() as session:
120
+ prefs = await session.get(UXPreferences, user_id)
121
+ if not prefs:
122
+ prefs = UXPreferences(user_id=user_id)
123
+ session.add(prefs)
124
+
125
+ # Update specific preference
126
+ if preference.startswith("color_"):
127
+ prefs.color_scheme = preference.split("_")[1]
128
+ elif preference.startswith("speed_"):
129
+ prefs.interaction_speed = preference.split("_")[1]
130
+
131
+ await session.commit()
132
+
133
+ # --------------------
134
+ # MAIN APPLICATION
135
+ # --------------------
136
+ app = FastAPI(title="Delivery Service Chatbot")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
 
138
+ @app.post("/chatbot")
139
+ async def enhanced_chatbot_handler(request: Request, bg: BackgroundTasks):
140
+ data = await request.json()
141
+ user_id = data["user_id"]
142
+ message = data.get("message", "")
143
+
144
+ # Initialize state
145
+ if user_id not in user_state:
146
+ user_state[user_id] = ConversationState()
147
+
148
+ state = user_state[user_id]
149
+ state.update_last_active()
150
+
151
+ # Process message
152
+ response = await DeliveryUXManager.generate_response_template(user_id)
153
+
154
+ # [Add conversation handling logic here]
155
+
156
+ return JSONResponse(response)
157
+
158
+ @app.post("/ux/preferences")
159
+ async def update_ux_preferences(request: Request):
160
+ data = await request.json()
161
+ await DeliveryUXManager.handle_ux_preferences(
162
+ data["user_id"],
163
+ data["preference"]
164
  )
165
+ return {"status": "preferences updated"}
166
+
167
+ # --------------------
168
+ # PROACTIVE FEATURES
169
+ # --------------------
170
+ async def send_proactive_update(user_id: str, update_type: str):
171
+ """Send unsolicited updates to users"""
172
+ state = user_state.get(user_id)
173
+ if not state:
174
+ return # User not active
175
+
176
+ response = await DeliveryUXManager.generate_response_template(user_id)
177
+
178
+ if update_type == "delivery_eta":
179
+ response["content"]["text"] = "📦 Your package is arriving in 15 minutes!"
180
+ response["content"]["quick_replies"] = [
181
+ {"title": "Track Live", "action": "track_live"},
182
+ {"title": "Delay Delivery", "action": "delay_delivery"}
183
+ ]
184
+
185
+ return response
186
+
187
+ # --------------------
188
+ # ERROR HANDLING
189
+ # --------------------
190
+ @app.exception_handler(HTTPException)
191
+ async def ux_error_handler(request: Request, exc: HTTPException):
192
+ return JSONResponse({
193
+ "meta": {"error": True},
194
+ "content": {
195
+ "text": f"⚠️ Error: {exc.detail}",
196
+ "quick_replies": [{"title": "Main Menu", "action": "reset"}]
197
+ }
198
+ }, status_code=exc.status_code)
199
 
200
+ if __name__ == "__main__":
201
+ import uvicorn
202
+ uvicorn.run(app, host="0.0.0.0", port=8000)
203
  # --- FastAPI Setup & Endpoints ---
204
  app = FastAPI()
205
 
 
207
  async def on_startup():
208
  await init_db()
209
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
 
211
  # --- Other Endpoints (Chat History, Order Details, User Profile, Analytics, Voice, Payment Callback) ---
212
  @app.get("/chat_history/{user_id}")
 
263
  simulated_text = "Simulated speech-to-text conversion result."
264
  return {"transcription": simulated_text}
265
 
266
+ # Enhanced payment callback with security checks
267
  @app.api_route("/payment_callback", methods=["GET", "POST"])
268
  async def payment_callback(request: Request):
269
+ """Handle payment gateway callbacks with HMAC verification"""
270
+ try:
271
+ if request.method == "POST":
272
+ # Verify Paystack signature
273
+ signature = request.headers.get("x-paystack-signature")
274
+ if not signature:
275
+ raise HTTPException(status_code=403, detail="Missing signature")
276
+
277
+ body = await request.body()
278
+ computed_sha = hmac.new(
279
+ PAYSTACK_SECRET_KEY.encode(),
280
+ body,
281
+ digestmod=hashlib.sha512
282
+ ).hexdigest()
283
+
284
+ if not hmac.compare_digest(computed_sha, signature):
285
+ raise HTTPException(status_code=403, detail="Invalid signature")
286
+
287
+ data = await request.json()
288
+ event = data.get("event")
289
+ order_id = data.get("data", {}).get("reference")
290
+ status = "Paid" if event == "charge.success" else "Failed"
291
+
292
+ elif request.method == "GET":
293
+ data = request.query_params
294
+ order_id = data.get("reference")
295
+ status = data.get("status", "Paid")
296
+
297
  if not order_id:
298
+ raise HTTPException(status_code=400, detail="Missing order reference")
299
+
300
  async with async_session() as session:
301
+ # Get order with lock to prevent race conditions
302
  result = await session.execute(
303
+ select(Order)
304
+ .where(Order.order_id == order_id)
305
+ .with_for_update()
306
  )
307
  order = result.scalar_one_or_none()
308
+
309
+ if not order:
310
+ raise HTTPException(status_code=404, detail="Order not found")
311
+
312
+ valid_statuses = ["Paid", "Failed", "Pending"]
313
+ if status not in valid_statuses:
314
+ raise HTTPException(status_code=400, detail="Invalid status")
315
+
316
+ order.status = status
317
+ order.payment_reference = order_id
318
+
319
+ # Add tracking update
320
+ tracking_entry = OrderTracking(
321
+ order_id=order_id,
322
+ status="Payment Updated",
323
+ message=f"Payment status changed to {status}"
 
 
 
 
 
 
 
 
324
  )
325
+ session.add(tracking_entry)
326
+
327
+ await session.commit()
328
+
329
+ # Send notifications
330
+ await asyncio.gather(
331
+ log_order_tracking(order_id, "Payment Updated", status),
332
+ send_whatsapp_message(
333
+ MANAGEMENT_WHATSAPP_NUMBER,
334
+ f"Payment Update: Order {order_id} - {status}"
335
+ ),
336
+ update_user_order_status(order.user_id, order_id, status)
337
+ )
338
+
339
+ if request.method == "GET":
340
+ return RedirectResponse(
341
+ url=os.getenv("PAYMENT_REDIRECT_URL", "https://default-redirect.com"),
342
+ status_code=303
343
  )
344
+ return JSONResponse(content={"status": "success", "order_id": order_id})
345
+
346
+ except HTTPException as he:
347
+ raise
348
+ except Exception as e:
349
+ logger.error(f"Payment callback error: {str(e)}")
350
+ raise HTTPException(status_code=500, detail="Internal server error")
351
+
352
+ # Enhanced tracking endpoint with caching
353
+ from fastapi_cache.decorator import cache
354
+
355
+ @app.get("/track_order/{order_id}", response_model=List[dict])
356
+ @cache(expire=60) # Cache for 1 minute
357
+ async def get_order_tracking(order_id: str,
358
+ page: int = 1,
359
+ limit: int = 10):
360
  """
361
+ Get order tracking history with pagination
362
  """
363
+ try:
364
+ async with async_session() as session:
365
+ # Get order first to verify existence
366
+ order_result = await session.execute(
367
+ select(Order).where(Order.order_id == order_id)
368
+ if not order_result.scalar_one_or_none():
369
+ raise HTTPException(status_code=404, detail="Order not found")
370
+
371
+ # Get tracking history
372
+ tracking_result = await session.execute(
373
+ select(OrderTracking)
374
+ .where(OrderTracking.order_id == order_id)
375
+ .order_by(OrderTracking.timestamp.desc())
376
+ .offset((page-1)*limit)
377
+ .limit(limit)
378
+ )
379
+
380
+ tracking_updates = tracking_result.scalars().all()
381
+
382
+ if not tracking_updates:
383
+ return JSONResponse(content=[])
384
+
385
+ response = [
386
+ {
387
  "status": update.status,
388
  "message": update.message,
389
  "timestamp": update.timestamp.isoformat(),
390
+ "location": jsonable_encoder(update.location) if update.location else None
391
+ }
392
+ for update in tracking_updates
393
+ ]
394
+
395
+ # Add estimated delivery time
396
+ order = await get_order_details(order_id)
397
+ if order.status == "shipped":
398
+ response[0]["estimated_delivery"] = calculate_eta(order.last_location)
399
+
400
  return JSONResponse(content=response)
 
 
401
 
402
+ except HTTPException as he:
403
+ raise
404
+ except Exception as e:
405
+ logger.error(f"Tracking error: {str(e)}")
406
+ raise HTTPException(status_code=500, detail="Internal server error")
407
+
408
+ # Helper functions
409
+ async def get_order_details(order_id: str):
410
+ async with async_session() as session:
411
+ result = await session.execute(
412
+ select(Order).where(Order.order_id == order_id)
413
+ return result.scalar_one_or_none()
414
+
415
+ async def update_user_order_status(user_id: str, order_id: str, status: str):
416
+ async with async_session() as session:
417
+ await session.execute(
418
+ update(UserProfile)
419
+ .where(UserProfile.user_id == user_id)
420
+ .values(current_order_status=status)
421
+ )
422
+ await session.commit()
423
+
424
+ def calculate_eta(last_location: dict) -> str:
425
+ # Implement actual ETA calculation logic
426
+ return "30 minutes"