"""Logging helpers: message formatting, loguru setup and file logging."""

# Standard Library Imports
import datetime
import enum
import io
import sys
import threading
import time
import traceback
from pathlib import Path

# Third-Party Imports
from loguru import logger

# Automatically rotate too big log files.
# NOTE(review): per the loguru docs, logger.configure(handlers=...) (called
# further down) removes handlers added earlier, so this file sink is
# presumably dropped again at import time -- confirm whether that is intended.
logger.add("./data/log/file_{time}.log", rotation="500 MB")

# Local Imports
from .ansi_utils import ansi_color_str, ansi_link_str
from .attr_utils import (
    get_prev_caller_info,
    get_caller_info,
    get_thread_info,
    get_prev_frame,
    get_prev_frame_from_frame,
    is_of_type,
)
from .table_utils import format_property, format_table
from .extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow
#from ..mew_log.mew_context import MewContext, resolve_context

# Constants
lastLog = None  # Stores the most recent log entry
debug_log = []  # Stores all log entries
startTime = time.time()  # Module import time; log_file reports elapsed time from here

# settings vars
enable_log = True
enable_log_to_file = False
log_filename = "log"
log_file_path = "data/log/log.txt"
enable_fancy = True
enable_use_color = True

type_color = 'yellow'
value_color = 'bright_yellow'
arrow_color = 'bright_white'
# Colour for keyword-argument names. format_kwarg referenced this name but it
# was never defined (NameError on first coloured kwarg); reuse type_color.
kw_color = type_color


def allow_curly_braces(original_string):
    """Return ``original_string`` with every ``{`` and ``}`` doubled.

    Doubling the braces makes them literal if the string is later used as a
    ``str.format`` template.
    """
    if "{" in original_string or "}" in original_string:
        return original_string.replace("{", "{{").replace("}", "}}")
    return original_string


def format_arg(arg, use_color, fancy):
    """Render one positional log argument as ``<type>:value``.

    Args:
        arg: The value to render.
        use_color: When true, wrap the result with ``ansi_color_str`` using
            ``value_color``.
        fancy: When false, non-string values are rendered with ``repr``; in
            every other case (strings, or fancy mode) ``format_table`` is used.

    Returns:
        str: The rendered argument with curly braces doubled.
    """
    def is_complex_type(obj):
        return is_of_type(obj, (list, set, dict)) or hasattr(obj, '__dict__')

    type_str = f"<{type(arg).__name__}>"
    if not isinstance(arg, str) and not fancy:
        value_str = repr(arg)
    else:
        value_str = format_table(arg, use_color=use_color)
    # Containers and objects start on their own line below the type tag.
    newline_if_needed = '\n' if is_complex_type(arg) else ""
    formatted_arg = allow_curly_braces(f"{type_str}:{newline_if_needed}{value_str}")
    return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg


def format_kwarg(kw, arg, use_color, fancy):
    """Render one keyword log argument as ``name: <type>:value``.

    Args:
        kw: The keyword name.
        arg: The value to render (delegated to ``format_arg``).
        use_color: When true, colour the name with ``kw_color`` and the whole
            result with ``value_color``.
        fancy: Passed through to ``format_arg``.

    Returns:
        str: The rendered keyword argument.
    """
    name_str = ansi_color_str(kw, fg=kw_color) if use_color else kw
    arg_str = format_arg(arg, use_color, fancy)
    formatted_arg = f"{name_str}: {arg_str}"
    return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg


def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs):
    """Format all positional and keyword log arguments.

    The first two positional parameters are ``use_color`` and ``fancy``;
    everything after them is treated as a value to format.

    Returns:
        tuple: ``(formatted_args, formatted_kwargs)`` -- a list of rendered
        positional values and a dict mapping each keyword to its rendering.
    """
    formatted_args = [format_arg(arg, use_color, fancy) for arg in args]
    formatted_kwargs = {
        k: format_kwarg(k, v, use_color, fancy) for k, v in kwargs.items()
    }
    return formatted_args, formatted_kwargs


