|
import sys
|
|
import pandas as pd
|
|
from datasets import load_dataset
|
|
from anime_recommender.loggers.logging import logging
|
|
from anime_recommender.exception.exception import AnimeRecommendorException
|
|
from anime_recommender.entity.config_entity import DataIngestionConfig
|
|
from anime_recommender.entity.artifact_entity import DataIngestionArtifact
|
|
from anime_recommender.utils.main_utils.utils import export_data_to_dataframe
|
|
|
|
class DataIngestion:
|
|
"""
|
|
A class responsible for data ingestion in the anime recommender system.
|
|
|
|
This class fetches data from Hugging Face datasets, converts it into pandas DataFrame format,
|
|
and exports the processed data to storage for further use in the pipeline.
|
|
"""
|
|
def __init__(self, data_ingestion_config: DataIngestionConfig):
|
|
"""
|
|
Initializes the DataIngestion class with the provided configuration.
|
|
|
|
Args:
|
|
data_ingestion_config (DataIngestionConfig): Configuration settings for data ingestion.
|
|
"""
|
|
try:
|
|
self.data_ingestion_config = data_ingestion_config
|
|
except Exception as e:
|
|
raise AnimeRecommendorException(e, sys)
|
|
|
|
def fetch_data_from_huggingface(self, dataset_path: str, split: str = None) -> pd.DataFrame:
|
|
"""
|
|
Fetches a dataset from Hugging Face and converts it into a pandas DataFrame.
|
|
Args:
|
|
dataset_path (str): The path to the Hugging Face dataset.
|
|
split (str, optional): The dataset split to be fetched (e.g., 'train', 'test'). Defaults to None.
|
|
|
|
Returns:
|
|
pd.DataFrame: The dataset converted into a pandas DataFrame.
|
|
"""
|
|
try:
|
|
logging.info(f"Fetching data from Hugging Face dataset: {dataset_path}")
|
|
|
|
dataset = load_dataset(dataset_path, split=split)
|
|
|
|
|
|
df = pd.DataFrame(dataset['train'])
|
|
|
|
|
|
logging.info(f"Shape of the dataframe: {df.shape}")
|
|
logging.info(f"Column names: {df.columns}")
|
|
logging.info(f"Preview of the DataFrame:\n{df.head()}")
|
|
logging.info("Data fetched successfully from Hugging Face.")
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while fetching data: {str(e)}")
|
|
raise AnimeRecommendorException(e, sys)
|
|
|
|
def ingest_data(self) -> DataIngestionArtifact:
|
|
"""
|
|
Orchestrates the data ingestion process, fetching datasets and saving them to the feature store.
|
|
Returns:
|
|
DataIngestionArtifact: An artifact containing paths to the ingested datasets.
|
|
"""
|
|
try:
|
|
|
|
anime_df = self.fetch_data_from_huggingface(self.data_ingestion_config.anime_filepath)
|
|
rating_df = self.fetch_data_from_huggingface(self.data_ingestion_config.rating_filepath)
|
|
|
|
|
|
export_data_to_dataframe(anime_df, file_path=self.data_ingestion_config.feature_store_anime_file_path)
|
|
export_data_to_dataframe(rating_df, file_path=self.data_ingestion_config.feature_store_userrating_file_path)
|
|
|
|
|
|
dataingestionartifact = DataIngestionArtifact(
|
|
feature_store_anime_file_path=self.data_ingestion_config.feature_store_anime_file_path,
|
|
feature_store_userrating_file_path=self.data_ingestion_config.feature_store_userrating_file_path
|
|
)
|
|
return dataingestionartifact
|
|
|
|
except Exception as e:
|
|
raise AnimeRecommendorException(e, sys)
|
|
|