|
import requests |
|
from torch import Tensor, device |
|
from typing import List, Callable |
|
from tqdm.autonotebook import tqdm |
|
import sys |
|
import importlib |
|
import os |
|
import torch |
|
import numpy as np |
|
import queue |
|
import logging |
|
from typing import Dict, Optional, Union |
|
from pathlib import Path |
|
|
|
import huggingface_hub |
|
from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE |
|
from huggingface_hub import HfApi, hf_hub_url, cached_download, HfFolder |
|
import fnmatch |
|
from packaging import version |
|
import heapq |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def pytorch_cos_sim(a: Tensor, b: Tensor): |
|
""" |
|
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j. |
|
:return: Matrix with res[i][j] = cos_sim(a[i], b[j]) |
|
""" |
|
return cos_sim(a, b) |
|
|
|
def cos_sim(a: Tensor, b: Tensor): |
|
""" |
|
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j. |
|
:return: Matrix with res[i][j] = cos_sim(a[i], b[j]) |
|
""" |
|
if not isinstance(a, torch.Tensor): |
|
a = torch.tensor(a) |
|
|
|
if not isinstance(b, torch.Tensor): |
|
b = torch.tensor(b) |
|
|
|
if len(a.shape) == 1: |
|
a = a.unsqueeze(0) |
|
|
|
if len(b.shape) == 1: |
|
b = b.unsqueeze(0) |
|
|
|
a_norm = torch.nn.functional.normalize(a, p=2, dim=1) |
|
b_norm = torch.nn.functional.normalize(b, p=2, dim=1) |
|
return torch.mm(a_norm, b_norm.transpose(0, 1)) |
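
# Example usage (a minimal sketch; shapes and values are illustrative):
#
#     a = torch.randn(3, 8)   # 3 embeddings of dimension 8
#     b = torch.randn(5, 8)   # 5 embeddings of dimension 8
#     sim = cos_sim(a, b)     # shape (3, 5); sim[i][j] = cos_sim(a[i], b[j])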
|
|
|
|
|
def dot_score(a: Tensor, b: Tensor): |
|
""" |
|
Computes the dot-product dot_prod(a[i], b[j]) for all i and j. |
|
:return: Matrix with res[i][j] = dot_prod(a[i], b[j]) |
|
""" |
|
if not isinstance(a, torch.Tensor): |
|
a = torch.tensor(a) |
|
|
|
if not isinstance(b, torch.Tensor): |
|
b = torch.tensor(b) |
|
|
|
if len(a.shape) == 1: |
|
a = a.unsqueeze(0) |
|
|
|
if len(b.shape) == 1: |
|
b = b.unsqueeze(0) |
|
|
|
return torch.mm(a, b.transpose(0, 1)) |
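
# Note: for unit-length embeddings, dot_score and cos_sim coincide. A quick
# illustrative check:
#
#     a = normalize_embeddings(torch.randn(3, 8))
#     b = normalize_embeddings(torch.randn(5, 8))
#     assert torch.allclose(dot_score(a, b), cos_sim(a, b), atol=1e-6)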
|
|
|
|
|
def pairwise_dot_score(a: Tensor, b: Tensor): |
|
""" |
|
Computes the pairwise dot-product dot_prod(a[i], b[i]) |
|
:return: Vector with res[i] = dot_prod(a[i], b[i]) |
|
""" |
|
if not isinstance(a, torch.Tensor): |
|
a = torch.tensor(a) |
|
|
|
if not isinstance(b, torch.Tensor): |
|
b = torch.tensor(b) |
|
|
|
return (a * b).sum(dim=-1) |
|
|
|
|
|
def pairwise_cos_sim(a: Tensor, b: Tensor): |
|
""" |
|
    Computes the pairwise cosine similarity cos_sim(a[i], b[i])
|
:return: Vector with res[i] = cos_sim(a[i], b[i]) |
|
""" |
|
if not isinstance(a, torch.Tensor): |
|
a = torch.tensor(a) |
|
|
|
if not isinstance(b, torch.Tensor): |
|
b = torch.tensor(b) |
|
|
|
return pairwise_dot_score(normalize_embeddings(a), normalize_embeddings(b)) |
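
# Example usage (illustrative): unlike cos_sim, which returns a full (n, m) matrix,
# the pairwise variants only compare row i with row i:
#
#     a = torch.randn(4, 8)
#     b = torch.randn(4, 8)
#     scores = pairwise_cos_sim(a, b)  # shape (4,); scores[i] = cos_sim(a[i], b[i])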
|
|
|
|
|
def normalize_embeddings(embeddings: Tensor): |
|
""" |
|
Normalizes the embeddings matrix, so that each sentence embedding has unit length |
|
""" |
|
return torch.nn.functional.normalize(embeddings, p=2, dim=1) |
|
|
|
|
|
def paraphrase_mining(model, |
|
sentences: List[str], |
|
show_progress_bar: bool = False, |
|
                      batch_size: int = 32,
|
*args, |
|
**kwargs): |
|
""" |
|
Given a list of sentences / texts, this function performs paraphrase mining. It compares all sentences against all |
|
other sentences and returns a list with the pairs that have the highest cosine similarity score. |
|
|
|
:param model: SentenceTransformer model for embedding computation |
|
:param sentences: A list of strings (texts or sentences) |
|
    :param show_progress_bar: Whether to show a progress bar while encoding the sentences
|
:param batch_size: Number of texts that are encoded simultaneously by the model |
|
    :param query_chunk_size: Search for the most similar pairs for #query_chunk_size sentences at a time. Decrease to lower the memory footprint (increases run-time).

    :param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease to lower the memory footprint (increases run-time).
|
:param max_pairs: Maximal number of text pairs returned. |
|
:param top_k: For each sentence, we retrieve up to top_k other sentences |
|
:param score_function: Function for computing scores. By default, cosine similarity. |
|
:return: Returns a list of triplets with the format [score, id1, id2] |
|
""" |
|
|
|
|
|
embeddings = model.encode(sentences, show_progress_bar=show_progress_bar, batch_size=batch_size, convert_to_tensor=True) |
|
|
|
return paraphrase_mining_embeddings(embeddings, *args, **kwargs) |
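
# Example usage (a sketch; the model name is just one possible choice):
#
#     from sentence_transformers import SentenceTransformer
#     model = SentenceTransformer('all-MiniLM-L6-v2')
#     sentences = ['The cat sits outside', 'A cat is sitting outdoors', 'He plays guitar']
#     pairs = paraphrase_mining(model, sentences)
#     for score, i, j in pairs[:3]:
#         print("{:.3f}\t{}\t{}".format(score, sentences[i], sentences[j]))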
|
|
|
|
|
def paraphrase_mining_embeddings(embeddings: Tensor, |
|
query_chunk_size: int = 5000, |
|
corpus_chunk_size: int = 100000, |
|
max_pairs: int = 500000, |
|
top_k: int = 100, |
|
score_function: Callable[[Tensor, Tensor], Tensor] = cos_sim): |
|
""" |
|
    Given a tensor of sentence embeddings, this function performs paraphrase mining. It compares all embeddings against

    all other embeddings and returns a list with the pairs that have the highest cosine similarity score.
|
|
|
:param embeddings: A tensor with the embeddings |
|
    :param query_chunk_size: Search for the most similar pairs for #query_chunk_size sentences at a time. Decrease to lower the memory footprint (increases run-time).

    :param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease to lower the memory footprint (increases run-time).
|
:param max_pairs: Maximal number of text pairs returned. |
|
:param top_k: For each sentence, we retrieve up to top_k other sentences |
|
:param score_function: Function for computing scores. By default, cosine similarity. |
|
:return: Returns a list of triplets with the format [score, id1, id2] |
|
""" |
|
|
|
    top_k += 1  # A sentence has the highest similarity with itself; +1 so that top_k distinct pairs remain
|
|
|
|
|
    pairs = queue.PriorityQueue()  # Min-heap of (score, i, j); the lowest-scoring pair is popped first

    min_score = -1  # Raised once max_pairs is reached, so only better-scoring pairs get added
|
num_added = 0 |
|
|
|
for corpus_start_idx in range(0, len(embeddings), corpus_chunk_size): |
|
for query_start_idx in range(0, len(embeddings), query_chunk_size): |
|
scores = score_function(embeddings[query_start_idx:query_start_idx+query_chunk_size], embeddings[corpus_start_idx:corpus_start_idx+corpus_chunk_size]) |
|
|
|
scores_top_k_values, scores_top_k_idx = torch.topk(scores, min(top_k, len(scores[0])), dim=1, largest=True, sorted=False) |
|
scores_top_k_values = scores_top_k_values.cpu().tolist() |
|
scores_top_k_idx = scores_top_k_idx.cpu().tolist() |
|
|
|
for query_itr in range(len(scores)): |
|
for top_k_idx, corpus_itr in enumerate(scores_top_k_idx[query_itr]): |
|
i = query_start_idx + query_itr |
|
j = corpus_start_idx + corpus_itr |
|
|
|
if i != j and scores_top_k_values[query_itr][top_k_idx] > min_score: |
|
pairs.put((scores_top_k_values[query_itr][top_k_idx], i, j)) |
|
num_added += 1 |
|
|
|
                        if num_added >= max_pairs:

                            entry = pairs.get()  # Evict the lowest-scoring pair to keep at most max_pairs entries

                            min_score = entry[0]
|
|
|
|
|
    added_pairs = set()  # (i, j) and (j, i) count as the same pair; track what was already emitted
|
pairs_list = [] |
|
while not pairs.empty(): |
|
score, i, j = pairs.get() |
|
sorted_i, sorted_j = sorted([i, j]) |
|
|
|
if sorted_i != sorted_j and (sorted_i, sorted_j) not in added_pairs: |
|
added_pairs.add((sorted_i, sorted_j)) |
|
pairs_list.append([score, i, j]) |
|
|
|
|
|
pairs_list = sorted(pairs_list, key=lambda x: x[0], reverse=True) |
|
return pairs_list |
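
# Example usage with precomputed embeddings (illustrative; random vectors stand in
# for real sentence embeddings):
#
#     embeddings = torch.randn(100, 384)
#     pairs = paraphrase_mining_embeddings(embeddings, top_k=10, max_pairs=1000)
#     # pairs is a list of [score, id1, id2] triplets, sorted by decreasing score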
|
|
|
|
|
def information_retrieval(*args, **kwargs): |
|
"""This function is deprecated. Use semantic_search instead""" |
|
return semantic_search(*args, **kwargs) |
|
|
|
|
|
def semantic_search(query_embeddings: Tensor, |
|
corpus_embeddings: Tensor, |
|
query_chunk_size: int = 100, |
|
corpus_chunk_size: int = 500000, |
|
top_k: int = 10, |
|
score_function: Callable[[Tensor, Tensor], Tensor] = cos_sim): |
|
""" |
|
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings. |
|
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries. |
|
|
|
:param query_embeddings: A 2 dimensional tensor with the query embeddings. |
|
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings. |
|
    :param query_chunk_size: Process #query_chunk_size queries simultaneously. Increasing this value increases the speed, but requires more memory.

    :param corpus_chunk_size: Scan the corpus #corpus_chunk_size entries at a time. Increasing this value increases the speed, but requires more memory.
|
:param top_k: Retrieve top k matching entries. |
|
:param score_function: Function for computing scores. By default, cosine similarity. |
|
:return: Returns a list with one entry for each query. Each entry is a list of dictionaries with the keys 'corpus_id' and 'score', sorted by decreasing cosine similarity scores. |
|
""" |
|
|
|
if isinstance(query_embeddings, (np.ndarray, np.generic)): |
|
query_embeddings = torch.from_numpy(query_embeddings) |
|
elif isinstance(query_embeddings, list): |
|
query_embeddings = torch.stack(query_embeddings) |
|
|
|
if len(query_embeddings.shape) == 1: |
|
query_embeddings = query_embeddings.unsqueeze(0) |
|
|
|
if isinstance(corpus_embeddings, (np.ndarray, np.generic)): |
|
corpus_embeddings = torch.from_numpy(corpus_embeddings) |
|
elif isinstance(corpus_embeddings, list): |
|
corpus_embeddings = torch.stack(corpus_embeddings) |
|
|
|
|
|
|
|
    # Move the queries to the same device as the corpus embeddings

    if corpus_embeddings.device != query_embeddings.device:
|
query_embeddings = query_embeddings.to(corpus_embeddings.device) |
|
|
|
queries_result_list = [[] for _ in range(len(query_embeddings))] |
|
|
|
for query_start_idx in range(0, len(query_embeddings), query_chunk_size): |
|
|
|
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size): |
|
|
|
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx+query_chunk_size], corpus_embeddings[corpus_start_idx:corpus_start_idx+corpus_chunk_size]) |
|
|
|
|
|
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])), dim=1, largest=True, sorted=False) |
|
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist() |
|
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist() |
|
|
|
for query_itr in range(len(cos_scores)): |
|
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr], cos_scores_top_k_values[query_itr]): |
|
corpus_id = corpus_start_idx + sub_corpus_id |
|
query_id = query_start_idx + query_itr |
|
                    if len(queries_result_list[query_id]) < top_k:

                        heapq.heappush(queries_result_list[query_id], (score, corpus_id))  # Fill the heap up to top_k entries

                    else:

                        heapq.heappushpop(queries_result_list[query_id], (score, corpus_id))  # Replace the smallest entry if the new score is higher
|
|
|
|
|
for query_id in range(len(queries_result_list)): |
|
for doc_itr in range(len(queries_result_list[query_id])): |
|
score, corpus_id = queries_result_list[query_id][doc_itr] |
|
queries_result_list[query_id][doc_itr] = {'corpus_id': corpus_id, 'score': score} |
|
queries_result_list[query_id] = sorted(queries_result_list[query_id], key=lambda x: x['score'], reverse=True) |
|
|
|
return queries_result_list |
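
# Example usage (illustrative; random vectors stand in for real embeddings):
#
#     corpus_embeddings = torch.randn(1000, 384)
#     query_embeddings = torch.randn(5, 384)
#     hits = semantic_search(query_embeddings, corpus_embeddings, top_k=3)
#     # hits[0] holds the 3 best matches for the first query, e.g.
#     # [{'corpus_id': 42, 'score': 0.71}, ...], sorted by decreasing score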
|
|
|
|
|
def http_get(url, path): |
|
""" |
|
    Downloads a URL to a given path on disk
|
""" |
|
if os.path.dirname(path) != '': |
|
os.makedirs(os.path.dirname(path), exist_ok=True) |
|
|
|
req = requests.get(url, stream=True) |
|
if req.status_code != 200: |
|
print("Exception when trying to download {}. Response {}".format(url, req.status_code), file=sys.stderr) |
|
req.raise_for_status() |
|
return |
|
|
|
    download_filepath = path + "_part"  # Download to a temporary file first; rename only after the download completes
|
with open(download_filepath, "wb") as file_binary: |
|
content_length = req.headers.get('Content-Length') |
|
total = int(content_length) if content_length is not None else None |
|
progress = tqdm(unit="B", total=total, unit_scale=True) |
|
for chunk in req.iter_content(chunk_size=1024): |
|
if chunk: |
|
progress.update(len(chunk)) |
|
file_binary.write(chunk) |
|
|
|
os.rename(download_filepath, path) |
|
progress.close() |
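
# Example usage (the URL is a placeholder, not a real endpoint):
#
#     http_get('https://example.com/files/model.zip', '/tmp/model.zip')
#     # Streams the response in 1 KB chunks with a progress bar; writes to
#     # '/tmp/model.zip_part' and renames on success, so an interrupted download
#     # never leaves a truncated file at the final path.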
|
|
|
|
|
def batch_to_device(batch, target_device: device): |
|
""" |
|
    Send a PyTorch batch (a dict of tensors) to a device (CPU/GPU)
|
""" |
|
for key in batch: |
|
if isinstance(batch[key], Tensor): |
|
batch[key] = batch[key].to(target_device) |
|
return batch |
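
# Example usage (illustrative):
#
#     batch = {'input_ids': torch.tensor([[1, 2, 3]]), 'attention_mask': torch.tensor([[1, 1, 1]])}
#     target = device('cuda' if torch.cuda.is_available() else 'cpu')
#     batch = batch_to_device(batch, target)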
|
|
|
|
|
|
|
def fullname(o): |
|
""" |
|
    Gives the full name (package_name.class_name) for a class / object in Python.

    Used to load the correct classes from JSON files.
|
""" |
|
|
|
module = o.__class__.__module__ |
|
    if module is None or module == str.__class__.__module__:  # 'builtins'; avoid names like 'builtins.str'
|
return o.__class__.__name__ |
|
else: |
|
return module + '.' + o.__class__.__name__ |
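
# Examples (illustrative):
#
#     fullname(torch.nn.Linear(2, 2))  # -> 'torch.nn.modules.linear.Linear'
#     fullname('some string')          # -> 'str' (builtins get no module prefix)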
|
|
|
def import_from_string(dotted_path): |
|
""" |
|
Import a dotted module path and return the attribute/class designated by the |
|
last name in the path. Raise ImportError if the import failed. |
|
""" |
|
try: |
|
module_path, class_name = dotted_path.rsplit('.', 1) |
|
except ValueError: |
|
msg = "%s doesn't look like a module path" % dotted_path |
|
raise ImportError(msg) |
|
|
|
    try:

        module = importlib.import_module(dotted_path)

    except ImportError:

        # The dotted path may point at an attribute inside a module (e.g. 'package.module.Class'),

        # so fall back to importing the parent module.

        module = importlib.import_module(module_path)
|
|
|
try: |
|
return getattr(module, class_name) |
|
except AttributeError: |
|
msg = 'Module "%s" does not define a "%s" attribute/class' % (module_path, class_name) |
|
raise ImportError(msg) |
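
# Example (illustrative): the counterpart to fullname(), used when instantiating
# modules from JSON configs:
#
#     linear_cls = import_from_string('torch.nn.Linear')
#     layer = linear_cls(2, 2)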
|
|
|
|
|
def community_detection(embeddings, threshold=0.75, min_community_size=10, batch_size=1024): |
|
""" |
|
    Function for Fast Community Detection.

    Finds all communities in the embeddings, i.e. sets of embeddings that are closer to each other than the threshold.

    Returns only communities that are larger than min_community_size. The communities are returned

    in decreasing order of size. The first element in each community is its central point.
|
""" |
|
if not isinstance(embeddings, torch.Tensor): |
|
embeddings = torch.tensor(embeddings) |
|
|
|
threshold = torch.tensor(threshold, device=embeddings.device) |
|
|
|
extracted_communities = [] |
|
|
|
|
|
    # Bound both sizes by the number of embeddings

    min_community_size = min(min_community_size, len(embeddings))

    sort_max_size = min(max(2 * min_community_size, 50), len(embeddings))
|
|
|
for start_idx in range(0, len(embeddings), batch_size): |
|
|
|
        # Compute cosine similarity scores between this batch and all embeddings

        cos_scores = cos_sim(embeddings[start_idx:start_idx + batch_size], embeddings)
|
|
|
|
|
        # The min_community_size-th highest score per row decides whether a community exists at all

        top_k_values, _ = cos_scores.topk(k=min_community_size, largest=True)
|
|
|
|
|
for i in range(len(top_k_values)): |
|
if top_k_values[i][-1] >= threshold: |
|
new_cluster = [] |
|
|
|
|
|
                # Only check the most similar entries; the window grows below if it is too small

                top_val_large, top_idx_large = cos_scores[i].topk(k=sort_max_size, largest=True)
|
|
|
|
|
                # Double the window until it contains every entry above the threshold

                while top_val_large[-1] > threshold and sort_max_size < len(embeddings):
|
sort_max_size = min(2 * sort_max_size, len(embeddings)) |
|
top_val_large, top_idx_large = cos_scores[i].topk(k=sort_max_size, largest=True) |
|
|
|
for idx, val in zip(top_idx_large.tolist(), top_val_large): |
|
if val < threshold: |
|
break |
|
|
|
new_cluster.append(idx) |
|
|
|
extracted_communities.append(new_cluster) |
|
|
|
del cos_scores |
|
|
|
|
|
    # Largest communities first

    extracted_communities = sorted(extracted_communities, key=lambda x: len(x), reverse=True)
|
|
|
|
|
    # Remove overlapping communities: each point may belong to at most one community

    unique_communities = []

    extracted_ids = set()
|
|
|
for cluster_id, community in enumerate(extracted_communities): |
|
community = sorted(community) |
|
non_overlapped_community = [] |
|
for idx in community: |
|
if idx not in extracted_ids: |
|
non_overlapped_community.append(idx) |
|
|
|
if len(non_overlapped_community) >= min_community_size: |
|
unique_communities.append(non_overlapped_community) |
|
extracted_ids.update(non_overlapped_community) |
|
|
|
unique_communities = sorted(unique_communities, key=lambda x: len(x), reverse=True) |
|
|
|
return unique_communities |
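
# Example usage (illustrative; in practice the embeddings come from a sentence encoder):
#
#     embeddings = torch.randn(500, 384)
#     communities = community_detection(embeddings, threshold=0.75, min_community_size=10)
#     # communities is a list of index lists, largest community first; the first
#     # index in each list is the community's central point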
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def snapshot_download( |
|
repo_id: str, |
|
revision: Optional[str] = None, |
|
cache_dir: Union[str, Path, None] = None, |
|
library_name: Optional[str] = None, |
|
library_version: Optional[str] = None, |
|
user_agent: Union[Dict, str, None] = None, |
|
ignore_files: Optional[List[str]] = None, |
|
use_auth_token: Union[bool, str, None] = None |
|
) -> str: |
|
""" |
|
Method derived from huggingface_hub. |
|
Adds a new parameters 'ignore_files', which allows to ignore certain files / file-patterns |
|
""" |
|
if cache_dir is None: |
|
cache_dir = HUGGINGFACE_HUB_CACHE |
|
if isinstance(cache_dir, Path): |
|
cache_dir = str(cache_dir) |
|
|
|
_api = HfApi() |
|
|
|
token = None |
|
if isinstance(use_auth_token, str): |
|
token = use_auth_token |
|
elif use_auth_token: |
|
token = HfFolder.get_token() |
|
|
|
model_info = _api.model_info(repo_id=repo_id, revision=revision, token=token) |
|
|
|
storage_folder = os.path.join( |
|
cache_dir, repo_id.replace("/", "_") |
|
) |
|
|
|
all_files = model_info.siblings |
|
|
|
    # Ensure that modules.json is downloaded as the last file

    for idx, repofile in enumerate(all_files):
|
if repofile.rfilename == "modules.json": |
|
del all_files[idx] |
|
all_files.append(repofile) |
|
break |
|
|
|
for model_file in all_files: |
|
if ignore_files is not None: |
|
skip_download = False |
|
for pattern in ignore_files: |
|
if fnmatch.fnmatch(model_file.rfilename, pattern): |
|
skip_download = True |
|
break |
|
|
|
if skip_download: |
|
continue |
|
|
|
url = hf_hub_url( |
|
repo_id, filename=model_file.rfilename, revision=model_info.sha |
|
) |
|
relative_filepath = os.path.join(*model_file.rfilename.split("/")) |
|
|
|
|
|
nested_dirname = os.path.dirname( |
|
os.path.join(storage_folder, relative_filepath) |
|
) |
|
os.makedirs(nested_dirname, exist_ok=True) |
|
|
|
cached_download_args = {'url': url, |
|
'cache_dir': storage_folder, |
|
'force_filename': relative_filepath, |
|
'library_name': library_name, |
|
'library_version': library_version, |
|
'user_agent': user_agent, |
|
'use_auth_token': use_auth_token} |
|
|
|
        if version.parse(huggingface_hub.__version__) >= version.parse("0.8.1"):

            # huggingface_hub v0.8.1 introduced a new cache layout; request the

            # legacy layout so the manual storage_folder structure keeps working

            cached_download_args['legacy_cache_layout'] = True
|
|
|
path = cached_download(**cached_download_args) |
|
|
|
if os.path.exists(path + ".lock"): |
|
os.remove(path + ".lock") |
|
|
|
return storage_folder |
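
# Example usage (a sketch; the repo id and ignore patterns are only an assumption
# for illustration):
#
#     model_path = snapshot_download(
#         'sentence-transformers/all-MiniLM-L6-v2',
#         ignore_files=['*.onnx', '*.h5'],
#     )
#     # model_path is the local folder holding the downloaded snapshot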
|
|