"""
Analytics API endpoints: chat volume, active terminals, session interaction,
token usage and feedback / satisfaction reporting.

Created By: ishwor subedi
Date: 2024-08-28
"""
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse
from fastapi.routing import APIRouter
from src.pipeline.conversai_analytic_pipeline import ConversAIAnalyticPipeline
from fastapi import Request
from src.utils.error_handling import (
    create_success_response,
    raise_http_exception,
    success_response_user_management,
)
from src.models.apis_models import (
    FeedbackRequest,
    DailyActiveEndUserRequest,
    AverageSessionInteractionRequest,
    TokenUsageRequest,
    UserSatisfactionRateRequest,
)
from src import logging as logger

analytic_endpoints_router = APIRouter(tags=["Analytics Endpoints"])
conversai_analytic_pipeline = ConversAIAnalyticPipeline()

# Length of the reporting window used when the caller omits either date bound.
_DEFAULT_WINDOW_DAYS = 7


def _resolve_date_range(start_date, end_date):
    """Return the inclusive ``(start, end)`` calendar dates for a report.

    If either bound is missing or empty, the window defaults to the last
    ``_DEFAULT_WINDOW_DAYS`` days ending today (local time zone). Otherwise
    both bounds are parsed as ISO-8601 strings and truncated to their date.

    Raises:
        ValueError: if a supplied bound is not a valid ISO-8601 string.
    """
    if not start_date or not end_date:
        end = datetime.now().astimezone().date()
        return end - timedelta(days=_DEFAULT_WINDOW_DAYS), end
    return isoparse(start_date).date(), isoparse(end_date).date()


def _iter_in_range(records, start_date, end_date):
    """Yield ``(day, record)`` for each record whose timestamp is in range.

    ``record["timestamp"]`` is parsed exactly once per record; records outside
    the inclusive ``[start_date, end_date]`` window are skipped.
    """
    for record in records:
        day = isoparse(record["timestamp"]).date()
        if start_date <= day <= end_date:
            yield day, record


@analytic_endpoints_router.post("/daily_chat_count")
async def daily_chat_count(request: DailyActiveEndUserRequest):
    """Return the number of chat-history records per day, in date order.

    Responds with ``{"output": [{"date": <iso date>, "count": <int>}, ...]}``
    wrapped by ``create_success_response``. Any failure is logged and
    reported through ``raise_http_exception(500, ...)``.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_chat_count API Triggered by {vectorstore} <<<")
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)
        chat_history = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        date_count = Counter(day for day, _ in _iter_in_range(chat_history, start_date, end_date))
        data = [{"date": day.isoformat(), "count": date_count[day]} for day in sorted(date_count)]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_chat_count API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        logger.error(f">>> daily_chat_count API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/daily_active_end_user")
async def daily_active_end_user(request: DailyActiveEndUserRequest):
    """Return the number of distinct client IP addresses seen per day.

    Responds with ``{"output": [{"date": <iso date>, "terminal": <int>}, ...]}``
    in date order. Any failure is logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_active_end_user API Triggered by {vectorstore} <<<")
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)
        chat_history = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        ip_by_date = defaultdict(set)
        for day, record in _iter_in_range(chat_history, start_date, end_date):
            ip_by_date[day].add(record["IpAddress"])

        # NOTE(review): days with exactly one distinct IP are omitted from the
        # result (`> 1`). Preserved as-is; confirm this is intended and not
        # meant to be `> 0`.
        data = [
            {"date": day.isoformat(), "terminal": len(ip_by_date[day])}
            for day in sorted(ip_by_date)
            if len(ip_by_date[day]) > 1
        ]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_active_end_user API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        logger.error(f">>> daily_active_end_user API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/average_session_interaction")
