Spaces:
Running
Running
| import xml.etree.ElementTree as ET | |
| from modules.utils import class_dict | |
| from xml.dom import minidom | |
| from modules.utils import error | |
| from modules.OCR import analyze_sentiment | |
def rescale(scale, boxes):
    """
    Multiply the first four coordinates of every bounding box by ``scale``.

    The container is updated in place: each entry is replaced by a new
    four-element list, and the very same ``boxes`` object is returned.

    Args:
        scale (float): Factor applied to each coordinate.
        boxes (list): Bounding boxes; each one must be indexable at 0..3.
    Returns:
        list: The ``boxes`` object that was passed in, now holding the scaled boxes.
    """
    for position, box in enumerate(boxes):
        boxes[position] = [box[k] * scale for k in range(4)]
    return boxes
def create_BPMN_id(data):
    """
    Assign a numbered BPMN ID to each element according to its class.

    Every ID has the form ``<prefix>_<n>`` where ``n`` starts at 1 and is
    counted separately for each prefix. ``data['BPMN_id']`` is filled in
    place; it must already be an indexable sequence as long as
    ``data['labels']``.

    Events are split on their links: an event with an incoming link and no
    outgoing link becomes ``end_event_<n>``, one with no incoming link and an
    outgoing link becomes ``start_event_<n>``. Any other event, and any class
    without a known prefix, keeps whatever ``data['BPMN_id']`` already held.

    NOTE(review): other functions in this file compare the first
    '_'-separated component of an ID against 'event'; the start/end IDs
    produced here begin with 'start'/'end' instead. Confirm which ID scheme
    the callers actually receive.

    Args:
        data (dict): Must provide 'labels' (keys of ``class_dict``), 'links'
            (per-element ``(incoming, outgoing)`` pairs) and 'BPMN_id'.
    Returns:
        dict: The same ``data`` object with 'BPMN_id' updated.
    """
    # Class name -> ID prefix. 'task' and 'dataObject' deliberately share the
    # 'task' prefix, and therefore the same counter.
    prefix_by_class = {
        'task': 'task',
        'dataObject': 'task',
        'sequenceFlow': 'sequenceFlow',
        'messageFlow': 'messageFlow',
        'messageEvent': 'message_event',
        'exclusiveGateway': 'exclusiveGateway',
        'parallelGateway': 'parallelGateway',
        'dataAssociation': 'dataAssociation',
        'pool': 'pool',
    }
    counters = {}

    def numbered(prefix):
        # One independent counter per prefix. This keeps data associations on
        # their own sequence instead of reusing the sequence-flow number.
        counters[prefix] = counters.get(prefix, 0) + 1
        return f'{prefix}_{counters[prefix]}'

    # Resolve all class names first so an unknown label fails before any ID is written.
    class_names = [class_dict[label] for label in data['labels']]

    for idx, class_name in enumerate(class_names):
        if class_name == 'event':
            incoming, outgoing = data['links'][idx][0], data['links'][idx][1]
            if incoming is not None and outgoing is None:
                data['BPMN_id'][idx] = numbered('end_event')
            elif incoming is None and outgoing is not None:
                data['BPMN_id'][idx] = numbered('start_event')
        elif class_name in prefix_by_class:
            data['BPMN_id'][idx] = numbered(prefix_by_class[class_name])
    return data
def check_end(link):
    """
    Tell whether a link pair has no outgoing side.

    Args:
        link (tuple): Pair of indices; only the second entry is inspected.
    Returns:
        bool: True when ``link[1]`` is None, False otherwise.
    """
    return link[1] is None
