Spaces:
Running
Running
from transitions import Machine | |
from enum import Enum | |
OKBLUE = '\033[94m' | |
OKGREEN = '\033[92m' | |
OKCYAN = '\033[96m' | |
FAIL = '\033[91m' | |
ENDC = '\033[0m' | |
# define the states | |
# 0. init | |
# 1. data entry complete | |
# 2. data entry validated | |
# 3. ML classification started (can be long running on batch) | |
# 4. ML classification completed | |
# 5. manual inspection / adjustment of classification completed | |
# 6. data uploaded | |
states = ['init', 'data_entry_complete', 'data_entry_validated', 'ml_classification_started', 'ml_classification_completed', 'manual_inspection_completed', 'data_uploaded'] | |
# define an enum for the states, automatically giving integers according to the position in the list | |
# - this is useful for the transitions | |
# maybe this needs to use setattr or similar | |
workflow_phases = Enum('StateEnum', {state: i for i, state in enumerate(states)}) | |
class WorkflowState(Enum): | |
INIT = 0 | |
DATA_ENTRY_COMPLETE = 1 | |
DATA_VALIDATED = 2 | |
#ML_STARTED = 3 | |
ML_COMPLETED = 3 | |
MANUAL_REVIEW_COMPLETE = 4 | |
UPLOADED = 5 | |
# TODO: refactor the FSM to have explicit named states, and write a helper function to determine the next state and advance to it. | |
# this allows either triggering by name, or being a bit lazy and saying "advance" and it will go to the next state.. | |
# maybe a cleaner way is to say completed('X') and then whatever the next state from X is can be taken. Instead of knowing | |
# what the next state is (becausee that was supposed to be defined her in the specification, and not in each phase) | |
# | |
# also add a "did we pass stage X" function, by name. This will make it easy to choose what to present, what actions to do next etc. | |
class WorkflowFSM: | |
def __init__(self): | |
# Define states as strings (transitions requirement) | |
self.states = [state.name for state in WorkflowState] | |
# TODO: what is the point of the enum? I can just take the list and do an enumerate on it.?? | |
# Create state machine | |
self.machine = Machine( | |
model=self, | |
states=self.states, | |
initial=WorkflowState.INIT.name, | |
) | |
# Add transitions for each state to the next state | |
for i in range(len(self.states) - 1): | |
self.machine.add_transition( | |
trigger='advance', | |
source=self.states[i], | |
dest=self.states[i + 1] | |
) | |
# Add reset transition | |
self.machine.add_transition( | |
trigger='reset', | |
source='*', | |
dest=WorkflowState.INIT.name | |
) | |
# Add callbacks for logging | |
self.machine.before_state_change = self._log_transition | |
self.machine.after_state_change = self._post_transition | |
def _cprint(self, msg:str, color:str=OKCYAN): | |
"""Print colored message""" | |
print(f"{color}{msg}{ENDC}") | |
def _advance_state(self): | |
"""Determine the next state based on current state""" | |
current_idx = self.states.index(self.state) | |
if current_idx < len(self.states) - 1: | |
return self.states[current_idx + 1] | |
return self.state # Stay in final state if already there | |
def _log_transition(self): | |
# TODO: use logger, not printing. | |
self._cprint(f"[FSM] -> Transitioning from {self.state}") | |
def _post_transition(self): | |
# TODO: use logger, not printing. | |
self._cprint(f"[FSM] -| Transitioned to {self.state}") | |
def advance(self): | |
if self.state_number < len(self.states) - 1: | |
self.trigger('advance') | |
else: | |
# maybe too aggressive to exception here? | |
raise RuntimeError("Already at final state") | |
def state_number(self) -> int: | |
"""Get the numerical value of current state""" | |
return self.states.index(self.state) | |
def state_name(self) -> str: | |
"""Get the name of current state""" | |
return self.state | |
# add a property for the number of states | |
def num_states(self) -> int: | |
return len(self.states) | |