def make_formatted_msg(msg, *args, **kwargs):
    """Build the final log line for ``msg``.

    The keyword arguments ``fancy``, ``use_color``, ``context_depth`` and
    ``context`` are consumed as formatting options and are not passed to the
    message template. The remaining arguments are rendered with
    ``format_args`` and substituted into ``msg`` via ``stringf``.

    Args:
        msg: A ``str.format`` style template.
        *args: Positional values for the template.
        **kwargs: Keyword values for the template plus the options above.

    Returns:
        str: The message prefixed with a timestamp, and either caller/thread
        information (fancy mode) or the ``context`` value (plain mode).
    """
    # fetch format vars from kwargs
    fancy = kwargs.pop('fancy', enable_fancy)
    use_color = kwargs.pop('use_color', enable_use_color)
    kwargs.pop('context_depth', None)  # accepted but currently unused
    context = kwargs.pop('context', None)

    formatted_args, formatted_kwargs = format_args(use_color, fancy, *args, **kwargs)
    msgf = stringf(msg, *formatted_args, **formatted_kwargs)
    #context = MewContext.resolve_context(context, depth, steps=5)

    if fancy:
        # NOTE(review): steps=2 presumably makes the helper look past this
        # function and the log_* wrapper that called it; keep this call
        # directly in this frame -- confirm against attr_utils.
        caller_info = get_prev_caller_info(msgf, use_color=use_color, steps=2)
        return f"\n ~ | {get_timestamp()} | {get_thread_info(caller_info)}"
    return f"\n ~ | {get_timestamp()} | {context}:{msgf}"


def stringf(s: str, *args, **kwargs):
    """Format ``s`` with ``str.format`` unless no arguments were given.

    With no arguments the template is returned untouched, so literal braces
    in a plain message do not need escaping.

    Examples:
        stringf("Hello, World!")               -> "Hello, World!"
        stringf("Hello, {}!", "World")         -> "Hello, World!"
        stringf("Hello, {name}!", name="Alice") -> "Hello, Alice!"
    """
    # s = allow_curly_braces(s)  # Uncomment or modify as needed
    if not args and not kwargs:
        return s
    return s.format(*args, **kwargs)


def ensure_directory_exists(directory):
    """Create ``directory`` (and any missing parents) if it does not exist."""
    Path(directory).mkdir(parents=True, exist_ok=True)


def ensure_file_exists(file_path):
    """Create an empty file at ``file_path`` if it does not exist yet."""
    Path(file_path).touch(exist_ok=True)


#from ..mew_log.mew_log_helper import MewLogHelper

# Logging Configuration
# Configure Loguru's logger to use a custom format

# Set UTF-8 encoding for stdout and stderr
#sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
#sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
#logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}", backtrace=True)

# NOTE(review): a loguru handler's "level" is a minimum threshold, so a
# WARNING record presumably reaches both stdout sinks and an ERROR record all
# three sinks, printing it more than once. Left unchanged because it is a
# module-level side effect -- confirm whether the duplication is intended.
logger.configure(handlers=[
    {
        "sink": sys.stdout,
        "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",  # | {file}:{line}:{function}
        "level": "INFO"
    },
    {
        "sink": sys.stdout,
        "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",  # | {file}:{line}:{function}
        "level": "WARNING"
    },
    {
        "sink": sys.stderr,
        "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",  # | {file}:{line}:{function}
        "level": "ERROR"
    }
])


# Logging Configuration
def init_log(log_config):
    """Apply ``log_config`` to the module-level logging settings.

    Args:
        log_config: Mapping with the keys ``log_to_file``,
            ``clear_file_first``, ``enable_log`` and ``log_filename``.

    Raises:
        KeyError: If one of the keys is missing; no setting is changed then.

    Side effects:
        Creates ``data/log/`` and ``data/log/<log_filename>.txt`` when
        missing, empties the file when ``clear_file_first`` is true, and
        prints a summary of the resulting settings.
    """
    global enable_log_to_file, enable_log, log_filename, log_file_path

    # Read every key before touching the globals so that a missing key
    # cannot leave the settings half-updated.
    new_log_to_file = log_config['log_to_file']
    clear_file_first = log_config['clear_file_first']
    new_enable_log = log_config['enable_log']
    new_log_filename = log_config['log_filename']

    enable_log_to_file = new_log_to_file
    enable_log = new_enable_log
    log_filename = new_log_filename

    log_dir = 'data/log/'
    ensure_directory_exists(log_dir)
    log_file_path = f"{log_dir}{log_filename}.txt"
    ensure_file_exists(log_file_path)

    # Only wipe previous content when asked to; the flag used to be read
    # and then ignored.
    if clear_file_first:
        with open(log_file_path, 'w', encoding='utf-8'):
            pass

    print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}')


