""" |
|
Created By: ishwor subedi |
|
Date: 2024-08-28 |
|
""" |
|
from collections import Counter, defaultdict |
|
from datetime import datetime, timedelta |
|
from dateutil.parser import isoparse |
|
from fastapi.routing import APIRouter |
|
from src.pipeline.conversai_analytic_pipeline import ConversAIAnalyticPipeline |
|
from fastapi import Request |
|
from src.utils.error_handling import create_success_response, raise_http_exception, \ |
|
success_response_user_management |
|
from src.models.apis_models import FeedbackRequest, DailyActiveEndUserRequest, AverageSessionInteractionRequest, \ |
|
TokenUsageRequest, UserSatisfactionRateRequest |
|
from src import logging as logger |
|
|
|
analytic_endpoints_router = APIRouter(tags=["Analytics Endpoints"]) |
|
|
|
conversai_analytic_pipeline = ConversAIAnalyticPipeline() |
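
# Wiring sketch (an assumption, not shown in this module): the application is
# expected to mount this router on the main FastAPI app, e.g.
# `app.include_router(analytic_endpoints_router)`.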


@analytic_endpoints_router.post("/daily_chat_count")
async def daily_chat_count(request: DailyActiveEndUserRequest):
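    """Return per-day chat counts for a vectorstore.

    When either date bound is missing, the window defaults to the last 7 days.
    Dates are parsed with `isoparse`, so ISO-8601 strings are assumed, e.g.
    {"start_date": "2024-08-01", "end_date": "2024-08-07", "vectorstore": "<name>"}.
    """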
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_chat_count API Triggered by {vectorstore} <<<")
    try:
        # Default to the trailing 7-day window when either bound is missing.
        if not start_date or not end_date:
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Keep only the chat timestamps that fall inside the requested window.
        dates = [
            isoparse(i["timestamp"]).date()
            for i in response
            if start_date <= isoparse(i["timestamp"]).date() <= end_date
        ]

        date_count = Counter(dates)
        data = [{"date": date.isoformat(), "count": count} for date, count in date_count.items()]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_chat_count API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> daily_chat_count API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/daily_active_end_user")
async def daily_active_end_user(request: DailyActiveEndUserRequest):
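    """Return per-day counts of distinct client IPs ("terminals") for a vectorstore.

    Only dates with more than one distinct IP are reported, mirroring the
    `len(ips) > 1` filter below; the window defaults to the last 7 days.
    """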
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_active_end_user API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Collect the set of distinct client IPs seen on each date in the window.
        ip_by_date = defaultdict(set)
        for i in response:
            timestamp = isoparse(i["timestamp"])
            ip_address = i["IpAddress"]
            if start_date <= timestamp.date() <= end_date:
                ip_by_date[timestamp.date()].add(ip_address)

        # Report only dates where more than one distinct IP was active.
        data = [{"date": date.isoformat(), "terminal": len(ips)} for date, ips in ip_by_date.items() if len(ips) > 1]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_active_end_user API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> daily_active_end_user API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/average_session_interaction")
async def average_session_interaction(request: AverageSessionInteractionRequest):
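    """Return the average number of messages per distinct client IP for each day.

    For each date, the value is total messages divided by the number of unique
    IPs seen that day; the window defaults to the last 7 days.
    """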
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> average_session_interaction API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Tally total messages and distinct client IPs per date in the window.
        total_messages_by_date = defaultdict(int)
        unique_ips_by_date = defaultdict(set)
        for i in response:
            timestamp = isoparse(i["timestamp"])
            ip_address = i["IpAddress"]
            if start_date <= timestamp.date() <= end_date:
                date = timestamp.date()
                total_messages_by_date[date] += 1
                unique_ips_by_date[date].add(ip_address)

        data = []
        for date in sorted(total_messages_by_date.keys()):
            total_messages = total_messages_by_date[date]
            unique_ips = len(unique_ips_by_date[date])
            average_interactions = total_messages / unique_ips if unique_ips > 0 else 0
            data.append({"date": date.isoformat(), "interactions": average_interactions})

        response = create_success_response(code=200, data=dict(data=data))
        logger.info(f">>> average_session_interaction API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> average_session_interaction API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/token_usages")
async def token_usages(request: TokenUsageRequest):
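    """Return the sum of `ResponseTokenCount` per day for a vectorstore.

    Rows without a `ResponseTokenCount` field are skipped; the window defaults
    to the last 7 days.
    """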
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> token_usages API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Accumulate response-token totals per date, skipping rows with no count.
        token_usage_by_date = defaultdict(int)
        for i in response:
            timestamp = isoparse(i["timestamp"])
            if start_date <= timestamp.date() <= end_date:
                response_token_count = i.get("ResponseTokenCount")
                if response_token_count is not None:
                    token_usage_by_date[timestamp.date()] += response_token_count

        data = [{"date": date.isoformat(), "total_tokens": total_tokens}
                for date, total_tokens in token_usage_by_date.items()]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> token_usages API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> token_usages API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/add_feedback")
async def add_feedback(req: Request, request: FeedbackRequest):
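    """Persist a feedback entry, enriched with the caller's IP address and resolved city."""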
    feedback, user_id, vectorstore = request.feedback, request.user_id, request.vectorstore
    try:
        logger.info(f">>> add_feedback API Triggered by {vectorstore} <<<")

        # Resolve the caller's IP to a city before persisting the feedback.
        client_ip = req.client.host
        city = conversai_analytic_pipeline.get_ip_info(client_ip)

        conversai_analytic_pipeline.add_feedback_(feedback, user_id, city, client_ip, vectorstore)

        response = success_response_user_management(code=200, message="Add Feedback Success")
        logger.info(f">>> add_feedback API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> add_feedback API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/user_satisfaction_rate")
async def user_satisfaction_rate(request: UserSatisfactionRateRequest):
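    """Return the daily satisfaction rate derived from like/dislike feedback.

    The rate is likes / (likes + dislikes) * 100 for each date with feedback;
    the window defaults to the last 7 days.
    """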
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> user_satisfaction_rate API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        # Count likes and dislikes per date inside the window.
        feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
        response = conversai_analytic_pipeline.feedback_table_(vectorstore)
        for i in response:
            timestamp = isoparse(i["timestamp"])
            if start_date <= timestamp.date() <= end_date:
                date = timestamp.date()
                feedback = i.get("feedback")
                if feedback == "like":
                    feedback_counts[date]["like"] += 1
                elif feedback == "dislike":
                    feedback_counts[date]["dislike"] += 1

        data = []
        for date in sorted(feedback_counts.keys()):
            like_count = feedback_counts[date]["like"]
            dislike_count = feedback_counts[date]["dislike"]
            total_feedback = like_count + dislike_count
            satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
            data.append({"date": date.isoformat(), "rate": satisfaction_rate})

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> user_satisfaction_rate API Response Success for {vectorstore} <<<")
        return response

    except Exception as e:
        logger.error(f">>> user_satisfaction_rate API Response Failed for {vectorstore} {e} <<<")
        raise_http_exception(500, "Internal Server Error")