"""Analytics helpers over the trending-videos DataFrame.

Every public function pulls a fresh DataFrame via
``data_processing.get_updated_df()`` and returns either a pandas object or
a JSON-serializable structure aggregating it.  The expected columns
(trending_date, category_name, views, likes, video_id, title,
channelTitle, publishedAt, tags, comment_count) come from that service —
verify against ``data_processing`` if the schema changes.
"""

import os
import sys
from collections import Counter

import pandas as pd
from sqlalchemy.orm import Session  # noqa: F401  kept: removing imports may break other importers

# Make the project's ``src`` directory importable regardless of the CWD
# this module is launched from; the ``services`` import below relies on it.
src_directory = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "../..", "src")
)
sys.path.append(src_directory)

from services import data_processing  # noqa: E402  must follow the sys.path tweak


def get_trending_videos_count():
    """Return a Series: number of trending videos per trending_date."""
    df = data_processing.get_updated_df()
    return df.groupby(["trending_date"])["trending_date"].count()


def get_most_popular_categories():
    """Return a Series: number of videos per category_name."""
    df = data_processing.get_updated_df()
    return df.groupby(["category_name"])["category_name"].count()


def get_views_vs_likes():
    """Return a DataFrame of (views, likes) pairs with missing rows dropped."""
    df = data_processing.get_updated_df()
    return df[["views", "likes"]].dropna()


def get_like_ratio_distribution():
    """Return video_id/views/likes rows with an added ``like_ratio`` column.

    Rows with zero views are excluded so the ratio is always finite.
    """
    df = data_processing.get_updated_df()
    data = df[["video_id", "views", "likes"]].copy()
    data = data[data["views"] > 0]
    data["like_ratio"] = data["likes"] / data["views"]
    return data


def get_top_liked_videos(top_n=10):
    """Return the ``top_n`` most-liked videos as (title, likes) rows.

    Bug fix: the row limit was hard-coded to 10 and silently ignored the
    ``top_n`` parameter; it now honors the argument.
    """
    df = data_processing.get_updated_df()
    return (
        df[["title", "likes"]]
        .dropna()
        .sort_values(by="likes", ascending=False)
        .head(top_n)
    )


def get_trending_channels():
    """Return a DataFrame of (channelTitle, publishedAt) columns."""
    df = data_processing.get_updated_df()
    return df[["channelTitle", "publishedAt"]].copy()


def calculate_channel_growth():
    """Return video counts per (published_month, channelTitle).

    Unparseable publish dates are dropped; months are "YYYY-MM" strings.
    """
    data = data_processing.get_updated_df()
    data["publishedAt"] = pd.to_datetime(data["publishedAt"], errors="coerce")
    data.dropna(subset=["publishedAt"], inplace=True)
    data["published_month"] = data["publishedAt"].dt.to_period("M").astype(str)
    return (
        data.groupby(["published_month", "channelTitle"])
        .size()
        .reset_index(name="video_count")
    )


def process_tags():
    """Return ``{"tags": [{"tag": ..., "count": ...}, ...]}``.

    Tags are pipe-separated in the ``tags`` column; they are lower-cased,
    stripped, and empty entries discarded before counting.
    """
    data = data_processing.get_updated_df()
    tags = data["tags"].dropna().str.lower().str.split("|")
    all_tags = [tag.strip() for sublist in tags for tag in sublist if tag.strip()]
    tag_counts = Counter(all_tags)
    tag_data = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
    return {"tags": tag_data}


def analyze_trending_duration():
    """Return per-video trending lifespan and mean views per trending date.

    ``lifespan``: records of (video_id, days_trending) — number of distinct
    appearances of each video in the trending data.
    ``views_growth``: records of (trending_date, mean views).
    """
    data = data_processing.get_updated_df()
    trending_days = data.groupby("video_id")["trending_date"].count().reset_index()
    trending_days.columns = ["video_id", "days_trending"]
    views_growth = data.groupby("trending_date")["views"].mean().reset_index()
    return {
        "lifespan": trending_days.to_dict(orient="records"),
        "views_growth": views_growth.to_dict(orient="records"),
    }


def analyze_upload_patterns(mode: str):
    """Return upload counts bucketed by hour or weekday.

    ``mode`` must be "hour" or "day"; any other value yields
    ``{"error": "Invalid mode"}``.  Weekdays absent from the data are
    reported with a count of 0 (instead of leaking NaN into the records).
    """
    data = data_processing.get_updated_df()
    data["publishedAt"] = pd.to_datetime(data["publishedAt"], errors="coerce")
    if mode == "hour":
        data["upload_hour"] = data["publishedAt"].dt.hour
        hourly_counts = data["upload_hour"].value_counts().sort_index().reset_index()
        hourly_counts.columns = ["hour", "count"]
        return hourly_counts.to_dict(orient="records")
    elif mode == "day":
        data["upload_day"] = data["publishedAt"].dt.day_name()
        daily_counts = (
            data["upload_day"]
            .value_counts()
            .reindex(
                ["Monday", "Tuesday", "Wednesday", "Thursday",
                 "Friday", "Saturday", "Sunday"]
            )
            .fillna(0)  # reindex leaves NaN for days with no uploads
            .reset_index()
        )
        daily_counts.columns = ["day", "count"]
        return daily_counts.to_dict(orient="records")
    return {"error": "Invalid mode"}


def category_like_view_ratio():
    """Return the mean likes/views ratio per category as records.

    Zero-view rows are excluded (mirroring get_like_ratio_distribution);
    previously they produced ``inf`` ratios that skewed the mean.
    """
    data = data_processing.get_updated_df()
    data = data[data["views"] > 0].copy()
    data["like_view_ratio"] = data["likes"] / data["views"]
    category_data = (
        data.groupby("category_name")["like_view_ratio"].mean().reset_index()
    )
    return category_data.to_dict(orient="records")


def category_comment_engagement():
    """Return engagement columns per row (NaN rows dropped) as records."""
    data = data_processing.get_updated_df()
    return (
        data[["category_name", "comment_count", "views", "likes"]]
        .dropna()
        .to_dict(orient="records")
    )


if __name__ == "__main__":
    pass