import csv import glob import json from concurrent.futures import Future, ThreadPoolExecutor from typing import Any, Callable, TypeVar _T = TypeVar("_T") def execute_parallel(func: Callable[..., _T], kwargs_list: list[dict[str, Any]], max_workers: int) -> list[_T]: """並行処理を行う Args: func (Callable): 並行処理したい関数 kwargs_list (list[dict[str, Any]]): 関数の引数(dict型)のリスト max_workers (int): 並行処理数 Returns: list[Any]: 関数の戻り値のリスト """ response_list: list[Future[_T]] = [] with ThreadPoolExecutor(max_workers=max_workers) as e: for kwargs in kwargs_list: response: Future[_T] = e.submit(func, **kwargs) response_list.append(response) return [r.result() for r in response_list] def _load_json_file(file_path: str) -> Any: with open(file_path, "r", encoding="utf-8") as json_file: data = json.load(json_file) return data def make_log_csv(log_dir: str, csv_file_name: str = "log.csv") -> None: """ログデータのcsvを保存 Args: log_dir (str): ログデータが保存されているディレクトリ csv_file_name (str, optional): 保存するcsvファイル名. Defaults to "log.csv". """ # ディレクトリ内のJSONファイルのリストを取得 # TODO: エラーキャッチ json_files = sorted([f for f in glob.glob(f"{log_dir}/*.json")], key=lambda x: int(x.split("/")[-1].split(".")[0])) # すべてのJSONファイルからユニークなキーを取得 columns = [] data_list: list[dict[Any, Any]] = [] keys_set = set() for json_file in json_files: data = _load_json_file(json_file) if isinstance(data, dict): for key in data.keys(): if key not in keys_set: keys_set.add(key) columns.append(key) data_list.append(data) # CSVファイルを作成し、ヘッダーを書き込む with open(csv_file_name, "w", encoding="utf-8", newline="") as csv_file: writer = csv.writer(csv_file) writer.writerow(columns) # JSONファイルからデータを読み取り、CSVファイルに書き込む for data in data_list: row = [data.get(key, "") for key in columns] writer.writerow(row) print(f"saved csv file: {csv_file_name}")