Spaces:
Running
Running
Create app.py
Browse files
app.py
ADDED
@@ -0,0 +1,78 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from transformers import AutoTokenizer, AutoModel
|
2 |
+
import torch
|
3 |
+
import faiss
|
4 |
+
import gradio as gr
|
5 |
+
import json
|
6 |
+
|
7 |
+
class FaissTextRetrieval:
|
8 |
+
def __init__(self, model_name):
|
9 |
+
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
|
10 |
+
self.model = AutoModel.from_pretrained(model_name).eval()
|
11 |
+
self.device = "cpu"
|
12 |
+
|
13 |
+
self.all_index = faiss.read_index("data/all.index")
|
14 |
+
with open("data/all.json", "r") as f:
|
15 |
+
self.all_id2label = {int(k):v for k, v in json.load(f).items()}
|
16 |
+
|
17 |
+
self.general_index = faiss.read_index("data/general.index")
|
18 |
+
with open("data/general.json", "r") as f:
|
19 |
+
self.general_id2label = {int(k):v for k, v in json.load(f).items()}
|
20 |
+
|
21 |
+
self.character_index = faiss.read_index("data/character.index")
|
22 |
+
with open("data/character.json", "r") as f:
|
23 |
+
self.character_id2label = {int(k):v for k, v in json.load(f).items()}
|
24 |
+
|
25 |
+
def to(self, device, dtype=torch.float32):
|
26 |
+
self.device = device
|
27 |
+
self.dtype = dtype if "cuda" in device else torch.float32
|
28 |
+
self.model.to(device, dtype=dtype)
|
29 |
+
|
30 |
+
@torch.no_grad()
|
31 |
+
def average_pool(self, last_hidden_states, attention_mask):
|
32 |
+
last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
|
33 |
+
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
|
34 |
+
|
35 |
+
@torch.no_grad()
|
36 |
+
def get_embeddings(self, input_texts: list):
|
37 |
+
batch_dict = self.tokenizer(input_texts, max_length=512, padding=True, truncation=True, return_tensors='pt')
|
38 |
+
input_ids = batch_dict["input_ids"].to(self.device)
|
39 |
+
attention_mask = batch_dict["attention_mask"].to(self.device)
|
40 |
+
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
|
41 |
+
embeddings = self.average_pool(outputs.last_hidden_state, attention_mask)
|
42 |
+
embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
|
43 |
+
|
44 |
+
return embeddings
|
45 |
+
|
46 |
+
def search(self, query, top_k: int = 5, search_type = "all") -> list:
|
47 |
+
query = "query:" + query
|
48 |
+
query_embeddings = self.get_embeddings([query]).float().cpu().numpy()
|
49 |
+
|
50 |
+
if search_type == "all":
|
51 |
+
index = self.all_index
|
52 |
+
id2label = self.all_id2label
|
53 |
+
elif search_type == "general":
|
54 |
+
index = self.general_index
|
55 |
+
id2label = self.general_id2label
|
56 |
+
elif search_type == "character":
|
57 |
+
index = self.character_index
|
58 |
+
id2label = self.character_id2label
|
59 |
+
|
60 |
+
distances, indices = index.search(query_embeddings, top_k)
|
61 |
+
results = {id2label[idx]:distances[0][j] for j, idx in enumerate(indices[0])}
|
62 |
+
|
63 |
+
return results
|
64 |
+
|
65 |
+
def reset(self):
|
66 |
+
self.passage_texts = []
|
67 |
+
self.index = None
|
68 |
+
|
69 |
+
def main():
|
70 |
+
rag = FaissTextRetrieval("intfloat/multilingual-e5-large")
|
71 |
+
|
72 |
+
def search(query, search_type):
|
73 |
+
return rag.search(query, top_k=50, search_type=search_type)
|
74 |
+
|
75 |
+
gr.Interface(search, inputs=("textarea", gr.Radio(["all", "general", "character"])), outputs="label", title="Tag Search", description="Search for tags in the dataset.").launch()
|
76 |
+
|
77 |
+
if __name__ == "__main__":
|
78 |
+
main()
|