def _update_log(formatted_msg_str):
    """Record ``formatted_msg_str`` as the latest entry and persist it.

    Updates ``lastLog``, appends to ``debug_log`` and, when
    ``enable_log_to_file`` is set, writes the entry with ``log_file``.
    """
    global lastLog
    lastLog = formatted_msg_str
    debug_log.append(lastLog)
    if enable_log_to_file:
        log_file(lastLog)


def log_file(msg, file_path=None):
    """Append ``msg`` to a log file, prefixed with the elapsed run time.

    Args:
        msg: The text to write.
        file_path: Target file; defaults to the module-level
            ``log_file_path``.

    Does nothing when logging is disabled via ``enable_log``.
    """
    if not enable_log:
        return
    if file_path is None:
        file_path = log_file_path

    elapsedTime = time.time() - startTime
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)
    # Append to the requested file: mode 'w' truncated the file on every
    # call, and the file_path argument was previously ignored.
    with open(file_path, 'a', encoding='utf-8') as fp:
        fp.write(f"\n ~ | time={elapsedTime:.2f}s ~ {msg}")
#logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}")

import functools
import traceback


def log_info(msg, *args, **kwargs):
    """Format ``msg``, record it and print it to stdout.

    Arguments are forwarded unchanged to ``make_formatted_msg``. Does nothing
    when ``enable_log`` is false.
    """
    if not enable_log:
        return
    # make_formatted_msg must be called directly from this frame: its
    # caller lookup appears to depend on the call depth.
    _update_log(make_formatted_msg(msg, *args, **kwargs))
    print(lastLog)
    #logger.info(formatted_log_str)


def log_warning(msg, *args, **kwargs):
    """Format ``msg``, record it and emit it through ``logger.warning``.

    Arguments are forwarded unchanged to ``make_formatted_msg``. Does nothing
    when ``enable_log`` is false.
    """
    if not enable_log:
        return
    _update_log(make_formatted_msg(msg, *args, **kwargs))
    logger.warning(lastLog)


def log_error(msg, e, *args, **kwargs):
    """Format ``msg`` plus a trace of ``e`` and emit it via ``logger.error``.

    Args:
        msg: Message template, forwarded to ``make_formatted_msg``.
        e: The error to describe; rendered with ``trace_error``.
        *args: Positional values for the template.
        **kwargs: Keyword values / formatting options for the template.

    Does nothing when ``enable_log`` is false.
    """
    if not enable_log:
        return
    formatted_msg_str = f"\n===^_^===\n{make_formatted_msg(msg, *args, **kwargs)}"
    formatted_msg_str += f"{trace_error(e)}\n===>_<==="
    _update_log(formatted_msg_str)
    logger.error(lastLog)


