|
|
""" |
|
|
Before we can identify damage sites, we need to look for suitable regions in the image. |
|
|
Typically, damage sites appear as dark regions in the image. Instead of simple thresholding, we use |
|
|
a clustering approach to identify regions that belong together and form damage site candidates. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
import scipy.ndimage as ndi |
|
|
from scipy.spatial import KDTree |
|
|
from sklearn.cluster import DBSCAN |
|
|
import logging |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
def get_centroids(image, image_threshold=20, |
|
|
eps=1, min_samples=5, metric='euclidean', |
|
|
min_size=20, fill_holes=False, |
|
|
filter_close_centroids=False, filter_radius=50) -> list: |
|
|
""" |
|
|
Determine centroids of clusters corresponding to potential damage sites. |
|
|
In a first step, a threshold is applied to the input image to identify areas of potential damage sites. |
|
|
Using DBSCAN, these agglomerations of pixels are fitted into clusters. Then, the mean x/y values are determined |
|
|
from pixels belonging to one cluster. If the number of pixels in a given cluster excees the threshold given by min_size, this cluster is added |
|
|
to the list of (x,y) coordinates that is returned as the final list potential damage sites. |
|
|
|
|
|
Sometimes, clusters may be found in very close proximity to each other, we can reject those to avoid |
|
|
classifying the same event multiple times (which may distort our statistics). |
|
|
|
|
|
DBScan documentation: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html |
|
|
|
|
|
Args: |
|
|
image: Input SEM image (PIL Image or NumPy array). |
|
|
image_threshold (int, optional): Threshold to be applied to the image to identify candidates for damage sites. Defaults to 20. |
|
|
eps (int, optional): parameter eps of DBSCAN: The maximum distance between two samples for one to be considered as in the neighborhood of the other. Defaults to 1. |
|
|
min_samples (int, optional): parameter min_samples of DBSCAN: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. Defaults to 5. |
|
|
metric (str, optional): parameter metric of DBSCAN. Defaults to 'euclidean'. |
|
|
min_size (int, optional): Minimum number of pixels in a cluster for the damage site candidate to be considered in the final list. Defaults to 20. |
|
|
fill_holes (bool, optional): Fill small holes in damage sites clusters using binary_fill_holes. Defaults to False. |
|
|
filter_close_centroids (bool, optional): Filter cluster centroids within a given radius. Defaults to False |
|
|
filter_radius (float, optional): Radius within which centroids are considered to be the same. Defaults to 50 |
|
|
|
|
|
Returns: |
|
|
list: list of (x,y) coordinates of the centroids of the clusters of accepted damage site candidates. |
|
|
""" |
|
|
|
|
|
centroids = [] |
|
|
logging.info(f"get_centroids: Input image type: {type(image)}") |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(image, Image.Image): |
|
|
|
|
|
if image.mode == 'RGB': |
|
|
image_array = np.array(image.convert('L')) |
|
|
logging.info("get_centroids: Converted RGB PIL Image to grayscale NumPy array.") |
|
|
else: |
|
|
image_array = np.array(image) |
|
|
logging.info("get_centroids: Converted PIL Image to NumPy array.") |
|
|
elif isinstance(image, np.ndarray): |
|
|
|
|
|
if image.ndim == 3 and image.shape[2] in [3, 4]: |
|
|
image_array = np.mean(image, axis=2).astype(image.dtype) |
|
|
logging.info("get_centroids: Converted multi-channel NumPy array to grayscale NumPy array.") |
|
|
else: |
|
|
image_array = image |
|
|
logging.info("get_centroids: Image is already a suitable NumPy array.") |
|
|
else: |
|
|
logging.error("get_centroids: Unsupported image format received. Expected PIL Image or NumPy array.") |
|
|
raise ValueError("Unsupported image format. Expected PIL Image or NumPy array for thresholding.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cluster_candidates_mask = image_array < image_threshold |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if fill_holes: |
|
|
cluster_candidates_mask = ndi.binary_fill_holes(cluster_candidates_mask) |
|
|
|
|
|
|
|
|
cluster_candidates = np.asarray(cluster_candidates_mask).nonzero() |
|
|
cluster_candidates = np.transpose(cluster_candidates) |
|
|
|
|
|
|
|
|
if cluster_candidates.size == 0: |
|
|
logging.warning("No cluster candidates found after thresholding. Returning empty centroids list.") |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric=metric) |
|
|
|
|
|
dbscan.fit(cluster_candidates) |
|
|
|
|
|
labels = dbscan.labels_ |
|
|
|
|
|
n_clusters = len(set(labels)) - (1 if -1 in labels else 0) |
|
|
n_noise = list(labels).count(-1) |
|
|
logging.debug('# clusters {}, #noise {}'.format(n_clusters, n_noise)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i in set(labels): |
|
|
if i > -1: |
|
|
|
|
|
cluster_points = cluster_candidates[labels == i, :] |
|
|
if len(cluster_points) > min_size: |
|
|
x_mean = np.mean(cluster_points, axis=0)[0] |
|
|
y_mean = np.mean(cluster_points, axis=0)[1] |
|
|
centroids.append([x_mean, y_mean]) |
|
|
|
|
|
if filter_close_centroids and len(centroids) > 1: |
|
|
proximity_tree = KDTree(centroids) |
|
|
pairs = proximity_tree.query_pairs(filter_radius) |
|
|
|
|
|
|
|
|
indices_to_remove = set() |
|
|
for p1_idx, p2_idx in pairs: |
|
|
|
|
|
|
|
|
indices_to_remove.add(max(p1_idx, p2_idx)) |
|
|
|
|
|
|
|
|
filtered_centroids = [centroid for i, centroid in enumerate(centroids) if i not in indices_to_remove] |
|
|
centroids = filtered_centroids |
|
|
logging.info(f"Filtered {len(indices_to_remove)} close centroids. Remaining: {len(centroids)}") |
|
|
|
|
|
|
|
|
return centroids |
|
|
|