from fastapi import FastAPI, HTTPException, Request, BackgroundTasks from starlette.responses import RedirectResponse from src.json_creation import ( final_counts, fetch_response, OllamaContextQuery, ChatManager, ) from src.whatsapp_integration import gathering_data, normal_message from datetime import datetime from typing import Optional import logging from pydantic import BaseModel from config import settings # PHONE_NUMBER_ID = os.environ.get("PHONE_NUMBER_ID") # BEARER_TOKEN = os.environ.get("BEARER_TOKEN") PHONE_NUMBER_ID = settings.PHONE_NUMBER_ID BEARER_TOKEN = settings.BEARER_TOKEN logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) app = FastAPI() chat_manager = ChatManager() class UserQueryInput(BaseModel): phone_number: str user_query: str @app.get("/") def read_root(): return RedirectResponse(url="/docs") @app.get("/fetch_data") def fetch_data( store_id: str, brand_id: str, start_date: Optional[str] = None, end_date: Optional[str] = None, ): if start_date and end_date: try: start_date_obj = datetime.strptime(start_date, "%Y-%m-%d") end_date_obj = datetime.strptime(end_date, "%Y-%m-%d") except ValueError: return {"error": "Dates must be in YYYY-MM-DD format."} data = fetch_response(store_id, brand_id, None, start_date_obj, end_date_obj) if data: return final_counts(data) else: return {"error": "Failed to fetch data"} @app.post("/process_message") def process_message(phone_number: str, user_query: str): str_chat_history = chat_manager.handle_message(phone_number, user_query) return {"chat_history": str_chat_history} @app.post("/process_query") def process_query(input_data: UserQueryInput): user_query = input_data.user_query ollama_class = OllamaContextQuery() chat_history = chat_manager.handle_message(input_data.phone_number, user_query) try: context_query, is_tool_invoke = ollama_class.ollama_context_query( chat_history, user_query ) main_llm_response = "" final_output = context_query if is_tool_invoke: main_llm_response = ollama_class.ollama_tool_call(user_query) final_output = ollama_class.summarised_output( messages=main_llm_response, chat_history=chat_history, context_query=context_query, user_query=input_data.user_query, ) session = chat_manager.get_or_create_session(input_data.phone_number) # if is_greet: # print("Here 1") # chat_manager.save_message( # session.id, input_data.phone_number, "ASSISTANT", main_llm_response # ) # print("Here 2") # return main_llm_response print("Here 1") chat_manager.save_message( session.id, input_data.phone_number, "ASSISTANT", final_output ) print("Here 2") return final_output except Exception as e: print(e) raise HTTPException(status_code=500, detail=str(e)) @app.get("/receive_msg", tags=["Whatsapp Webhook"]) async def whatsapp_webhook_get(request: Request): # Extract query parameters from the request query_params = request.query_params hub_mode = query_params.get("hub.mode") hub_challenge = query_params.get("hub.challenge") hub_verify_token = query_params.get("hub.verify_token") if ( hub_mode == "subscribe" and hub_challenge and hub_verify_token == "Sahl-analytics-bot" ): return int(hub_challenge) else: return {"detail": "Invalid request or verification token mismatch"}, 400 @app.post("/receive_msg", tags=["Whatsapp Webhook"]) async def whatsapp_webhook(request: Request, background_tasks: BackgroundTasks): data = await request.json() background_tasks.add_task(call_query, data) return "200 OK HTTPS." def call_query(data): try: if ( data["entry"][0]["changes"][0]["value"]["metadata"]["phone_number_id"] == PHONE_NUMBER_ID ): print("\n New Line starting from here :=" + "=" * 150) print(data) try: phone_number, text = gathering_data(data) print(phone_number) print(text) sample_input = UserQueryInput( phone_number=phone_number, user_query=text ) res = process_query(sample_input) normal_message(res, phone_number) except Exception as e: print("Exception in post whatsapp request :::::: ", e) except BaseException: pass if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)