def connect(data, text_mapping, i):
    """
    Resolve the text of element ``i`` and of the element(s) that follow it.

    The successor is found in two hops: ``data['links'][i][1]`` gives the
    index of the outgoing link, and that link's second entry gives the index
    of the next element. When the next element is an exclusive or parallel
    gateway, the texts returned are those of every element targeted by a link
    whose first entry is the gateway; otherwise the single text of the next
    element is returned.

    If the outgoing link is missing or out of range, or the link itself has
    no valid target, the problem is reported through ``error`` and
    ``(None, None, None)`` is returned instead of raising.

    Args:
        data (dict): Provides 'links' and 'BPMN_id'.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.
        i (int): Index of the current element.
    Returns:
        tuple: ``(current_text, next_texts, next_id)``, or three ``None``
        values when the successor cannot be resolved.
    """
    links = data['links']
    bpmn_ids = data['BPMN_id']
    unresolved = (None, None, None)

    target_idx = links[i][1]
    if target_idx is None or target_idx >= len(links):
        error('There may be an error with the Vizi file, care when you download it.')
        return unresolved

    # A dangling link (no target element) would otherwise crash on the
    # BPMN_id lookup below; treat it like any other broken connection.
    next_idx = links[target_idx][1]
    if next_idx is None or next_idx >= len(bpmn_ids):
        error('There may be an error with the Vizi file, care when you download it.')
        return unresolved

    current_text = text_mapping[bpmn_ids[i]]
    next_id = bpmn_ids[next_idx]

    if next_id.split('_')[0] in ('exclusiveGateway', 'parallelGateway'):
        # Collect the text of every element reached from the gateway.
        next_text = [
            text_mapping[bpmn_ids[link[1]]]
            for link in links
            if link[0] == next_idx and link[1] is not None
        ]
    else:
        next_text = [text_mapping[next_id]]
    return current_text, next_text, next_id
def check_start(val):
    """
    Tell whether a link pair has no incoming side.

    Args:
        val (tuple): Pair of indices; only the first entry is inspected.
    Returns:
        bool: True when ``val[0]`` is None, False otherwise.
    """
    return val[0] is None
def find_merge(bpmn_id, links):
    """
    Flag the entries that lead into a parallel gateway shared with another entry.

    For each entry of ``links`` the successor is looked up in two hops
    (``link[1]``, then that link's own second entry). If the successor's ID
    starts with 'parallelGateway', the gateway ID is remembered for that
    entry. An entry is flagged True when its remembered gateway ID was
    remembered for more than one entry in total.

    Args:
        bpmn_id (list): List of BPMN IDs.
        links (list): List of links between elements.
    Returns:
        list: One bool per entry of ``links``.
    """
    gateway_after = []
    for link in links:
        via = link[1]
        successor = None if via is None else links[via][1]
        if successor is not None and bpmn_id[successor].split('_')[0] == 'parallelGateway':
            gateway_after.append(bpmn_id[successor])
        else:
            gateway_after.append(None)

    # How many entries point at each gateway.
    tallies = {}
    for gateway in gateway_after:
        if gateway is not None:
            tallies[gateway] = tallies.get(gateway, 0) + 1

    return [gateway is not None and tallies[gateway] > 1 for gateway in gateway_after]
def find_positive_end(bpmn_ids, links, text_mapping):
    """
    Pick the end event whose text ranks best in sentiment analysis.

    Only IDs that have a matching entry in ``links`` are considered. A
    candidate must satisfy ``check_end`` on its link and have 'event' or
    'message' as the first '_'-separated part of its ID. Candidates labelled
    'positive' rank before all others; within the same group the higher
    score wins, and ties keep the earliest candidate.

    Args:
        bpmn_ids (list): List of BPMN IDs.
        links (list): List of links between elements.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.
    Returns:
        str: BPMN ID of the best-ranked end event, or None when there is no candidate.
    """
    candidates = []
    # zip stops at the shorter sequence, so IDs without a links entry are skipped.
    for bpmn_id, link in zip(bpmn_ids, links):
        if not check_end(link):
            continue
        if bpmn_id.split('_')[0] not in ['event', 'message']:
            continue
        label, score = analyze_sentiment(text_mapping[bpmn_id])
        candidates.append((bpmn_id, label, score))

    def rank(entry):
        # False sorts before True, so 'positive' labels come first; negating
        # the score puts the highest score first within a group.
        return (entry[1] != 'positive', -entry[2])

    winner = min(candidates, key=rank, default=None)
    return None if winner is None else winner[0]
