import requests |
from torch import Tensor, device |
from typing import List, Callable |
from tqdm.autonotebook import tqdm |
import sys |
import importlib |
import os |
import torch |
import numpy as np |
import queue |
import logging |
from typing import Dict, Optional, Union |
from pathlib import Path |
import huggingface_hub |
from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE |
from huggingface_hub import HfApi, hf_hub_url, cached_download, HfFolder |
import fnmatch |
from packaging import version |
import heapq |
logger = logging.getLogger(__name__) |
def pytorch_cos_sim(a: Tensor, b: Tensor): |
""" |
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j. |
:return: Matrix with res[i][j] = cos_sim(a[i], b[j]) |
""" |
return cos_sim(a, b) |
def cos_sim(a: Tensor, b: Tensor): |
""" |
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j. |
:return: Matrix with res[i][j] = cos_sim(a[i], b[j]) |
""" |
if not isinstance(a, torch.Tensor): |
a = torch.tensor(a) |
if not isinstance(b, torch.Tensor): |
b = torch.tensor(b) |
if len(a.shape) == 1: |
a = a.unsqueeze(0) |
if len(b.shape) == 1: |
b = b.unsqueeze(0) |
a_norm = torch.nn.functional.normalize(a, p=2, dim=1) |
b_norm = torch.nn.functional.normalize(b, p=2, dim=1) |
return torch.mm(a_norm, b_norm.transpose(0, 1)) |
def dot_score(a: Tensor, b: Tensor): |
""" |
Computes the dot-product dot_prod(a[i], b[j]) for all i and j. |
:return: Matrix with res[i][j] = dot_prod(a[i], b[j]) |
""" |
if not isinstance(a, torch.Tensor): |
a = torch.tensor(a) |
if not isinstance(b, torch.Tensor): |
b = torch.tensor(b) |
if len(a.shape) == 1: |
a = a.unsqueeze(0) |
if len(b.shape) == 1: |
b = b.unsqueeze(0) |
return torch.mm(a, b.transpose(0, 1)) |
def pairwise_dot_score(a: Tensor, b: Tensor): |
""" |
Computes the pairwise dot-product dot_prod(a[i], b[i]) |
:return: Vector with res[i] = dot_prod(a[i], b[i]) |
""" |
if not isinstance(a, torch.Tensor): |
a = torch.tensor(a) |
if not isinstance(b, torch.Tensor): |
b = torch.tensor(b) |
return (a * b).sum(dim=-1) |
def pairwise_cos_sim(a: Tensor, b: Tensor): |
""" |
Computes the pairwise cossim cos_sim(a[i], b[i]) |
:return: Vector with res[i] = cos_sim(a[i], b[i]) |
""" |
if not isinstance(a, torch.Tensor): |
a = torch.tensor(a) |
if not isinstance(b, torch.Tensor): |
b = torch.tensor(b) |
return pairwise_dot_score(normalize_embeddings(a), normalize_embeddings(b)) |
def normalize_embeddings(embeddings: Tensor): |
""" |
Normalizes the embeddings matrix, so that each sentence embedding has unit length |
""" |
return torch.nn.functional.normalize(embeddings, p=2, dim=1) |
def paraphrase_mining(model, |
sentences: List[str], |
show_progress_bar: bool = False, |
batch_size:int = 32, |
*args, |
**kwargs): |
""" |
Given a list of sentences / texts, this function performs paraphrase mining. It compares all sentences against all |
other sentences and returns a list with the pairs that have the highest cosine similarity score. |
:param model: SentenceTransformer model for embedding computation |
:param sentences: A list of strings (texts or sentences) |
:param show_progress_bar: Plotting of a progress bar |
:param batch_size: Number of texts that are encoded simultaneously by the model |
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower memory footprint (increases run-time). |
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease, to lower memory footprint (increases run-time). |
:param max_pairs: Maximal number of text pairs returned. |
:param top_k: For each sentence, we retrieve up to top_k other sentences |
:param score_function: Function for computing scores. By default, cosine similarity. |
:return: Returns a list of triplets with the format [score, id1, id2] |
""" |
embeddings = model.encode(sentences, show_progress_bar=show_progress_bar, batch_size=batch_size, convert_to_tensor=True) |
return paraphrase_mining_embeddings(embeddings, *args, **kwargs) |
def paraphrase_mining_embeddings(embeddings: Tensor, |
query_chunk_size: int = 5000, |
corpus_chunk_size: int = 100000, |
max_pairs: int = 500000, |
top_k: int = 100, |
score_function: Callable[[Tensor, Tensor], Tensor] = cos_sim): |
""" |
Given a list of sentences / texts, this function performs paraphrase mining. It compares all sentences against all |
other sentences and returns a list with the pairs that have the highest cosine similarity score. |
:param embeddings: A tensor with the embeddings |
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower memory footprint (increases run-time). |
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease, to lower memory footprint (increases run-time). |
:param max_pairs: Maximal number of text pairs returned. |
:param top_k: For each sentence, we retrieve up to top_k other sentences |
:param score_function: Function for computing scores. By default, cosine similarity. |
:return: Returns a list of triplets with the format [score, id1, id2] |
""" |
top_k += 1 |
pairs = queue.PriorityQueue() |
min_score = -1 |
num_added = 0 |
for corpus_start_idx in range(0, len(embeddings), corpus_chunk_size): |
for query_start_idx in range(0, len(embeddings), query_chunk_size): |
scores = score_function(embeddings[query_start_idx:query_start_idx+query_chunk_size], embeddings[corpus_start_idx:corpus_start_idx+corpus_chunk_size]) |
scores_top_k_values, scores_top_k_idx = torch.topk(scores, min(top_k, len(scores[0])), dim=1, largest=True, sorted=False) |
scores_top_k_values = scores_top_k_values.cpu().tolist() |
scores_top_k_idx = scores_top_k_idx.cpu().tolist() |
for query_itr in range(len(scores)): |
for top_k_idx, corpus_itr in enumerate(scores_top_k_idx[query_itr]): |
i = query_start_idx + query_itr |
j = corpus_start_idx + corpus_itr |
if i != j and scores_top_k_values[query_itr][top_k_idx] > min_score: |
pairs.put((scores_top_k_values[query_itr][top_k_idx], i, j)) |
num_added += 1 |
if num_added >= max_pairs: |
entry = pairs.get() |
min_score = entry[0] |
added_pairs = set() |
pairs_list = [] |
while not pairs.empty(): |
score, i, j = pairs.get() |
sorted_i, sorted_j = sorted([i, j]) |
if sorted_i != sorted_j and (sorted_i, sorted_j) not in added_pairs: |
added_pairs.add((sorted_i, sorted_j)) |
pairs_list.append([score, i, j]) |
pairs_list = sorted(pairs_list, key=lambda x: x[0], reverse=True) |
return pairs_list |
def information_retrieval(*args, **kwargs): |
"""This function is deprecated. Use semantic_search instead""" |
return semantic_search(*args, **kwargs) |
def semantic_search(query_embeddings: Tensor, |
corpus_embeddings: Tensor, |
query_chunk_size: int = 100, |
corpus_chunk_size: int = 500000, |
top_k: int = 10, |
score_function: Callable[[Tensor, Tensor], Tensor] = cos_sim): |
""" |
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings. |
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries. |
:param query_embeddings: A 2 dimensional tensor with the query embeddings. |
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings. |
:param query_chunk_size: Process 100 queries simultaneously. Increasing that value increases the speed, but requires more memory. |
:param corpus_chunk_size: Scans the corpus 100k entries at a time. Increasing that value increases the speed, but requires more memory. |
:param top_k: Retrieve top k matching entries. |
:param score_function: Function for computing scores. By default, cosine similarity. |
:return: Returns a list with one entry for each query. Each entry is a list of dictionaries with the keys 'corpus_id' and 'score', sorted by decreasing cosine similarity scores. |
""" |
if isinstance(query_embeddings, (np.ndarray, np.generic)): |
query_embeddings = torch.from_numpy(query_embeddings) |
elif isinstance(query_embeddings, list): |
query_embeddings = torch.stack(query_embeddings) |
if len(query_embeddings.shape) == 1: |
query_embeddings = query_embeddings.unsqueeze(0) |
if isinstance(corpus_embeddings, (np.ndarray, np.generic)): |
corpus_embeddings = torch.from_numpy(corpus_embeddings) |
elif isinstance(corpus_embeddings, list): |
corpus_embeddings = torch.stack(corpus_embeddings) |
if corpus_embeddings.device != query_embeddings.device: |
query_embeddings = query_embeddings.to(corpus_embeddings.device) |
queries_result_list = [[] for _ in range(len(query_embeddings))] |
for query_start_idx in range(0, len(query_embeddings), query_chunk_size): |
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size): |
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx+query_chunk_size], corpus_embeddings[corpus_start_idx:corpus_start_idx+corpus_chunk_size]) |
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])), dim=1, largest=True, sorted=False) |
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist() |
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist() |
for query_itr in range(len(cos_scores)): |
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr], cos_scores_top_k_values[query_itr]): |
corpus_id = corpus_start_idx + sub_corpus_id |
query_id = query_start_idx + query_itr |
if len(queries_result_list[query_id]) < top_k: |
heapq.heappush(queries_result_list[query_id], (score, corpus_id)) |
else: |
heapq.heappushpop(queries_result_list[query_id], (score, corpus_id)) |
for query_id in range(len(queries_result_list)): |
for doc_itr in range(len(queries_result_list[query_id])): |
score, corpus_id = queries_result_list[query_id][doc_itr] |
queries_result_list[query_id][doc_itr] = {'corpus_id': corpus_id, 'score': score} |
queries_result_list[query_id] = sorted(queries_result_list[query_id], key=lambda x: x['score'], reverse=True) |
return queries_result_list |
def http_get(url, path): |
""" |
Downloads a URL to a given path on disc |
""" |
if os.path.dirname(path) != '': |
os.makedirs(os.path.dirname(path), exist_ok=True) |
req = requests.get(url, stream=True) |
if req.status_code != 200: |
print("Exception when trying to download {}. Response {}".format(url, req.status_code), file=sys.stderr) |
req.raise_for_status() |
return |
download_filepath = path+"_part" |
with open(download_filepath, "wb") as file_binary: |
content_length = req.headers.get('Content-Length') |
total = int(content_length) if content_length is not None else None |
progress = tqdm(unit="B", total=total, unit_scale=True) |
for chunk in req.iter_content(chunk_size=1024): |
if chunk: |
progress.update(len(chunk)) |
file_binary.write(chunk) |
os.rename(download_filepath, path) |
progress.close() |
def batch_to_device(batch, target_device: device): |
""" |
send a pytorch batch to a device (CPU/GPU) |
""" |
for key in batch: |
if isinstance(batch[key], Tensor): |
batch[key] = batch[key].to(target_device) |
return batch |
def fullname(o): |
""" |
Gives a full name (package_name.class_name) for a class / object in Python. Will |
be used to load the correct classes from JSON files |
""" |
module = o.__class__.__module__ |
if module is None or module == str.__class__.__module__: |
return o.__class__.__name__ |
else: |
return module + '.' + o.__class__.__name__ |
def import_from_string(dotted_path): |
""" |
Import a dotted module path and return the attribute/class designated by the |
last name in the path. Raise ImportError if the import failed. |
""" |
try: |
module_path, class_name = dotted_path.rsplit('.', 1) |
except ValueError: |
msg = "%s doesn't look like a module path" % dotted_path |
raise ImportError(msg) |
try: |
module = importlib.import_module(dotted_path) |
except: |
module = importlib.import_module(module_path) |
try: |
return getattr(module, class_name) |
except AttributeError: |
msg = 'Module "%s" does not define a "%s" attribute/class' % (module_path, class_name) |
raise ImportError(msg) |
def community_detection(embeddings, threshold=0.75, min_community_size=10, batch_size=1024): |
""" |
Function for Fast Community Detection |
Finds in the embeddings all communities, i.e. embeddings that are close (closer than threshold). |
Returns only communities that are larger than min_community_size. The communities are returned |
in decreasing order. The first element in each list is the central point in the community. |
""" |
if not isinstance(embeddings, torch.Tensor): |
embeddings = torch.tensor(embeddings) |
threshold = torch.tensor(threshold, device=embeddings.device) |
extracted_communities = [] |
min_community_size = min(min_community_size, len(embeddings)) |
sort_max_size = min(max(2 * min_community_size, 50), len(embeddings)) |
for start_idx in range(0, len(embeddings), batch_size): |
cos_scores = cos_sim(embeddings[start_idx:start_idx + batch_size], embeddings) |
top_k_values, _ = cos_scores.topk(k=min_community_size, largest=True) |
for i in range(len(top_k_values)): |
if top_k_values[i][-1] >= threshold: |
new_cluster = [] |
top_val_large, top_idx_large = cos_scores[i].topk(k=sort_max_size, largest=True) |
while top_val_large[-1] > threshold and sort_max_size < len(embeddings): |
sort_max_size = min(2 * sort_max_size, len(embeddings)) |
top_val_large, top_idx_large = cos_scores[i].topk(k=sort_max_size, largest=True) |
for idx, val in zip(top_idx_large.tolist(), top_val_large): |
if val < threshold: |
break |
new_cluster.append(idx) |
extracted_communities.append(new_cluster) |
del cos_scores |
extracted_communities = sorted(extracted_communities, key=lambda x: len(x), reverse=True) |
unique_communities = [] |
extracted_ids = set() |
for cluster_id, community in enumerate(extracted_communities): |
community = sorted(community) |
non_overlapped_community = [] |
for idx in community: |
if idx not in extracted_ids: |
non_overlapped_community.append(idx) |
if len(non_overlapped_community) >= min_community_size: |
unique_communities.append(non_overlapped_community) |
extracted_ids.update(non_overlapped_community) |
unique_communities = sorted(unique_communities, key=lambda x: len(x), reverse=True) |
return unique_communities |
def snapshot_download( |
repo_id: str, |
revision: Optional[str] = None, |
cache_dir: Union[str, Path, None] = None, |
library_name: Optional[str] = None, |
library_version: Optional[str] = None, |
user_agent: Union[Dict, str, None] = None, |
ignore_files: Optional[List[str]] = None, |
use_auth_token: Union[bool, str, None] = None |
) -> str: |
""" |
Method derived from huggingface_hub. |
Adds a new parameters 'ignore_files', which allows to ignore certain files / file-patterns |
""" |
if cache_dir is None: |
if isinstance(cache_dir, Path): |
cache_dir = str(cache_dir) |
_api = HfApi() |
token = None |
if isinstance(use_auth_token, str): |
token = use_auth_token |
elif use_auth_token: |
token = HfFolder.get_token() |
model_info = _api.model_info(repo_id=repo_id, revision=revision, token=token) |
storage_folder = os.path.join( |
cache_dir, repo_id.replace("/", "_") |
) |
all_files = model_info.siblings |
for idx, repofile in enumerate(all_files): |
if repofile.rfilename == "modules.json": |
del all_files[idx] |
all_files.append(repofile) |
break |
for model_file in all_files: |
if ignore_files is not None: |
skip_download = False |
for pattern in ignore_files: |
if fnmatch.fnmatch(model_file.rfilename, pattern): |
skip_download = True |
break |
if skip_download: |
continue |
url = hf_hub_url( |
repo_id, filename=model_file.rfilename, revision=model_info.sha |
) |
relative_filepath = os.path.join(*model_file.rfilename.split("/")) |
nested_dirname = os.path.dirname( |
os.path.join(storage_folder, relative_filepath) |
) |
os.makedirs(nested_dirname, exist_ok=True) |
cached_download_args = {'url': url, |
'cache_dir': storage_folder, |
'force_filename': relative_filepath, |
'library_name': library_name, |
'library_version': library_version, |
'user_agent': user_agent, |
'use_auth_token': use_auth_token} |
if version.parse(huggingface_hub.__version__) >= version.parse("0.8.1"): |
cached_download_args['legacy_cache_layout'] = True |
path = cached_download(**cached_download_args) |
if os.path.exists(path + ".lock"): |
os.remove(path + ".lock") |
return storage_folder |