ishworrsubedii's picture
Initial commit
a4a2363
"""
Created By: ishwor subedi
Date: 2024-08-28
"""
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse
from fastapi.routing import APIRouter
from src.pipeline.conversai_analytic_pipeline import ConversAIAnalyticPipeline
from fastapi import Request
from src.utils.error_handling import create_success_response, raise_http_exception, \
success_response_user_management
from src.models.apis_models import FeedbackRequest, DailyActiveEndUserRequest, AverageSessionInteractionRequest, \
TokenUsageRequest, UserSatisfactionRateRequest
from src import logging as logger
analytic_endpoints_router = APIRouter(tags=["Analytics Endpoints"])
conversai_analytic_pipeline = ConversAIAnalyticPipeline()
@analytic_endpoints_router.post("/daily_chat_count")
async def daily_chat_count(
request: DailyActiveEndUserRequest):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> daily_chat_count API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
dates = [
isoparse(i["timestamp"]).date()
for i in response
if start_date <= isoparse(i["timestamp"]).date() <= end_date
]
date_count = Counter(dates)
data = [{"date": date.isoformat(), "count": count} for date, count in date_count.items()]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> daily_chat_count API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> daily_chat_count API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/daily_active_end_user")
async def daily_active_end_user(
request: DailyActiveEndUserRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> daily_active_end_user API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
ip_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
ip_by_date[date].add(ip_address)
data = [{"date": date.isoformat(), "terminal": len(ips)} for date, ips in ip_by_date.items() if len(ips) > 1]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> daily_active_end_user API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> daily_active_end_user API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/average_session_interaction")
async def average_session_interaction(
request: AverageSessionInteractionRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> average_session_interaction API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
total_messages_by_date = defaultdict(int)
unique_ips_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
total_messages_by_date[date] += 1
unique_ips_by_date[date].add(ip_address)
data = []
for date in sorted(total_messages_by_date.keys()):
total_messages = total_messages_by_date[date]
unique_ips = len(unique_ips_by_date[date])
average_interactions = total_messages / unique_ips if unique_ips > 0 else 0
data.append({"date": date.isoformat(), "interactions": average_interactions})
response = create_success_response(code=200, data=dict(data=data))
logger.info(f">>> average_session_interaction API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> average_session_interaction API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/token_usages")
async def token_usages(request: TokenUsageRequest):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> token_usages API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
token_usage_by_date = defaultdict(int)
for i in response:
timestamp = isoparse(i["timestamp"])
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
response_token_count = i.get("ResponseTokenCount")
if response_token_count is not None:
token_usage_by_date[date] += response_token_count
data = [{"date": date.isoformat(), "total_tokens": total_tokens} for date, total_tokens in
token_usage_by_date.items()]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> token_usages API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> token_usages API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/add_feedback")
async def add_feedback(req: Request, request: FeedbackRequest):
feedback, user_id, vectorstore = request.feedback, request.user_id, request.vectorstore
try:
logger.info(f">>> add_feedback API Triggered by {request.vectorstore} <<<")
client_ip = req.client.host
city = conversai_analytic_pipeline.get_ip_info(client_ip)
conversai_analytic_pipeline.add_feedback_(feedback, user_id, city, client_ip, vectorstore)
response = success_response_user_management(code=200, message="Add Feedback Sucess")
logger.info(f">>> add_feedback API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> add_feedback API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/user_satisfaction_rate")
async def user_satisfaction_rate(
request: UserSatisfactionRateRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> user_satisfaction_rate API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
response = conversai_analytic_pipeline.feedback_table_(vectorstore)
for i in response:
timestamp = isoparse(i["timestamp"])
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
feedback = i.get("feedback")
if feedback == "like":
feedback_counts[date]["like"] += 1
elif feedback == "dislike":
feedback_counts[date]["dislike"] += 1
data = []
for date in sorted(feedback_counts.keys()):
like_count = feedback_counts[date]["like"]
dislike_count = feedback_counts[date]["dislike"]
total_feedback = like_count + dislike_count
satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
data.append({"date": date.isoformat(), "rate": satisfaction_rate})
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> user_satisfaction_rate API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.info(f">>> user_satisfaction_rate API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")