def find_best_direction(texts_list):
    """
    Pick the text that ranks best in sentiment analysis.

    Each text is scored with ``analyze_sentiment``. Texts labelled 'positive'
    rank before all others; within the same group the higher score wins, and
    ties keep the earliest text.

    Args:
        texts_list (list): List of texts to analyze.
    Returns:
        str: The best-ranked text, or None when ``texts_list`` is empty.
    """
    scored = []
    for text in texts_list:
        label, score = analyze_sentiment(text)
        scored.append((text, label, score))

    def rank(entry):
        # 'positive' first (False < True), then highest score first.
        return (entry[1] != 'positive', -entry[2])

    winner = min(scored, key=rank, default=None)
    return None if winner is None else winner[0]
def _is_event_id(bpmn_id):
    """Return True when the first '_'-separated part of the ID is 'event' or 'message'."""
    return bpmn_id.split('_')[0] in ('event', 'message')


def _add_wizard_activities(root, data, text_mapping):
    """Append <activities> to root: one <activity> per task ID, with its end states."""
    activities = ET.SubElement(root, 'activities')
    link_count = len(data['links'])
    for idx, activity_name in enumerate(data['BPMN_id']):
        if not activity_name.startswith('task'):
            continue
        activity = ET.SubElement(
            activities, 'activity',
            attrib={'name': text_mapping.get(activity_name, activity_name), 'performer': ''})
        end_states = ET.SubElement(activity, 'endStates')

        # Same guard as the event loops: an ID without a links entry has no
        # successor to resolve, so it simply gets no end state.
        if idx < link_count:
            _, next_text, next_id = connect(data, text_mapping, idx)
        else:
            next_text, next_id = None, None

        if next_text is not None and len(next_text) == 1:
            ET.SubElement(end_states, 'endState', attrib={'name': next_text[0], 'isRegular': 'True'})
        elif (next_text is not None and len(next_text) >= 2
              and next_id.split('_')[0] == 'exclusiveGateway'):
            # After an exclusive gateway, only the best-ranked branch is "regular".
            best_direction = find_best_direction(next_text)
            if best_direction is not None:
                print("Best direction is: ", best_direction)
            for branch in next_text:
                regular = 'True' if branch == best_direction else 'False'
                ET.SubElement(end_states, 'endState', attrib={'name': branch, 'isRegular': regular})

        ET.SubElement(activity, 'subActivities')
        ET.SubElement(activity, 'subActivityFlows')
        ET.SubElement(activity, 'messageFlows')


def _add_wizard_activity_flows(root, data, text_mapping):
    """Append <activityFlows> to root: flows leaving start events and tasks."""
    merge_flags = find_merge(data['BPMN_id'], data['links'])
    activity_flows = ET.SubElement(root, 'activityFlows')
    for i, link in enumerate(data['links']):
        if link[1] is None:
            # Nothing leaves this entry, so there is no flow to describe.
            continue
        prefix = data['BPMN_id'][i].split('_')[0]

        if link[0] is None:
            # Entry without an incoming link: only start events produce a flow.
            if prefix not in ('event', 'message'):
                continue
            current_text, next_text, _ = connect(data, text_mapping, i)
            # An empty list means the successor has no target to point at.
            if current_text is None or not next_text:
                continue
            ET.SubElement(activity_flows, 'activityFlow', attrib={
                'startEvent': current_text, 'endState': '---', 'target': next_text[0],
                'isMerging': 'False', 'isPredefined': 'True'})
            continue

        if prefix != 'task':
            continue
        current_text, next_text, next_id = connect(data, text_mapping, i)
        if current_text is None or not next_text:
            continue
        merge = 'True' if merge_flags[i] else 'False'

        if len(next_text) >= 2 and next_id.split('_')[0] == 'exclusiveGateway':
            # One flow per branch, named after the branch itself. This also
            # covers gateways with more than two branches, so a list is never
            # written into an XML attribute.
            for branch in next_text:
                ET.SubElement(activity_flows, 'activityFlow', attrib={
                    'activity': current_text, 'endState': branch, 'target': branch,
                    'isMerging': 'False', 'isPredefined': 'True'})
        else:
            # Single successor, or every branch of a parallel gateway.
            for target in next_text:
                ET.SubElement(activity_flows, 'activityFlow', attrib={
                    'activity': current_text, 'endState': '---', 'target': target,
                    'isMerging': merge, 'isPredefined': 'True'})