def log_function(func):
    """Decorator that prints each call's arguments and result.

    Positional arguments are paired with the parameter names taken from
    ``func.__code__``; keyword arguments are merged on top. Any exception
    raised inside the wrapper is logged with ``logger.error`` (message and
    formatted traceback) and then re-raised.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            code = func.__code__
            arg_names = code.co_varnames[:code.co_argcount]
            func_args = dict(zip(arg_names, args))
            func_args.update(kwargs)

            result = func(*args, **kwargs)
            print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}")))
            return result
        except Exception as e:
            logger.error(f"Exception: {str(e)}")
            logger.error(f"Format Traceback: {traceback.format_exc()}")
            raise  # Re-raise the exception after logging

    return wrapper


# Example:
# @log_function
# def dive(a, b):
#     return a / b
# dive(2,3)
# dive(21,0)

error_color = 'bright_red'


def format_traceback(tb_entry, depth=0, use_color=enable_use_color):
    """Render one traceback entry as two log lines (location and source line).

    Args:
        tb_entry: A 4-item entry ``(filename, lineno, funcname, line)``, as
            produced by ``traceback.extract_tb``.
        depth: Nesting level; widens the arrow drawn before the entry and is
            shown as ``trace(<depth>)``.
        use_color: When true, render the location as an ANSI link and colour
            the result with ``error_color``.

    Returns:
        str: The rendered entry, starting with a newline.
    """
    filename, lineno, funcname, line = tb_entry
    arrow_str = format_arrow('T', 'double', 3 + 2 * depth)
    arrow_str2 = format_arrow('L', 'double', 3 + 2 * (depth + 1))

    file_link = f"{filename}::{lineno}::{funcname}"
    if use_color:
        trace_link = ansi_link_str(f"file:///{filename}", file_link, link_color="bright_magenta")
    else:
        trace_link = file_link

    formatted_tb_str = f'\n ~ | {arrow_str}trace({depth}): {trace_link}\n ~ | {arrow_str2} Line: "{line}"'
    return ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str


def trace_error(e, use_color=True, fancy=True):
    """Describe ``e`` and, if it carries a traceback, each traceback entry.

    Args:
        e: The error to describe. Values that are not ``BaseException``
            instances are rendered with ``repr`` only.
        use_color: When true, colour the output with ``error_color``.
        fancy: Unused; kept for interface compatibility.

    Returns:
        str: A multi-line description starting with a newline.
    """
    exception_str = f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}"
    out_str = ansi_color_str(exception_str, fg=error_color) if use_color else exception_str

    traceback_obj = e.__traceback__ if isinstance(e, BaseException) else None
    if traceback_obj:
        arrow_str = format_arrow('T', 'double', 3)
        code_filename = traceback_obj.tb_frame.f_code.co_filename
        file_link = f"{code_filename}::{traceback_obj.tb_lineno}"
        if use_color:
            trace_link = ansi_link_str(f"file:///{code_filename}", file_link, link_color="bright_magenta")
        else:
            trace_link = file_link

        formatted_tb_str = f"\n ~ | {arrow_str}trace(0): {trace_link}"
        out_str += ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str

        for i, tb_entry in enumerate(traceback.extract_tb(traceback_obj)):
            out_str += format_traceback(tb_entry, depth=i + 1, use_color=use_color)

    return ansi_color_str(out_str, fg=error_color) if use_color else out_str


# log_info(' ~ | test log_info: {}', 1)
# log_warning(' ~ | test log_warning: {}', 2)

def log_text_input(key: str):
    """Log a prompt for ``key``, read one line from stdin, log and return it."""
    log_info(f'\n ~ | Text Input[{key}]: ')
    input_text = input()
    log_info(f'\n ~ | Recv input: "{input_text}"')
    return input_text


def format_duration(seconds):
    """Format a number of seconds as ``"<H>h <M>m <S>s"`` (fractions dropped)."""
    hours, remainder = divmod(seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"


def parse_duration(duration_str):
    """Parse a string such as ``"1h 2m 3s"`` into a number of seconds.

    Each whitespace-separated token is an integer followed by ``h``, ``m``
    or ``s``. Tokens may appear in any order and any of them may be missing;
    tokens with another suffix are ignored. The previous positional parsing
    raised ``IndexError`` unless exactly three tokens were present.

    Args:
        duration_str: The text to parse, e.g. the output of
            ``format_duration``.

    Returns:
        int: The total number of seconds (0 for an empty string).

    Raises:
        ValueError: If the numeric part of a recognised token is not an
            integer.
    """
    unit_seconds = {'h': 3600, 'm': 60, 's': 1}
    total = 0
    for part in duration_str.split():
        factor = unit_seconds.get(part[-1])
        if factor is None:
            continue
        total += int(part[:-1]) * factor
    return total


def get_timestamp():
    """Return the current local time formatted like ``2024-Jan-31 13:45:10``."""
    return datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")


def format_path(directory, depth=0, max_depth=3, use_color=True):
    """Formats the directory structure as a string up to a specified depth, with optional color.

    Args:
        directory (str or Path): The directory path to format.
        depth (int): The current depth in the directory structure.
        max_depth (int): The maximum depth to format; None means unlimited.
        use_color (bool): Whether to apply color to the output.

    Returns:
        str: The formatted directory structure, one entry per line. Empty when
        ``depth`` exceeds ``max_depth``; a fixed message when ``directory`` is
        not a directory.
    """
    if max_depth is not None and depth > max_depth:
        return ""

    directory_path = Path(directory)
    if not directory_path.is_dir():
        return "The specified path is not a directory.\n"

    # Header line for this directory, applying color if requested
    line_prefix = "│ " * depth + "├── "
    directory_name = directory_path.name
    if use_color:
        line_prefix = ansi_color_str(line_prefix, fg='cyan')
        directory_name = ansi_color_str(directory_name, fg='green')
    pieces = [line_prefix + directory_name + "\n"]

    # Directories first, then files, each group sorted by name
    sorted_items = sorted(directory_path.iterdir(), key=lambda x: (x.is_file(), x.name))
    for item in sorted_items:
        if item.is_dir():
            pieces.append(format_path(item, depth + 1, max_depth, use_color))
            continue
        file_line = "│ " * (depth + 1) + "├── " + item.name
        if use_color:
            file_line = ansi_color_str(file_line, fg='white')
        pieces.append(file_line + "\n")

    return "".join(pieces)


# Spinner Class
class LogSpinner:
    """Context manager that animates a console spinner on a background thread.

    On entry a thread starts redrawing a spinner line with the elapsed time;
    on exit the thread is stopped and joined and a final "Done!" line is
    printed.

    Args:
        message: Text shown next to the spinner.
        rainbow: When true, cycle the spinner through ``self.colors``.
        anim_speed: Seconds to sleep between frames.
    """

    def __init__(self, message="", rainbow=True, anim_speed=0.1):
        self.message = message
        self.speed = anim_speed
        self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m']
        self.spinner = ['⠇', '⠋', '⠙', '⠸', '⠼', '⠴', '⠦', '⠧']
        #self.spinner = ['|', '/', '-', '\\']
        self.rainbow = rainbow
        # Initialised here so the update methods also work before __enter__.
        self.stop_spinner = False
        self.startTime = time.time()
        self.spinner_thread = None

    def __enter__(self):
        self.stop_spinner = False
        self.startTime = time.time()
        self.spinner_thread = threading.Thread(target=self.spin)
        self.spinner_thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_spinner = True
        self.spinner_thread.join()
        curTime = time.time()
        spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!'
        print(spinner_text)

    def spin(self):
        """Thread target: keep animating until ``stop_spinner`` is set."""
        while not self.stop_spinner:
            if self.rainbow:
                self.update_color_spinner()
            else:
                self.update_spinner()

    def update_spinner(self):
        """Draw one pass over the spinner frames without colour."""
        for char in self.spinner:
            # Check every frame so __exit__ is not held up for a whole cycle.
            if self.stop_spinner:
                return
            curTime = time.time()
            spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
            print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True)
            time.sleep(self.speed)

    def update_color_spinner(self):
        """Draw one pass over the spinner frames for every colour."""
        for color in self.colors:
            for char in self.spinner:
                # A full pass is len(colors) * len(spinner) frames; checking
                # here lets __exit__ return after at most one frame.
                if self.stop_spinner:
                    return
                curTime = time.time()
                spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
                print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True)
                time.sleep(self.speed)


def run_unit_test():
    """Manual smoke test: log a range of nested data structures with log_info."""
    # Example usage:
    class Person:
        def __init__(self, name, age, city):
            self.name = name
            self.age = age
            self.city = city

    # Create instances of Person
    person1 = Person('John', 30, 'New York')
    person2 = Person('Alice', 25, 'Los Angeles')
    person3 = Person('Bob', 30, 'Hong Kong')
    person4 = Person('Charlie', 35, 'Shanghai')
    person5 = Person('David', 40, 'Beijing')

    # Each unique data structure paired with the message used to log it
    samples = [
        ("Data Dictionary: {}", {'Name': 'John', 'Age': 30, 'City': 'New York'}),
        ("Data List: {}", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
        ("Data Set: {}", {'apple', 'banana', 'orange'}),
        ("Data Dictionary of Lists: {}",
         {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']}),
        ("Data List of Dicts: {}", [person1.__dict__, person2.__dict__]),
        ("Data Dictionary of Dicts: {}",
         {'Person1': person1.__dict__, 'Person2': person2.__dict__}),
        ("List of Objects: {}", [person3, person4, person5]),
        ("Data List of Lists: {}", [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']]),
        # Keys now match the person each one maps to.
        ("Dictionary of Objects: {}", {'Bob': person3, 'Charlie': person4, 'David': person5}),
        ("Dictionary of List of Objects: {}",
         {'Group1': [person3, person4], 'Group2': [person5]}),
    ]

    # Log each unique data structure
    for message, data in samples:
        log_info(message, data)


#run_unit_test()