Spaces:
Sleeping
Sleeping
| from transitions import Machine | |
| from enum import Enum | |
# ANSI escape sequences for coloured terminal output.
OKBLUE = "\033[94m"   # bright blue
OKGREEN = "\033[92m"  # bright green
OKCYAN = "\033[96m"   # bright cyan
FAIL = "\033[91m"     # bright red
ENDC = "\033[0m"      # reset all attributes
# Workflow phases, listed in the order they are expected to occur:
#   0. init
#   1. data entry complete
#   2. data entry validated
#   3. ML classification started (can be long running on batch)
#   4. ML classification completed
#   5. manual inspection / adjustment of classification completed
#   6. data uploaded
states = [
    'init',
    'data_entry_complete',
    'data_entry_validated',
    'ml_classification_started',
    'ml_classification_completed',
    'manual_inspection_completed',
    'data_uploaded',
]

# Enum counterpart of `states`: every name is mapped to its position in the
# list, so the integer values follow the phase order.
# - this is useful for the transitions
# NOTE: maybe this needs to use setattr or similar
workflow_phases = Enum(
    'StateEnum',
    [(phase_name, position) for position, phase_name in enumerate(states)],
)
class WorkflowState(Enum):
    """Named workflow states; the integer values follow the workflow order."""

    INIT = 0
    DATA_ENTRY_COMPLETE = 1
    DATA_VALIDATED = 2
    # ML_STARTED is deliberately left out for now (it would have been 3),
    # so ML_COMPLETED takes the value 3 instead.
    ML_COMPLETED = 3
    MANUAL_REVIEW_COMPLETE = 4
    UPLOADED = 5
# TODO: refactor the FSM to have explicit named states, and write a helper function to determine the next state and advance to it.
# This allows either triggering by name, or being a bit lazy and saying "advance", and it will go to the next state.
# Maybe a cleaner way is to say completed('X'), and then whatever the next state from X is can be taken, instead of knowing
# what the next state is (because that was supposed to be defined here in the specification, and not in each phase).
#
# Also add a "did we pass stage X" function, by name. This will make it easy to choose what to present, what actions to do next, etc.
class WorkflowFSM:
    """Linear workflow state machine built on ``transitions.Machine``.

    The states are the member names of ``WorkflowState`` in definition
    order. An ``advance`` trigger leads from every state to the one that
    follows it, and a ``reset`` trigger leads back to ``INIT`` from any state.
    """

    def __init__(self):
        # Define states as strings (transitions requirement)
        self.states = [state.name for state in WorkflowState]
        # TODO: what is the point of the enum? I can just take the list and do an enumerate on it.??

        # Create the state machine with this object as its model.
        self.machine = Machine(
            model=self,
            states=self.states,
            initial=WorkflowState.INIT.name,
        )

        # Add a transition from each state to the next state in the list.
        for source, dest in zip(self.states, self.states[1:]):
            self.machine.add_transition(
                trigger='advance',
                source=source,
                dest=dest,
            )

        # Add reset transition: any state -> INIT.
        self.machine.add_transition(
            trigger='reset',
            source='*',
            dest=WorkflowState.INIT.name,
        )

        # Add callbacks for logging around every state change.
        self.machine.before_state_change = self._log_transition
        self.machine.after_state_change = self._post_transition

    def _cprint(self, msg: str, color: str = OKCYAN) -> None:
        """Print ``msg`` wrapped in ``color`` and the ``ENDC`` reset code."""
        print(f"{color}{msg}{ENDC}")

    def _advance_state(self) -> str:
        """Return the name of the state after the current one.

        Returns the current state unchanged when it is already the last one.
        This only computes the name; it does not change the machine's state.
        """
        current_idx = self.states.index(self.state)
        if current_idx < len(self.states) - 1:
            return self.states[current_idx + 1]
        return self.state  # Stay in final state if already there

    def _log_transition(self) -> None:
        """Report the state being left (set as ``before_state_change``)."""
        # TODO: use logger, not printing.
        self._cprint(f"[FSM] -> Transitioning from {self.state}")

    def _post_transition(self) -> None:
        """Report the state just entered (set as ``after_state_change``)."""
        # TODO: use logger, not printing.
        self._cprint(f"[FSM] -| Transitioned to {self.state}")

    def advance(self) -> None:
        """Move the machine to the next state in the sequence.

        Raises:
            RuntimeError: if the machine is already in its final state.
        """
        # state_number is a method and has to be called: comparing the bound
        # method itself with an int raises TypeError.
        if self.state_number() >= len(self.states) - 1:
            # maybe too aggressive to exception here?
            raise RuntimeError("Already at final state")
        # NOTE(review): `trigger` is not defined in this class; presumably the
        # Machine attaches it to the model. This method shares its name with
        # the 'advance' trigger, so the trigger is fired by name here --
        # confirm against the transitions documentation.
        self.trigger('advance')

    def state_number(self) -> int:
        """Get the numerical value (index in ``self.states``) of the current state."""
        return self.states.index(self.state)

    def state_name(self) -> str:
        """Get the name of the current state."""
        return self.state

    # Number of states. Kept as a plain method (not a property) so existing
    # `num_states()` calls keep working.
    def num_states(self) -> int:
        """Return how many states the machine has."""
        return len(self.states)