# NOTE: web-viewer scrape artifacts removed (Spaces page header, "Runtime error"
# banner, file size 4,454 bytes, revision 7a919c0, and the line-number gutter).
# yyj
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import pandas as pd
import os
import json
from loguru import logger
import faiss
class Clusterer():
def __init__(self,
text: list,
text_embedding:np.array,
n_clusters: list|int):
self.text = text
self.text_embedding = text_embedding
self.n_clusters = n_clusters
# https://github.com/facebookresearch/faiss/issues/1875
def _silhouette_score(self, cluster_index, samples) -> float:
distance, _ = cluster_index.search(samples, 2)
# a = distance[:, 0], b = distance[:, 1]
# s = (b - a) / np.max(a, b)
s = (distance[:, 1] - distance[:, 0]) / np.max(distance, 1)
return s.mean()
# def _find_best_nclusters(self):
# silhouette_scores = []
# cluster_range = range(self.cluster_range[0], self.cluster_range[1],5)
# nparray = np.array(self.text_embedding)
# n, d = nparray.shape # 获取数据点的数量和维度
# x = nparray.astype('float32') # 转换数据类型为float32
# niter = 20
# verbose = True
# for ncentroids in cluster_range:
# kmeans = faiss.Kmeans(d, ncentroids, niter=niter, verbose=verbose)
# kmeans.train(x)
# scores = self._silhouette_score(kmeans.index, x)
# silhouette_scores.append(scores)
# self.result = zip(cluster_range, silhouette_scores)
# best_n_clusters = cluster_range[silhouette_scores.index(max(silhouette_scores))]
# logger.info(f"Optimal number of clusters: {best_n_clusters}")
# if best_n_clusters < 10:
# logger.debug('Number of clusters is less than 10, please check the data, set to default 10')
# self.n_clusters = 10
# else:
# self.n_clusters = best_n_clusters
def kmeanscluster(self, n_clusters: int = None):
'''
input:
- random_state: int, random state for kmeans
output:
- cluster labels saved to metadata
'''
if self.text_embedding is None:
raise ValueError('No embeddings of repo found, please run generate_embeddings first')
logger.info(f'Using n_clusters: {n_clusters}')
nparray = np.array(self.text_embedding)
n, d = nparray.shape # 获取数据点的数量和维度
x = nparray.astype('float32') # 转换数据类型为float32
ncentroids = n_clusters
niter = 100 + (n_clusters - 1) * 10
verbose = True
kmeans = faiss.Kmeans(d, ncentroids, niter=niter, verbose=verbose)
kmeans.train(x)
index = faiss.IndexFlatL2(d)
index.add(x)
D, I = index.search(kmeans.centroids, 10)
samples = {}
for i in range(ncentroids):
samples[i] ={
'cluster': i,
'samples': [self.text[j] for j in I[i]],
'distance': D[i].tolist()
}
score = self._silhouette_score(kmeans.index, x)
logger.info(f'kmeans with k = {n_clusters} complete. Silhouette score: {score}')
result = (n_clusters, score)
return result, samples
def generate_cluster(self, workdir: str):
'''
generate cluster
'''
feature_dir = os.path.join(workdir, 'cluster_features')
if not os.path.exists(feature_dir):
os.makedirs(feature_dir)
results = []
for n_clusters in self.n_clusters:
sub_dir = os.path.join(feature_dir, f'cluster_features_{n_clusters}')
if not os.path.exists(sub_dir):
os.makedirs(sub_dir)
result, samples = self.kmeanscluster(n_clusters)
# save k and score result
results.append(result)
path = os.path.join(sub_dir, f'kresult.txt')
with open(path, 'w') as f:
f.write(str(result))
# save samples
path = os.path.join(sub_dir, f'samples.json')
with open(path, 'w') as f:
json.dump(samples, f,ensure_ascii=False)
# save all results
path = os.path.join(feature_dir, 'all_results.txt')
with open(path, 'w') as f:
for k, score in results:
f.write(f'k={k}, score={score}\n')