freemt
Update swap columns in df_aigned
52771bf
raw
history blame
4.18 kB
"""Gne pset from cmat. Find pairs for a given cmat.
tinybee.find_pairs.py with fixed estimator='dbscan' eps=eps, min_samples=min_samples
"""
from typing import List, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
import logzero
from logzero import logger
from radiobee.cmat2tset import cmat2tset
from radiobee.interpolate_pset import interpolate_pset
def gen_pset(
cmat1: Union[List[List[float]], np.ndarray, pd.DataFrame],
eps: float = 10,
min_samples: int = 6,
delta: float = 7,
verbose: Union[bool, int] = False,
# ) -> List[Tuple[int, int, Union[float, str]]]:
) -> List[Tuple[Union[float, str], Union[float, str], Union[float, str]]]:
"""Gen pset from cmat.
Find pairs for a given cmat.
Args:
cmat: correlation/similarity matrix
eps: min epsilon for DBSCAN (10)
min_samples: minimum # of samples for DBSCAN (6)
delta: tolerance (7)
Returns:
pairs + "" or metric (float)
dbscan_pairs' setup
if eps is None:
eps = src_len * .01
if eps < 3:
eps = 3
if min_samples is None:
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
def gen_eps_minsamples(src_len, tgt_len):
eps = src_len * .01
if eps < 3:
eps = 3
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
return {"eps": eps, "min_samples": min_samples}
"""
if isinstance(verbose, bool):
if verbose:
verbose = 10
else:
verbose = 20
logzero.loglevel(verbose)
# if isinstance(cmat, list):
cmat = np.array(cmat1)
src_len, tgt_len = cmat.shape
# tset = cmat2tset(cmat)
tset = cmat2tset(cmat).tolist()
logger.debug("tset: %s", tset)
# iset = gen_iset(cmat, verbose=verbose, estimator=estimator)
labels = DBSCAN(eps=eps, min_samples=min_samples).fit(tset).labels_
df_tset = pd.DataFrame(tset, columns=["x", "y", "cos"])
cset = df_tset[labels > -1].to_numpy()
# sort cset
_ = sorted(cset.tolist(), key=lambda x: x[0])
iset = interpolate_pset(_, tgt_len)
# *_, ymax = zip(*tset)
# ymax = list(ymax)
# low_ = np.min(ymax) - 1 # reset to minimum_value - 1
buff = [(-1, -1, ""), (tgt_len, src_len, "")]
# for _ in range(tgt_len):
for idx, tset_elm in enumerate(tset):
logger.debug("buff: %s", buff)
# postion max in ymax and insert in buff
# if with range given by iset+-delta and
# it's valid (do not exceed constraint
# by neighboring points
# argmax = int(np.argmax(ymax))
# logger.debug("=== %s,%s === %s", _, argmax, tset[_])
logger.debug("=== %s === %s", _, tset_elm)
# ymax[_] = low_
# elm = tset[argmax]
# elm0, *_ = elm
elm0, *_ = tset_elm
# position elm in buff
idx = -1 # for making pyright happy
for idx, loc in enumerate(buff):
if loc[0] > elm0:
break
else:
idx += 1 # last
# insert elm in for valid elm
# (within range inside two neighboring points)
# pos = int(tset[argmax][0])
pos = int(tset_elm[0])
logger.debug(" %s <=> %s ", tset_elm, iset[pos])
# if abs(tset[argmax][1] - iset[pos][1]) <= delta:
if abs(tset_elm[1] - iset[pos][1]) <= delta:
if tset_elm[1] > buff[idx - 1][1] and tset_elm[1] < buff[idx][1]:
buff.insert(idx, tset_elm)
logger.debug("idx: %s, tset_elm: %s", idx, tset_elm)
else:
logger.debug("\t***\t idx: %s, tset_elm: %s", idx, tset_elm)
_ = """
if abs(tset[loc][1] - iset[loc][1]) <= delta:
if tset[loc][1] > buff[idx][1] and tset[loc][1] < buff[idx + 1][1]:
buff.insert(idx + 1, tset[loc])
# """
# remove first and last entry in buff
buff.pop(0)
buff.pop()
# return [(1, 1, "")]
return [(int(elm0), int(elm1), elm2) for elm0, elm1, elm2 in buff]