import itertools
import statistics
from collections import Counter, defaultdict
from copy import deepcopy
from functools import partial
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
from uuid import uuid4

import numpy as np
from fastapi import BackgroundTasks

from inference.core.managers.base import ModelManager
from inference.core.utils.image_utils import ImageType, load_image
from inference.enterprise.workflows.complier.entities import StepExecutionMode
from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import (
    WorkflowsActiveLearningMiddleware,
)
from inference.enterprise.workflows.complier.steps_executors.constants import (
    CENTER_X_KEY,
    CENTER_Y_KEY,
    DETECTION_ID_KEY,
    HEIGHT_KEY,
    IMAGE_TYPE_KEY,
    IMAGE_VALUE_KEY,
    ORIGIN_COORDINATES_KEY,
    ORIGIN_SIZE_KEY,
    PARENT_ID_KEY,
    WIDTH_KEY,
)
from inference.enterprise.workflows.complier.steps_executors.types import (
    NextStepReference,
    OutputsLookup,
)
from inference.enterprise.workflows.complier.steps_executors.utils import (
    get_image,
    resolve_parameter,
)
from inference.enterprise.workflows.complier.utils import (
    construct_selector_pointing_step_output,
    construct_step_selector,
)
from inference.enterprise.workflows.entities.steps import (
    AbsoluteStaticCrop,
    ActiveLearningDataCollector,
    AggregationMode,
    BinaryOperator,
    CompoundDetectionFilterDefinition,
    Condition,
    Crop,
    DetectionFilter,
    DetectionFilterDefinition,
    DetectionOffset,
    DetectionsConsensus,
    Operator,
    RelativeStaticCrop,
)
from inference.enterprise.workflows.entities.validators import get_last_selector_chunk
from inference.enterprise.workflows.errors import ExecutionGraphError
OPERATORS = {
    Operator.EQUAL: lambda a, b: a == b,
    Operator.NOT_EQUAL: lambda a, b: a != b,
    Operator.LOWER_THAN: lambda a, b: a < b,
    Operator.GREATER_THAN: lambda a, b: a > b,
    Operator.LOWER_OR_EQUAL_THAN: lambda a, b: a <= b,
    Operator.GREATER_OR_EQUAL_THAN: lambda a, b: a >= b,
    Operator.IN: lambda a, b: a in b,
}

BINARY_OPERATORS = {
    BinaryOperator.AND: lambda a, b: a and b,
    BinaryOperator.OR: lambda a, b: a or b,
}

AGGREGATION_MODE2FIELD_AGGREGATOR = {
    AggregationMode.MAX: max,
    AggregationMode.MIN: min,
    AggregationMode.AVERAGE: statistics.mean,
}


async def run_crop_step(
    step: Crop,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
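    """Crop each input image around the detections pointed to by the step.

    One crop is produced per detection; crops and their parent detection ids are
    registered in ``outputs_lookup`` under the step selector.
    """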
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    detections = resolve_parameter(
        selector_or_value=step.detections,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
    origin_image_shape = extract_origin_size_from_images(
        input_images=image,
        decoded_images=decoded_images,
    )
    crops = list(
        itertools.chain.from_iterable(
            crop_image(image=i, detections=d, origin_size=o)
            for i, d, o in zip(decoded_images, detections, origin_image_shape)
        )
    )
    parent_ids = [c[PARENT_ID_KEY] for c in crops]
    outputs_lookup[construct_step_selector(step_name=step.name)] = {
        "crops": crops,
        PARENT_ID_KEY: parent_ids,
    }
    return None, outputs_lookup


def crop_image(
    image: np.ndarray,
    detections: List[dict],
    origin_size: dict,
) -> List[Dict[str, Union[str, np.ndarray]]]:
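    """Cut out one sub-image per detection, keeping the detection id as parent id
    and the detection centre plus origin size as origin coordinates."""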
    crops = []
    for detection in detections:
        x_min, y_min, x_max, y_max = detection_to_xyxy(detection=detection)
        cropped_image = image[y_min:y_max, x_min:x_max]
        crops.append(
            {
                IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
                IMAGE_VALUE_KEY: cropped_image,
                PARENT_ID_KEY: detection[DETECTION_ID_KEY],
                ORIGIN_COORDINATES_KEY: {
                    CENTER_X_KEY: detection["x"],
                    CENTER_Y_KEY: detection["y"],
                    ORIGIN_SIZE_KEY: origin_size,
                },
            }
        )
    return crops


async def run_condition_step(
    step: Condition,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
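    """Evaluate the step's operator against the resolved left and right values and
    return the reference of the next step (``step_if_true`` or ``step_if_false``)."""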
    left_value = resolve_parameter(
        selector_or_value=step.left,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    right_value = resolve_parameter(
        selector_or_value=step.right,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    evaluation_result = OPERATORS[step.operator](left_value, right_value)
    next_step = step.step_if_true if evaluation_result else step.step_if_false
    return next_step, outputs_lookup


async def run_detection_filter(
    step: DetectionFilter,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
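    """Keep only the detections matching the step's filter definition.

    Image metadata and prediction type from the upstream step are propagated
    alongside the filtered predictions, so downstream steps see the same structure.
    """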
    predictions = resolve_parameter(
        selector_or_value=step.predictions,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter(
        selector_or_value=images_meta_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    filter_callable = build_filter_callable(definition=step.filter_definition)
    result_detections, result_parent_id = [], []
    for prediction in predictions:
        filtered_predictions = [deepcopy(p) for p in prediction if filter_callable(p)]
        result_detections.append(filtered_predictions)
        result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions])
    step_selector = construct_step_selector(step_name=step.name)
    outputs_lookup[step_selector] = [
        {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
        for d, p, i, pt in zip(
            result_detections, result_parent_id, images_meta, predictions_type
        )
    ]
    return None, outputs_lookup


def build_filter_callable(
    definition: Union[DetectionFilterDefinition, CompoundDetectionFilterDefinition],
) -> Callable[[dict], bool]:
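    """Recursively translate a (possibly compound) filter definition into a predicate
    over a single detection dict."""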
    if definition.type == "CompoundDetectionFilterDefinition":
        left_callable = build_filter_callable(definition=definition.left)
        right_callable = build_filter_callable(definition=definition.right)
        binary_operator = BINARY_OPERATORS[definition.operator]
        return lambda e: binary_operator(left_callable(e), right_callable(e))
    if definition.type == "DetectionFilterDefinition":
        operator = OPERATORS[definition.operator]
        return lambda e: operator(e[definition.field_name], definition.reference_value)
    raise ExecutionGraphError(
        f"Detected filter definition of type {definition.type} which is unknown"
    )


async def run_detection_offset_step(
    step: DetectionOffset,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
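    """Expand each detection's width and height by the configured offsets, assigning
    fresh detection ids and keeping the original ids as parent ids."""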
    detections = resolve_parameter(
        selector_or_value=step.predictions,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter(
        selector_or_value=images_meta_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    offset_x = resolve_parameter(
        selector_or_value=step.offset_x,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    offset_y = resolve_parameter(
        selector_or_value=step.offset_y,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    result_detections, result_parent_id = [], []
    for detection in detections:
        offset_detections = [
            offset_detection(detection=d, offset_x=offset_x, offset_y=offset_y)
            for d in detection
        ]
        result_detections.append(offset_detections)
        result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections])
    step_selector = construct_step_selector(step_name=step.name)
    outputs_lookup[step_selector] = [
        {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
        for d, p, i, pt in zip(
            result_detections, result_parent_id, images_meta, predictions_type
        )
    ]
    return None, outputs_lookup


def offset_detection(
    detection: Dict[str, Any], offset_x: int, offset_y: int
) -> Dict[str, Any]:
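    """Return a copy of the detection grown by ``offset_x`` / ``offset_y``; the copy
    gets a new detection id and points at the original via ``parent_id``."""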
    detection_copy = deepcopy(detection)
    detection_copy[WIDTH_KEY] += round(offset_x)
    detection_copy[HEIGHT_KEY] += round(offset_y)
    detection_copy[PARENT_ID_KEY] = detection_copy[DETECTION_ID_KEY]
    detection_copy[DETECTION_ID_KEY] = str(uuid4())
    return detection_copy


async def run_static_crop_step(
    step: Union[AbsoluteStaticCrop, RelativeStaticCrop],
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
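    """Crop each input image at a fixed region: ``AbsoluteStaticCrop`` uses pixel
    coordinates, ``RelativeStaticCrop`` fractions of the image size."""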
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
    origin_image_shape = extract_origin_size_from_images(
        input_images=image,
        decoded_images=decoded_images,
    )
    crops = [
        take_static_crop(
            image=i,
            crop=step,
            runtime_parameters=runtime_parameters,
            outputs_lookup=outputs_lookup,
            origin_size=size,
        )
        for i, size in zip(decoded_images, origin_image_shape)
    ]
    parent_ids = [c[PARENT_ID_KEY] for c in crops]
    outputs_lookup[construct_step_selector(step_name=step.name)] = {
        "crops": crops,
        PARENT_ID_KEY: parent_ids,
    }
    return None, outputs_lookup


def extract_origin_size_from_images(
    input_images: List[Union[dict, np.ndarray]],
    decoded_images: List[np.ndarray],
) -> List[Dict[str, int]]:
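    """Recover the size of the original image for each input: taken from the origin
    coordinates when the input carries them (e.g. when it is itself a crop),
    otherwise from the decoded image shape."""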
    result = []
    for input_image, decoded_image in zip(input_images, decoded_images):
        if (
            issubclass(type(input_image), dict)
            and ORIGIN_COORDINATES_KEY in input_image
        ):
            result.append(input_image[ORIGIN_COORDINATES_KEY][ORIGIN_SIZE_KEY])
        else:
            result.append(
                {HEIGHT_KEY: decoded_image.shape[0], WIDTH_KEY: decoded_image.shape[1]}
            )
    return result


def take_static_crop(
    image: np.ndarray,
    crop: Union[AbsoluteStaticCrop, RelativeStaticCrop],
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    origin_size: dict,
) -> Dict[str, Union[str, np.ndarray]]:
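    """Resolve crop centre, width and height (scaling by the image size for
    ``RelativeStaticCrop``) and return a single crop dict in the same format as
    ``crop_image``, with the step selector as parent id."""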
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    x_center = resolve_parameter_closure(crop.x_center)
    y_center = resolve_parameter_closure(crop.y_center)
    width = resolve_parameter_closure(crop.width)
    height = resolve_parameter_closure(crop.height)
    if crop.type == "RelativeStaticCrop":
        x_center = round(image.shape[1] * x_center)
        y_center = round(image.shape[0] * y_center)
        width = round(image.shape[1] * width)
        height = round(image.shape[0] * height)
    x_min = round(x_center - width / 2)
    y_min = round(y_center - height / 2)
    x_max = round(x_min + width)
    y_max = round(y_min + height)
    cropped_image = image[y_min:y_max, x_min:x_max]
    return {
        IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
        IMAGE_VALUE_KEY: cropped_image,
        PARENT_ID_KEY: f"$steps.{crop.name}",
        ORIGIN_COORDINATES_KEY: {
            CENTER_X_KEY: x_center,
            CENTER_Y_KEY: y_center,
            ORIGIN_SIZE_KEY: origin_size,
        },
    }


async def run_detections_consensus_step(
    step: DetectionsConsensus,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
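    """Merge predictions coming from several sources into consensus detections.

    All sources must produce batches of equal size; consensus is resolved
    independently for every batch element.
    """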
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    all_predictions = [resolve_parameter_closure(p) for p in step.predictions]
    # all_predictions has shape (n_consensus_input, bs, img_predictions)
    if len(all_predictions) < 1:
        raise ExecutionGraphError(
            f"Consensus step requires at least one source of predictions."
        )
    batch_sizes = get_and_validate_batch_sizes(
        all_predictions=all_predictions,
        step_name=step.name,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions[0],
        new_output="image",
    )
    images_meta = resolve_parameter_closure(images_meta_selector)
    batch_size = batch_sizes[0]
    results = []
    for batch_index in range(batch_size):
        batch_element_predictions = [e[batch_index] for e in all_predictions]
        (
            parent_id,
            object_present,
            presence_confidence,
            consensus_detections,
        ) = resolve_batch_consensus(
            predictions=batch_element_predictions,
            required_votes=resolve_parameter_closure(step.required_votes),
            class_aware=resolve_parameter_closure(step.class_aware),
            iou_threshold=resolve_parameter_closure(step.iou_threshold),
            confidence=resolve_parameter_closure(step.confidence),
            classes_to_consider=resolve_parameter_closure(step.classes_to_consider),
            required_objects=resolve_parameter_closure(step.required_objects),
            presence_confidence_aggregation=step.presence_confidence_aggregation,
            detections_merge_confidence_aggregation=step.detections_merge_confidence_aggregation,
            detections_merge_coordinates_aggregation=step.detections_merge_coordinates_aggregation,
        )
        results.append(
            {
                "predictions": consensus_detections,
                "parent_id": parent_id,
                "object_present": object_present,
                "presence_confidence": presence_confidence,
                "image": images_meta[batch_index],
                "prediction_type": "object-detection",
            }
        )
    outputs_lookup[construct_step_selector(step_name=step.name)] = results
    return None, outputs_lookup


def get_and_validate_batch_sizes(
    all_predictions: List[List[List[dict]]],
    step_name: str,
) -> List[int]:
    batch_sizes = get_predictions_batch_sizes(all_predictions=all_predictions)
    if not all_batch_sizes_equal(batch_sizes=batch_sizes):
        raise ExecutionGraphError(
            f"Detected mismatch of input dimensions in step: {step_name}"
        )
    return batch_sizes


def get_predictions_batch_sizes(all_predictions: List[List[List[dict]]]) -> List[int]:
    return [len(predictions) for predictions in all_predictions]


def all_batch_sizes_equal(batch_sizes: List[int]) -> bool:
    if len(batch_sizes) == 0:
        return True
    reference = batch_sizes[0]
    return all(e == reference for e in batch_sizes)


def resolve_batch_consensus(
    predictions: List[List[dict]],
    required_votes: int,
    class_aware: bool,
    iou_threshold: float,
    confidence: float,
    classes_to_consider: Optional[List[str]],
    required_objects: Optional[Union[int, Dict[str, int]]],
    presence_confidence_aggregation: AggregationMode,
    detections_merge_confidence_aggregation: AggregationMode,
    detections_merge_coordinates_aggregation: AggregationMode,
) -> Tuple[str, bool, Dict[str, float], List[dict]]:
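    """Resolve consensus for a single batch element.

    Returns the shared parent id, whether the required objects are present, the
    aggregated presence confidence (per class, or under ``any_object``), and the
    list of merged consensus detections.
    """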
    if does_not_detected_objects_in_any_source(predictions=predictions):
        return "undefined", False, {}, []
    parent_id = get_parent_id_of_predictions_from_different_sources(
        predictions=predictions,
    )
    predictions = filter_predictions(
        predictions=predictions,
        classes_to_consider=classes_to_consider,
    )
    detections_already_considered = set()
    consensus_detections = []
    for source_id, detection in enumerate_detections(predictions=predictions):
        (
            consensus_detections_update,
            detections_already_considered,
        ) = get_consensus_for_single_detection(
            detection=detection,
            source_id=source_id,
            predictions=predictions,
            iou_threshold=iou_threshold,
            class_aware=class_aware,
            required_votes=required_votes,
            confidence=confidence,
            detections_merge_confidence_aggregation=detections_merge_confidence_aggregation,
            detections_merge_coordinates_aggregation=detections_merge_coordinates_aggregation,
            detections_already_considered=detections_already_considered,
        )
        consensus_detections += consensus_detections_update
    (
        object_present,
        presence_confidence,
    ) = check_objects_presence_in_consensus_predictions(
        consensus_detections=consensus_detections,
        aggregation_mode=presence_confidence_aggregation,
        class_aware=class_aware,
        required_objects=required_objects,
    )
    return (
        parent_id,
        object_present,
        presence_confidence,
        consensus_detections,
    )


def get_consensus_for_single_detection(
    detection: dict,
    source_id: int,
    predictions: List[List[dict]],
    iou_threshold: float,
    class_aware: bool,
    required_votes: int,
    confidence: float,
    detections_merge_confidence_aggregation: AggregationMode,
    detections_merge_coordinates_aggregation: AggregationMode,
    detections_already_considered: Set[str],
) -> Tuple[List[dict], Set[str]]:
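    """Try to build a consensus detection around ``detection`` by matching the best
    overlapping, not-yet-considered detection from every other source; a merge is
    kept only when at least ``required_votes`` sources agree and the merged
    confidence meets the threshold. Consumed detections are added to
    ``detections_already_considered``."""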
| if detection["detection_id"] in detections_already_considered: | |
| return ([], detections_already_considered) | |
| consensus_detections = [] | |
| detections_with_max_overlap = ( | |
| get_detections_from_different_sources_with_max_overlap( | |
| detection=detection, | |
| source=source_id, | |
| predictions=predictions, | |
| iou_threshold=iou_threshold, | |
| class_aware=class_aware, | |
| detections_already_considered=detections_already_considered, | |
| ) | |
| ) | |
| if len(detections_with_max_overlap) < (required_votes - 1): | |
| return consensus_detections, detections_already_considered | |
| detections_to_merge = [detection] + [ | |
| matched_value[0] for matched_value in detections_with_max_overlap.values() | |
| ] | |
| merged_detection = merge_detections( | |
| detections=detections_to_merge, | |
| confidence_aggregation_mode=detections_merge_confidence_aggregation, | |
| boxes_aggregation_mode=detections_merge_coordinates_aggregation, | |
| ) | |
| if merged_detection["confidence"] < confidence: | |
| return consensus_detections, detections_already_considered | |
| consensus_detections.append(merged_detection) | |
| detections_already_considered.add(detection[DETECTION_ID_KEY]) | |
| for matched_value in detections_with_max_overlap.values(): | |
| detections_already_considered.add(matched_value[0][DETECTION_ID_KEY]) | |
| return consensus_detections, detections_already_considered | |


def check_objects_presence_in_consensus_predictions(
    consensus_detections: List[dict],
    class_aware: bool,
    aggregation_mode: AggregationMode,
    required_objects: Optional[Union[int, Dict[str, int]]],
) -> Tuple[bool, Dict[str, float]]:
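    """Decide whether the consensus detections satisfy ``required_objects`` and, if
    so, report aggregated confidence per class (or under ``any_object`` when the
    check is not class aware)."""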
    if len(consensus_detections) == 0:
        return False, {}
    if required_objects is None:
        required_objects = 0
    if issubclass(type(required_objects), dict) and not class_aware:
        required_objects = sum(required_objects.values())
    if (
        issubclass(type(required_objects), int)
        and len(consensus_detections) < required_objects
    ):
        return False, {}
    if not class_aware:
        aggregated_confidence = aggregate_field_values(
            detections=consensus_detections,
            field="confidence",
            aggregation_mode=aggregation_mode,
        )
        return True, {"any_object": aggregated_confidence}
    class2detections = defaultdict(list)
    for detection in consensus_detections:
        class2detections[detection["class"]].append(detection)
    if issubclass(type(required_objects), dict):
        for requested_class, required_objects_count in required_objects.items():
            if len(class2detections[requested_class]) < required_objects_count:
                return False, {}
    class2confidence = {
        class_name: aggregate_field_values(
            detections=class_detections,
            field="confidence",
            aggregation_mode=aggregation_mode,
        )
        for class_name, class_detections in class2detections.items()
    }
    return True, class2confidence


def does_not_detected_objects_in_any_source(predictions: List[List[dict]]) -> bool:
    return all(len(p) == 0 for p in predictions)


def get_parent_id_of_predictions_from_different_sources(
    predictions: List[List[dict]],
) -> str:
    encountered_parent_ids = {
        p[PARENT_ID_KEY] for prediction_source in predictions for p in prediction_source
    }
    if len(encountered_parent_ids) > 1:
        raise ExecutionGraphError(
            f"Mismatch in predictions - while executing consensus step, "
            f"in equivalent batches, detections are assigned different parent "
            f"identifiers, whereas consensus can only be applied for predictions "
            f"made against the same input."
        )
    return list(encountered_parent_ids)[0]


def filter_predictions(
    predictions: List[List[dict]],
    classes_to_consider: Optional[List[str]],
) -> List[List[dict]]:
    if classes_to_consider is None:
        return predictions
    classes_to_consider = set(classes_to_consider)
    return [
        [
            detection
            for detection in detections
            if detection["class"] in classes_to_consider
        ]
        for detections in predictions
    ]


def get_detections_from_different_sources_with_max_overlap(
    detection: dict,
    source: int,
    predictions: List[List[dict]],
    iou_threshold: float,
    class_aware: bool,
    detections_already_considered: Set[str],
) -> Dict[int, Tuple[dict, float]]:
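    """For every other source, find the detection with the highest IoU against
    ``detection`` that exceeds ``iou_threshold`` (and matches the class when
    ``class_aware`` is set), skipping detections consumed by earlier merges."""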
    current_max_overlap = {}
    for other_source, other_detection in enumerate_detections(
        predictions=predictions,
        excluded_source=source,
    ):
        if other_detection[DETECTION_ID_KEY] in detections_already_considered:
            continue
        if class_aware and detection["class"] != other_detection["class"]:
            continue
        iou_value = calculate_iou(
            detection_a=detection,
            detection_b=other_detection,
        )
        if iou_value <= iou_threshold:
            continue
        if current_max_overlap.get(other_source) is None:
            current_max_overlap[other_source] = (other_detection, iou_value)
        if current_max_overlap[other_source][1] < iou_value:
            current_max_overlap[other_source] = (other_detection, iou_value)
    return current_max_overlap


def enumerate_detections(
    predictions: List[List[dict]],
    excluded_source: Optional[int] = None,
) -> Generator[Tuple[int, dict], None, None]:
    for source_id, detections in enumerate(predictions):
        if excluded_source is not None and excluded_source == source_id:
            continue
        for detection in detections:
            yield source_id, detection


def calculate_iou(detection_a: dict, detection_b: dict) -> float:
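    """Compute intersection over union of two detections given as centre/size dicts."""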
    box_a = detection_to_xyxy(detection=detection_a)
    box_b = detection_to_xyxy(detection=detection_b)
    x_a = max(box_a[0], box_b[0])
    y_a = max(box_a[1], box_b[1])
    x_b = min(box_a[2], box_b[2])
    y_b = min(box_a[3], box_b[3])
    intersection = max(0, x_b - x_a) * max(0, y_b - y_a)
    bbox_a_area, bbox_b_area = get_detection_sizes(
        detections=[detection_a, detection_b]
    )
    union = float(bbox_a_area + bbox_b_area - intersection)
    if union == 0.0:
        return 0.0
    return intersection / union


def detection_to_xyxy(detection: dict) -> Tuple[int, int, int, int]:
    x_min = round(detection["x"] - detection[WIDTH_KEY] / 2)
    y_min = round(detection["y"] - detection[HEIGHT_KEY] / 2)
    x_max = round(x_min + detection[WIDTH_KEY])
    y_max = round(y_min + detection[HEIGHT_KEY])
    return x_min, y_min, x_max, y_max


def merge_detections(
    detections: List[dict],
    confidence_aggregation_mode: AggregationMode,
    boxes_aggregation_mode: AggregationMode,
) -> dict:
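    """Collapse the matched detections into a single one: class picked by the
    configured class selector, box and confidence produced by the configured
    aggregators, with a freshly generated detection id."""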
    class_name, class_id = AGGREGATION_MODE2CLASS_SELECTOR[confidence_aggregation_mode](
        detections
    )
    x, y, width, height = AGGREGATION_MODE2BOXES_AGGREGATOR[boxes_aggregation_mode](
        detections
    )
    return {
        PARENT_ID_KEY: detections[0][PARENT_ID_KEY],
        DETECTION_ID_KEY: f"{uuid4()}",
        "class": class_name,
        "class_id": class_id,
        "confidence": aggregate_field_values(
            detections=detections,
            field="confidence",
            aggregation_mode=confidence_aggregation_mode,
        ),
        "x": x,
        "y": y,
        "width": width,
        "height": height,
    }


def get_majority_class(detections: List[dict]) -> Tuple[str, int]:
    class_counts = Counter(d["class"] for d in detections)
    most_common_class_name = class_counts.most_common(1)[0][0]
    class_id = [
        d["class_id"] for d in detections if d["class"] == most_common_class_name
    ][0]
    return most_common_class_name, class_id


def get_class_of_most_confident_detection(detections: List[dict]) -> Tuple[str, int]:
    max_confidence = aggregate_field_values(
        detections=detections,
        field="confidence",
        aggregation_mode=AggregationMode.MAX,
    )
    most_confident_prediction = [
        d for d in detections if d["confidence"] == max_confidence
    ][0]
    return most_confident_prediction["class"], most_confident_prediction["class_id"]


def get_class_of_least_confident_detection(detections: List[dict]) -> Tuple[str, int]:
    min_confidence = aggregate_field_values(
        detections=detections,
        field="confidence",
        aggregation_mode=AggregationMode.MIN,
    )
    least_confident_prediction = [
        d for d in detections if d["confidence"] == min_confidence
    ][0]
    return least_confident_prediction["class"], least_confident_prediction["class_id"]


AGGREGATION_MODE2CLASS_SELECTOR = {
    AggregationMode.MAX: get_class_of_most_confident_detection,
    AggregationMode.MIN: get_class_of_least_confident_detection,
    AggregationMode.AVERAGE: get_majority_class,
}


def get_average_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    x = round(aggregate_field_values(detections=detections, field="x"))
    y = round(aggregate_field_values(detections=detections, field="y"))
    width = round(aggregate_field_values(detections=detections, field="width"))
    height = round(aggregate_field_values(detections=detections, field="height"))
    return x, y, width, height


def get_smallest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    smallest_size = min(detection_sizes)
    matching_detection_id = [
        idx for idx, v in enumerate(detection_sizes) if v == smallest_size
    ][0]
    matching_detection = detections[matching_detection_id]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection["width"],
        matching_detection["height"],
    )


def get_largest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    largest_size = max(detection_sizes)
    matching_detection_id = [
        idx for idx, v in enumerate(detection_sizes) if v == largest_size
    ][0]
    matching_detection = detections[matching_detection_id]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection[WIDTH_KEY],
        matching_detection[HEIGHT_KEY],
    )


AGGREGATION_MODE2BOXES_AGGREGATOR = {
    AggregationMode.MAX: get_largest_bounding_box,
    AggregationMode.MIN: get_smallest_bounding_box,
    AggregationMode.AVERAGE: get_average_bounding_box,
}


def get_detection_sizes(detections: List[dict]) -> List[float]:
    return [d[HEIGHT_KEY] * d[WIDTH_KEY] for d in detections]


def aggregate_field_values(
    detections: List[dict],
    field: str,
    aggregation_mode: AggregationMode = AggregationMode.AVERAGE,
) -> float:
    values = [d[field] for d in detections]
    return AGGREGATION_MODE2FIELD_AGGREGATOR[aggregation_mode](values)


async def run_active_learning_data_collector(
    step: ActiveLearningDataCollector,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
    active_learning_middleware: WorkflowsActiveLearningMiddleware,
    background_tasks: Optional[BackgroundTasks],
) -> Tuple[NextStepReference, OutputsLookup]:
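    """Register input images and predictions with the Active Learning middleware.

    All predictions must share a single prediction type; the middleware receives the
    resolved dataset, API key and configuration together with optional FastAPI
    background tasks, and the step produces no outputs of its own.
    """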
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter_closure(images_meta_selector)
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type = set(predictions_type)
    if len(prediction_type) > 1:
        raise ExecutionGraphError(
            f"Active Learning data collection step requires only a single prediction "
            f"type to be part of ingest. Detected: {prediction_type}."
        )
    prediction_type = next(iter(prediction_type))
    predictions = resolve_parameter_closure(step.predictions)
    predictions_output_name = get_last_selector_chunk(step.predictions)
    target_dataset = resolve_parameter_closure(step.target_dataset)
    target_dataset_api_key = resolve_parameter_closure(step.target_dataset_api_key)
    disable_active_learning = resolve_parameter_closure(step.disable_active_learning)
    active_learning_compatible_predictions = [
        {"image": image_meta, predictions_output_name: prediction}
        for image_meta, prediction in zip(images_meta, predictions)
    ]
    active_learning_middleware.register(
        # this should actually be asyncio, but that requires a lot of backend components redesign
        dataset_name=target_dataset,
        images=image,
        predictions=active_learning_compatible_predictions,
        api_key=target_dataset_api_key or api_key,
        active_learning_disabled_for_request=disable_active_learning,
        prediction_type=prediction_type,
        background_tasks=background_tasks,
        active_learning_configuration=step.active_learning_configuration,
    )
    return None, outputs_lookup