import re import gradio as gr from scipy.sparse import load_npz import torch from sklearn.metrics.pairwise import cosine_similarity from sklearn.preprocessing import normalize from transformers import BertTokenizer, BertModel import numpy as np from datasets import load_dataset from gensim.models import KeyedVectors import plotly.graph_objects as go from sklearn.decomposition import PCA from transformers import AutoTokenizer, AutoModel class ArxivSearch: def __init__(self, dataset, embedding="bert"): self.dataset = dataset self.embedding = embedding self.query = None self.documents = [] self.titles = [] self.raw_texts = [] self.arxiv_ids = [] self.last_results = [] self.query_encoding = None # model selection self.embedding_dropdown = gr.Dropdown( choices=["tfidf", "word2vec", "bert", "scibert"], value="bert", label="Model" ) self.plot_button = gr.Button("Show 3D Plot") # Gradio blocks for UI elements with gr.Blocks() as self.iface: gr.Markdown("# arXiv Search Engine") gr.Markdown("Search arXiv papers by keyword and embedding model.") self.plot_output = gr.Plot() with gr.Row(): self.query_box = gr.Textbox(lines=1, placeholder="Enter your search query", label="Query") self.embedding_dropdown.render() self.plot_button.render() with gr.Column(): self.search_button = gr.Button("Search") self.output_md = gr.Markdown() self.query_box.submit( self.search_function, inputs=[self.query_box, self.embedding_dropdown], outputs=self.output_md ) self.embedding_dropdown.change( self.search_function, inputs=[self.query_box, self.embedding_dropdown], outputs=self.output_md ) self.plot_button.click( self.plot_3d_embeddings, inputs=[], outputs=self.plot_output ) self.search_button.click( self.search_function, inputs=[self.query_box, self.embedding_dropdown], outputs=self.output_md ) self.load_data(dataset) self.load_model('tfidf') self.load_model('word2vec') self.load_model('bert') self.load_model('scibert') self.iface.launch() def load_data(self, dataset): train_data = dataset["train"] for item in train_data.select(range(len(train_data))): text = item["text"] if not text or len(text.strip()) < 10: continue lines = text.splitlines() title_lines = [] found_arxiv = False arxiv_id = None for line in lines: line_strip = line.strip() if not found_arxiv and line_strip.lower().startswith("arxiv:"): found_arxiv = True match = re.search(r'arxiv:\d{4}\.\d{4,5}v\d', line_strip, flags=re.IGNORECASE) if match: arxiv_id = match.group(0).lower() elif not found_arxiv: title_lines.append(line_strip) else: if line_strip.lower().startswith("abstract"): break title = " ".join(title_lines).strip() self.raw_texts.append(text.strip()) self.titles.append(title) self.documents.append(text.strip()) self.arxiv_ids.append(arxiv_id) def plot_3d_embeddings(self): # Example: plot random points, replace with your embeddings pca = PCA(n_components=3) results_indices = [i[0] for i in self.last_results] if self.embedding == "tfidf": all_indices = list(set(results_indices) | set(range(min(5000, self.tfidf_matrix.shape[0])))) all_data = self.tfidf_matrix[all_indices].toarray() pca.fit(all_data) reduced_data = pca.transform(self.tfidf_matrix[:5000].toarray()) reduced_results_points = pca.transform(self.tfidf_matrix[results_indices].toarray()) if len(results_indices) > 0 else np.empty((0, 3)) elif self.embedding == "word2vec": all_indices = list(set(results_indices) | set(range(min(5000, self.word2vec_embeddings.shape[0])))) all_data = self.word2vec_embeddings[all_indices] pca.fit(all_data) reduced_data = pca.transform(self.word2vec_embeddings[:5000]) reduced_results_points = pca.transform(self.word2vec_embeddings[results_indices]) if len(results_indices) > 0 else np.empty((0, 3)) query_point = pca.transform(self.query_encoding) if self.query_encoding is not None and self.query_encoding.shape[0] > 0 else np.empty((0, 3)) elif self.embedding == "bert": all_indices = list(set(results_indices) | set(range(min(5000, self.bert_embeddings.shape[0])))) all_data = self.bert_embeddings[all_indices] pca.fit(all_data) reduced_data = pca.transform(self.bert_embeddings[:5000]) reduced_results_points = pca.transform(self.bert_embeddings[results_indices]) if len(results_indices) > 0 else np.empty((0, 3)) query_point = pca.transform(self.query_encoding) if self.query_encoding is not None and self.query_encoding.shape[0] > 0 else np.empty((0, 3)) elif self.embedding == "scibert": all_indices = list(set(results_indices) | set(range(min(5000, self.scibert_embeddings.shape[0])))) all_data = self.scibert_embeddings[all_indices] pca.fit(all_data) reduced_data = pca.transform(self.scibert_embeddings[:5000]) reduced_results_points = pca.transform(self.scibert_embeddings[results_indices]) if len(results_indices) > 0 else np.empty((0, 3)) query_point = pca.transform(self.query_encoding) if self.query_encoding is not None and self.query_encoding.shape[0] > 0 else np.empty((0, 3)) else: raise ValueError(f"Unsupported embedding type: {self.embedding}") trace = go.Scatter3d( x=reduced_data[:, 0], y=reduced_data[:, 1], z=reduced_data[:, 2], mode='markers', marker=dict(size=3.5, color="#ffffff", opacity=0.2), name='All Documents' ) layout = go.Layout( margin=dict(l=0, r=0, b=0, t=0), scene=dict( xaxis_title='X', yaxis_title='Y', zaxis_title='Z', xaxis=dict(backgroundcolor='black', color='white', gridcolor='gray', zerolinecolor='gray'), yaxis=dict(backgroundcolor='black', color='white', gridcolor='gray', zerolinecolor='gray'), zaxis=dict(backgroundcolor='black', color='white', gridcolor='gray', zerolinecolor='gray'), ), paper_bgcolor='black', # Outside the plotting area plot_bgcolor='black', # Plotting area font=dict(color='white') # Axis and legend text ) if len(reduced_results_points) > 0: results_trace = go.Scatter3d( x=reduced_results_points[:, 0], y=reduced_results_points[:, 1], z=reduced_results_points[:, 2], mode='markers', marker=dict(size=3.5, color='orange', opacity=0.75), name='Results' ) if not self.embedding == "tfidf" and self.query_encoding is not None and self.query_encoding.shape[0] > 0: query_trace = go.Scatter3d( x=query_point[:, 0], y=query_point[:, 1], z=query_point[:, 2], mode='markers', marker=dict(size=5, color='red', opacity=0.8), name='Query' ) fig = go.Figure(data=[trace, results_trace, query_trace], layout=layout) else: fig = go.Figure(data=[trace, results_trace], layout=layout) else: fig = go.Figure(data=[trace], layout=layout) return fig def keyword_match_ranking(self, query, top_n=10): query_terms = query.lower().split() query_indices = [i for i, term in enumerate(self.feature_names) if term in query_terms] if not query_indices: return [] scores = [] for doc_idx in range(self.tfidf_matrix.shape[0]): doc_vector = self.tfidf_matrix[doc_idx] doc_score = sum(doc_vector[0, i] for i in query_indices) if doc_score > 0: scores.append((doc_idx, doc_score)) scores.sort(key=lambda x: x[1], reverse=True) return scores[:top_n] def word2vec_search(self, query, top_n=10): tokens = [word for word in query.split() if word in self.wv_model.key_to_index] if not tokens: return [] vectors = np.array([self.wv_model[word] for word in tokens]) query_vec = normalize(np.mean(vectors, axis=0).reshape(1, -1)) self.query_encoding = query_vec sims = cosine_similarity(query_vec, self.word2vec_embeddings).flatten() top_indices = sims.argsort()[::-1][:top_n] return [(i, sims[i]) for i in top_indices] def bert_search(self, query, top_n=10): with torch.no_grad(): inputs = self.tokenizer(query, return_tensors="pt", truncation=True, padding=True) outputs = self.model(**inputs) query_vec = normalize(outputs.last_hidden_state[:, 0, :].numpy()) self.query_encoding = query_vec sims = cosine_similarity(query_vec, self.bert_embeddings).flatten() top_indices = sims.argsort()[::-1][:top_n] return [(i, sims[i]) for i in top_indices] def scibert_search(self, query, top_n=10): with torch.no_grad(): inputs = self.sci_tokenizer(query, return_tensors="pt", truncation=True, padding=True, max_length=512) outputs = self.sci_model(**inputs) query_vec = normalize(outputs.last_hidden_state[:, 0, :].numpy()) self.query_encoding = query_vec sims = cosine_similarity(query_vec, self.scibert_embeddings).flatten() top_indices = sims.argsort()[::-1][:top_n] return [(i, sims[i]) for i in top_indices] def bert_search_2(self, query, top_n=10): with torch.no_grad(): inputs = self.tokenizer(query, return_tensors="pt", truncation=True, padding=True) outputs = self.model(**inputs) token_embeddings = outputs.last_hidden_state attention_mask = inputs['attention_mask'] mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() sentence_embeddings = torch.sum(token_embeddings * mask_expanded, dim=1) sum_mask = torch.clamp(mask_expanded.sum(1), min=1e-9) query_vec = sentence_embeddings / sum_mask self.query_encoding = query_vec sims = cosine_similarity(query_vec, self.bert_embeddings).flatten() top_indices = sims.argsort()[::-1][:top_n] return [(i, sims[i]) for i in top_indices] def load_model(self, embedding): self.embedding = embedding if self.embedding == "tfidf": self.tfidf_matrix = load_npz("TF-IDF embeddings/tfidf_matrix_train.npz") with open("TF-IDF embeddings/feature_names.txt", "r") as f: self.feature_names = [line.strip() for line in f.readlines()] elif self.embedding == "word2vec": # Use trimmed model here self.word2vec_embeddings = np.load("Word2Vec embeddings/word2vec_embedding.npz")["word2vec_embedding"] self.wv_model = KeyedVectors.load("models/word2vec-trimmed.model") elif self.embedding == "bert": self.bert_embeddings = np.load("BERT embeddings/bert_embedding.npz")["bert_embedding"] self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') self.model = BertModel.from_pretrained('bert-base-uncased') self.model.eval() elif self.embedding == "scibert": self.scibert_embeddings = np.load("SciBERT_embeddings/scibert_embedding.npz")["bert_embedding"] self.sci_tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased') self.sci_model = AutoModel.from_pretrained('allenai/scibert_scivocab_uncased') self.sci_model.eval() else: raise ValueError(f"Unsupported embedding type: {self.embedding}") def snippet_before_abstract(self, text): pattern = re.compile(r'a\s*b\s*s\s*t\s*r\s*a\s*c\s*t|i\s*n\s*t\s*r\s*o\s*d\s*u\s*c\s*t\s*i\s*o\s*n', re.IGNORECASE) match = pattern.search(text) if match: return text[:match.start()].strip() else: return text[:100].strip() def search_function(self, query, embedding): self.embedding = embedding query = query.encode().decode('unicode_escape') # Interpret escape sequences # Load or switch embedding model here if needed if self.embedding == "tfidf": results = self.keyword_match_ranking(query) elif self.embedding == "word2vec": results = self.word2vec_search(query) elif self.embedding == "bert": results = self.bert_search(query) elif self.embedding == "scibert": results = self.scibert_search(query) else: return "No results found." if not results: self.last_results = [] return "No results found." if results: self.last_results = results output = "" display_rank = 1 for idx, score in results: if not self.arxiv_ids[idx]: continue link = f"https://arxiv.org/abs/{self.arxiv_ids[idx].replace('arxiv:', '')}" snippet = self.snippet_before_abstract(self.documents[idx]).replace('\n', '
') output += f"### Document {display_rank}\n" output += f"[arXiv Link]({link})\n\n" output += f"
{snippet}
\n\n---\n" display_rank += 1 return output if __name__ == "__main__": dataset = load_dataset("ccdv/arxiv-classification", "no_ref") # replace with your dataset search_engine = ArxivSearch(dataset) search_engine.iface.launch()