import os
from tempfile import TemporaryDirectory
from typing import Dict, Mapping, Optional, Sequence, Union

import pandas as pd
from datasets import load_dataset as hf_load_dataset
from tqdm import tqdm

from .operator import SourceOperator
from .stream import MultiStream, Stream

try:
    import ibm_boto3
    from ibm_botocore.client import ClientError

    ibm_boto3_available = True
except ImportError:
    ibm_boto3_available = False


class Loader(SourceOperator):
    """Base class for loaders: source operators that read raw data and expose it as a MultiStream."""


class LoadHF(Loader):
    path: str
    name: Optional[str] = None
    data_dir: Optional[str] = None
    data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None
    streaming: bool = True
    cached: bool = False

    def process(self):
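        # datasets.load_dataset returns one iterable per split (e.g. "train",
        # "test"); MultiStream.from_iterables wraps each split as a stream.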
        dataset = hf_load_dataset(
            self.path, name=self.name, data_dir=self.data_dir, data_files=self.data_files, streaming=self.streaming
        )

        return MultiStream.from_iterables(dataset)


class LoadCSV(Loader):
    files: Dict[str, str]
    chunksize: int = 1000

    def load_csv(self, file):
        # Read the file in chunks so large CSVs are streamed rather than
        # materialized in memory at once.
        for chunk in pd.read_csv(file, chunksize=self.chunksize):
            for _, row in chunk.iterrows():
                yield row.to_dict()

    def process(self):
        return MultiStream(
            {name: Stream(generator=self.load_csv, gen_kwargs={"file": file}) for name, file in self.files.items()}
        )


class LoadFromIBMCloud(Loader):
    endpoint_url_env: str
    aws_access_key_id_env: str
    aws_secret_access_key_env: str
    bucket_name: str
    data_dir: str
    data_files: Sequence[str]

    def _download_from_cos(self, cos, bucket_name, item_name, local_file):
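        """Download the object `item_name` from `bucket_name` to `local_file`,
        showing a tqdm progress bar sized from the object's ContentLength."""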
        print(f"Downloading {item_name} from {bucket_name} COS to {local_file}")
        try:
            response = cos.Object(bucket_name, item_name).get()
            size = response["ContentLength"]
        except Exception as e:
            raise Exception(f"Unable to access {item_name} in bucket {bucket_name} in COS") from e

        progress_bar = tqdm(total=size, unit="iB", unit_scale=True)

        def download_progress(chunk):
            # tqdm callback: advance the bar by the number of bytes received.
            progress_bar.update(chunk)

        try:
            cos.Bucket(bucket_name).download_file(item_name, local_file, Callback=download_progress)
            print("\nDownload Successful")
        except Exception as e:
            raise Exception(f"Unabled to download {item_name} in {bucket_name}", e)

    def prepare(self):
        super().prepare()
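        # Resolve the credentials from the environment; verify() checks that
        # all of them are actually set.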
        self.endpoint_url = os.getenv(self.endpoint_url_env)
        self.aws_access_key_id = os.getenv(self.aws_access_key_id_env)
        self.aws_secret_access_key = os.getenv(self.aws_secret_access_key_env)

    def verify(self):
        super().verify()
        assert (
            ibm_boto3_available
        ), "Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (`pip install ibm-cos-sdk`)"
        assert self.endpoint_url is not None, f"Please set the {self.endpoint_url_env} environment variable"
        assert self.aws_access_key_id is not None, f"Please set the {self.aws_access_key_id_env} environment variable"
        assert (
            self.aws_secret_access_key is not None
        ), f"Please set the {self.aws_secret_access_key_env} environment variable"

    def process(self):
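        # Build a boto3-style COS resource, download each data file into a
        # temporary directory, and load that directory as a local dataset.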
        cos = ibm_boto3.resource(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url,
        )

        with TemporaryDirectory() as temp_directory:
            for data_file in self.data_files:
                # COS object keys always use "/", while the local path should
                # use the platform separator.
                self._download_from_cos(
                    cos, self.bucket_name, self.data_dir + "/" + data_file, os.path.join(temp_directory, data_file)
                )
            # Load eagerly (streaming=False): the temporary directory is
            # deleted as soon as the `with` block exits.
            dataset = hf_load_dataset(temp_directory, streaming=False)

        return MultiStream.from_iterables(dataset)