def create_wizard_file(data, text_mapping):
    """
    Build the wizard XML document for the given BPMN data.

    The root <methodAndStyleWizard> element receives, in this order:
    modelName, author, one processName per key of ``data['pool_dict']``,
    processDescription, one startEvent per start event, requestMessage,
    requester, endStates, activities, activityFlows and participants.

    ``text_mapping`` is modified in place: empty texts of IDs that are not
    pools or flows are replaced by ``unnamed_<id>`` before the XML is built.

    Args:
        data (dict): Provides 'BPMN_id', 'links' and 'pool_dict'.
        text_mapping (dict): Mapping of BPMN IDs to their text descriptions.
    Returns:
        str: Pretty-printed XML string of the wizard file.
    """
    not_change = ['pool', 'sequenceFlow', 'messageFlow', 'dataAssociation']
    # Give a placeholder name to every unnamed element that is not a pool or a flow.
    for key, text in text_mapping.items():
        if text == '' and key.split('_')[0] not in not_change:
            text_mapping[key] = f'unnamed_{key}'

    root = ET.Element('methodAndStyleWizard')
    ET.SubElement(root, 'modelName').text = 'My Diagram'
    ET.SubElement(root, 'author').text = 'sketch-to-BPMN'

    # One processName per pool, labelled with the pool's text.
    for pool_index in data['pool_dict']:
        ET.SubElement(root, 'processName').text = text_mapping[pool_index]
    ET.SubElement(root, 'processDescription')

    link_count = len(data['links'])

    # Start events: event/message IDs whose link has no incoming side.
    for idx, bpmn_id in enumerate(data['BPMN_id']):
        if idx >= link_count:
            continue
        if check_start(data['links'][idx]) and _is_event_id(bpmn_id):
            if text_mapping[bpmn_id] == '':
                text_mapping[bpmn_id] = 'start'
            event_type = 'Message' if bpmn_id.split('_')[0] == 'message' else 'None'
            ET.SubElement(root, 'startEvent', attrib={
                'name': text_mapping[bpmn_id], 'eventType': event_type, 'isRegular': 'True'})

    ET.SubElement(root, 'requestMessage')
    ET.SubElement(root, 'requester')

    # End states: event/message IDs whose link has no outgoing side. Only the
    # end picked by find_positive_end is marked as regular.
    end_events = ET.SubElement(root, 'endStates')
    positive_end = find_positive_end(data['BPMN_id'], data['links'], text_mapping)
    if positive_end is not None:
        print("Best end is: ", text_mapping[positive_end])
    for idx, bpmn_id in enumerate(data['BPMN_id']):
        if idx >= link_count:
            continue
        if check_end(data['links'][idx]) and _is_event_id(bpmn_id):
            if text_mapping[bpmn_id] == '':
                text_mapping[bpmn_id] = '(unnamed)'
            regular = 'True' if bpmn_id == positive_end else 'False'
            ET.SubElement(end_events, 'endState', attrib={
                'name': text_mapping[bpmn_id], 'eventType': 'None', 'isRegular': regular})

    _add_wizard_activities(root, data, text_mapping)
    _add_wizard_activity_flows(root, data, text_mapping)
    ET.SubElement(root, 'participants')

    # Pretty print the XML
    pwm_str = ET.tostring(root, encoding='utf-8', method='xml')
    return minidom.parseString(pwm_str).toprettyxml(indent=" ")