Spaces:
Configuration error
Configuration error
| from collections import OrderedDict | |
| from typing import List, Optional, Tuple | |
| from uuid import uuid4 | |
| import numpy as np | |
| from inference.core import logger | |
| from inference.core.active_learning.cache_operations import ( | |
| return_strategy_credit, | |
| use_credit_of_matching_strategy, | |
| ) | |
| from inference.core.active_learning.entities import ( | |
| ActiveLearningConfiguration, | |
| ImageDimensions, | |
| Prediction, | |
| PredictionType, | |
| SamplingMethod, | |
| ) | |
| from inference.core.active_learning.post_processing import ( | |
| adjust_prediction_to_client_scaling_factor, | |
| encode_prediction, | |
| ) | |
| from inference.core.cache.base import BaseCache | |
| from inference.core.env import ACTIVE_LEARNING_TAGS | |
| from inference.core.roboflow_api import ( | |
| annotate_image_at_roboflow, | |
| register_image_at_roboflow, | |
| ) | |
| from inference.core.utils.image_utils import encode_image_to_jpeg_bytes | |
| from inference.core.utils.preprocess import downscale_image_keeping_aspect_ratio | |
| def execute_sampling( | |
| image: np.ndarray, | |
| prediction: Prediction, | |
| prediction_type: PredictionType, | |
| sampling_methods: List[SamplingMethod], | |
| ) -> List[str]: | |
| matching_strategies = [] | |
| for method in sampling_methods: | |
| sampling_result = method.sample(image, prediction, prediction_type) | |
| if sampling_result: | |
| matching_strategies.append(method.name) | |
| return matching_strategies | |
| def execute_datapoint_registration( | |
| cache: BaseCache, | |
| matching_strategies: List[str], | |
| image: np.ndarray, | |
| prediction: Prediction, | |
| prediction_type: PredictionType, | |
| configuration: ActiveLearningConfiguration, | |
| api_key: str, | |
| batch_name: str, | |
| ) -> None: | |
| local_image_id = str(uuid4()) | |
| encoded_image, scaling_factor = prepare_image_to_registration( | |
| image=image, | |
| desired_size=configuration.max_image_size, | |
| jpeg_compression_level=configuration.jpeg_compression_level, | |
| ) | |
| prediction = adjust_prediction_to_client_scaling_factor( | |
| prediction=prediction, | |
| scaling_factor=scaling_factor, | |
| prediction_type=prediction_type, | |
| ) | |
| matching_strategies_limits = OrderedDict( | |
| (strategy_name, configuration.strategies_limits[strategy_name]) | |
| for strategy_name in matching_strategies | |
| ) | |
| strategy_with_spare_credit = use_credit_of_matching_strategy( | |
| cache=cache, | |
| workspace=configuration.workspace_id, | |
| project=configuration.dataset_id, | |
| matching_strategies_limits=matching_strategies_limits, | |
| ) | |
| if strategy_with_spare_credit is None: | |
| logger.debug(f"Limit on Active Learning strategy reached.") | |
| return None | |
| register_datapoint_at_roboflow( | |
| cache=cache, | |
| strategy_with_spare_credit=strategy_with_spare_credit, | |
| encoded_image=encoded_image, | |
| local_image_id=local_image_id, | |
| prediction=prediction, | |
| prediction_type=prediction_type, | |
| configuration=configuration, | |
| api_key=api_key, | |
| batch_name=batch_name, | |
| ) | |
| def prepare_image_to_registration( | |
| image: np.ndarray, | |
| desired_size: Optional[ImageDimensions], | |
| jpeg_compression_level: int, | |
| ) -> Tuple[bytes, float]: | |
| scaling_factor = 1.0 | |
| if desired_size is not None: | |
| height_before_scale = image.shape[0] | |
| image = downscale_image_keeping_aspect_ratio( | |
| image=image, | |
| desired_size=desired_size.to_wh(), | |
| ) | |
| scaling_factor = image.shape[0] / height_before_scale | |
| return ( | |
| encode_image_to_jpeg_bytes(image=image, jpeg_quality=jpeg_compression_level), | |
| scaling_factor, | |
| ) | |
| def register_datapoint_at_roboflow( | |
| cache: BaseCache, | |
| strategy_with_spare_credit: str, | |
| encoded_image: bytes, | |
| local_image_id: str, | |
| prediction: Prediction, | |
| prediction_type: PredictionType, | |
| configuration: ActiveLearningConfiguration, | |
| api_key: str, | |
| batch_name: str, | |
| ) -> None: | |
| tags = collect_tags( | |
| configuration=configuration, | |
| sampling_strategy=strategy_with_spare_credit, | |
| ) | |
| roboflow_image_id = safe_register_image_at_roboflow( | |
| cache=cache, | |
| strategy_with_spare_credit=strategy_with_spare_credit, | |
| encoded_image=encoded_image, | |
| local_image_id=local_image_id, | |
| configuration=configuration, | |
| api_key=api_key, | |
| batch_name=batch_name, | |
| tags=tags, | |
| ) | |
| if is_prediction_registration_forbidden( | |
| prediction=prediction, | |
| persist_predictions=configuration.persist_predictions, | |
| roboflow_image_id=roboflow_image_id, | |
| ): | |
| return None | |
| encoded_prediction, prediction_file_type = encode_prediction( | |
| prediction=prediction, prediction_type=prediction_type | |
| ) | |
| _ = annotate_image_at_roboflow( | |
| api_key=api_key, | |
| dataset_id=configuration.dataset_id, | |
| local_image_id=local_image_id, | |
| roboflow_image_id=roboflow_image_id, | |
| annotation_content=encoded_prediction, | |
| annotation_file_type=prediction_file_type, | |
| is_prediction=True, | |
| ) | |
| def collect_tags( | |
| configuration: ActiveLearningConfiguration, sampling_strategy: str | |
| ) -> List[str]: | |
| tags = ACTIVE_LEARNING_TAGS if ACTIVE_LEARNING_TAGS is not None else [] | |
| tags.extend(configuration.tags) | |
| tags.extend(configuration.strategies_tags[sampling_strategy]) | |
| if configuration.persist_predictions: | |
| # this replacement is needed due to backend input validation | |
| tags.append(configuration.model_id.replace("/", "-")) | |
| return tags | |
| def safe_register_image_at_roboflow( | |
| cache: BaseCache, | |
| strategy_with_spare_credit: str, | |
| encoded_image: bytes, | |
| local_image_id: str, | |
| configuration: ActiveLearningConfiguration, | |
| api_key: str, | |
| batch_name: str, | |
| tags: List[str], | |
| ) -> Optional[str]: | |
| credit_to_be_returned = False | |
| try: | |
| registration_response = register_image_at_roboflow( | |
| api_key=api_key, | |
| dataset_id=configuration.dataset_id, | |
| local_image_id=local_image_id, | |
| image_bytes=encoded_image, | |
| batch_name=batch_name, | |
| tags=tags, | |
| ) | |
| image_duplicated = registration_response.get("duplicate", False) | |
| if image_duplicated: | |
| credit_to_be_returned = True | |
| logger.warning(f"Image duplication detected: {registration_response}.") | |
| return None | |
| return registration_response["id"] | |
| except Exception as error: | |
| credit_to_be_returned = True | |
| raise error | |
| finally: | |
| if credit_to_be_returned: | |
| return_strategy_credit( | |
| cache=cache, | |
| workspace=configuration.workspace_id, | |
| project=configuration.dataset_id, | |
| strategy_name=strategy_with_spare_credit, | |
| ) | |
| def is_prediction_registration_forbidden( | |
| prediction: Prediction, | |
| persist_predictions: bool, | |
| roboflow_image_id: Optional[str], | |
| ) -> bool: | |
| return ( | |
| roboflow_image_id is None | |
| or persist_predictions is False | |
| or prediction.get("is_stub", False) is True | |
| or (len(prediction.get("predictions", [])) == 0 and "top" not in prediction) | |
| ) | |