async def average_session_interaction(request: AverageSessionInteractionRequest):
    """Return the average number of messages per distinct IP for each day.

    Responds with ``{"data": [{"date": <iso date>, "interactions": <float>}, ...]}``
    in date order. Any failure is logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> average_session_interaction API Triggered by {vectorstore} <<<")
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)
        chat_history = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        total_messages_by_date = defaultdict(int)
        unique_ips_by_date = defaultdict(set)
        for day, record in _iter_in_range(chat_history, start_date, end_date):
            total_messages_by_date[day] += 1
            unique_ips_by_date[day].add(record["IpAddress"])

        data = []
        for day in sorted(total_messages_by_date):
            unique_ips = len(unique_ips_by_date[day])
            # Defensive guard against division by zero.
            average_interactions = total_messages_by_date[day] / unique_ips if unique_ips > 0 else 0
            data.append({"date": day.isoformat(), "interactions": average_interactions})

        # NOTE(review): this endpoint uses the payload key "data" while the
        # other analytics endpoints use "output". Kept for client compatibility.
        response = create_success_response(code=200, data=dict(data=data))
        logger.info(f">>> average_session_interaction API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        logger.error(f">>> average_session_interaction API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/token_usages")
async def token_usages(request: TokenUsageRequest):
    """Return the summed ``ResponseTokenCount`` per day, in date order.

    Records without a ``ResponseTokenCount`` (missing or ``None``) contribute
    nothing. Responds with
    ``{"output": [{"date": <iso date>, "total_tokens": <number>}, ...]}``.
    Any failure is logged and reported through ``raise_http_exception(500, ...)``.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> token_usages API Triggered by {vectorstore} <<<")
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)
        chat_history = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        token_usage_by_date = defaultdict(int)
        for day, record in _iter_in_range(chat_history, start_date, end_date):
            response_token_count = record.get("ResponseTokenCount")
            if response_token_count is not None:
                token_usage_by_date[day] += response_token_count

        data = [
            {"date": day.isoformat(), "total_tokens": token_usage_by_date[day]}
            for day in sorted(token_usage_by_date)
        ]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> token_usages API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        logger.error(f">>> token_usages API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/add_feedback")
async def add_feedback(req: Request, request: FeedbackRequest):
    """Store a feedback entry together with the caller's IP and IP lookup result.

    The client host is read from the incoming request, passed to the
    pipeline's ``get_ip_info`` and the result is stored with the feedback via
    ``add_feedback_``. Any failure is logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    feedback, user_id, vectorstore = request.feedback, request.user_id, request.vectorstore
    try:
        logger.info(f">>> add_feedback API Triggered by {request.vectorstore} <<<")

        client_ip = req.client.host
        city = conversai_analytic_pipeline.get_ip_info(client_ip)
        conversai_analytic_pipeline.add_feedback_(feedback, user_id, city, client_ip, vectorstore)

        response = success_response_user_management(code=200, message="Add Feedback Sucess")
        logger.info(f">>> add_feedback API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        logger.error(f">>> add_feedback API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/user_satisfaction_rate")
async def user_satisfaction_rate(request: UserSatisfactionRateRequest):
    """Return the daily percentage of "like" feedback among like/dislike entries.

    Feedback values other than ``"like"`` / ``"dislike"`` are ignored. Responds
    with ``{"output": [{"date": <iso date>, "rate": <0-100>}, ...]}`` in date
    order. Any failure is logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> user_satisfaction_rate API Triggered by {vectorstore} <<<")
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)

        feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
        feedback_rows = conversai_analytic_pipeline.feedback_table_(vectorstore)
        for day, record in _iter_in_range(feedback_rows, start_date, end_date):
            feedback = record.get("feedback")
            if feedback == "like":
                feedback_counts[day]["like"] += 1
            elif feedback == "dislike":
                feedback_counts[day]["dislike"] += 1

        data = []
        for day in sorted(feedback_counts):
            like_count = feedback_counts[day]["like"]
            dislike_count = feedback_counts[day]["dislike"]
            total_feedback = like_count + dislike_count
            satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
            data.append({"date": day.isoformat(), "rate": satisfaction_rate})

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> user_satisfaction_rate API Response Success for {vectorstore} <<<")
        return response
    except Exception as e:
        # Failures are logged at error level, matching every other endpoint.
        logger.error(f">>> user_satisfaction_rate API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")