import time
from datetime import datetime
import os, warnings, nltk, json, re
import numpy as np
from typing import Optional  # used by the Optional annotation below
from nltk.stem import WordNetLemmatizer
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler

# Clear the CA bundle (disables certificate verification for outgoing requests) and silence warnings
os.environ['CURL_CA_BUNDLE'] = ""
warnings.filterwarnings('ignore')
nltk.download('wordnet')
load_dotenv()

from datasets import load_dataset
import bm25s
from bm25s.hf import BM25HF
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from schemas import *
from bs4 import BeautifulSoup
import requests

lemmatizer = WordNetLemmatizer()

# Load the specification metadata, section contents and pre-built BM25 index from the Hugging Face Hub
spec_metadatas = load_dataset("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"])
bm25_index = BM25HF.load_from_hub("OrganizedProgrammers/ETSIBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])

spec_metadatas = spec_metadatas["train"].to_list()
spec_contents = spec_contents["train"].to_list()
def get_document(spec_id: str, spec_title: Optional[str]):
    """Return a flat list: the spec header followed by alternating section titles and contents."""
    text = [f"{spec_id} - {spec_title}" if spec_title else f"{spec_id}"]
    for section in spec_contents:
        if spec_id == section["doc_id"]:
            text.extend([section['section'], section['content']])
    return text
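# Illustrative shape of the return value (hypothetical spec id and sections), assuming each
# matching entry in spec_contents contributes a (section title, section content) pair:
#   get_document("103666", "Smart Secure Platform")
#   -> ["103666 - Smart Secure Platform", "1 Scope", "<scope text>", "2 References", "<references text>", ...]
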
app = FastAPI(
    title="3GPP Document Finder Back-End",
    description="Backend for 3GPPDocFinder - Searching technical documents & specifications from 3GPP FTP server",
)
app.mount("/static", StaticFiles(directory="static"), name="static")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class DocFinder:
    """Finds SET/SCP contribution documents on the ETSI docbox server."""

    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        self.session = requests.Session()
        # Authenticate against the ETSI portal so that subsequent docbox requests are authorized
        req = self.session.post("https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}))
        print(req.content, req.status_code)

    def get_workgroup(self, doc: str):
        # Map the document prefix to its technical group
        main_tsg = (
            "SET-WG-R" if any(doc.startswith(kw) for kw in ["SETREQ", "SCPREQ"])
            else "SET-WG-T" if any(doc.startswith(kw) for kw in ["SETTEC", "SCPTEC"])
            else "SET" if any(doc.startswith(kw) for kw in ["SET", "SCP"])
            else None
        )
        if main_tsg is None:
            return None, None, None
        # The meeting year appears in parentheses, e.g. "(25)" -> workgroup "2025"
        regex = re.search(r'\(([^)]+)\)', doc)
        workgroup = "20" + regex.group(1)
        return main_tsg, workgroup, doc
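    # Example of the parsing above (hypothetical document number):
    #   get_workgroup("SET(25)000123") -> ("SET", "2025", "SET(25)000123")
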
    def find_workgroup_url(self, main_tsg, workgroup):
        # Browse the 05-CONTRIBUTIONS folder and return the sub-folder whose link text mentions the workgroup
        response = self.session.get(f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        # List the file names found in a docbox directory listing
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []
    def search_document(self, doc_id: str):
        original = doc_id
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        urls = []
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            print(wg_url)
            if wg_url:
                files = self.get_docs_from_url(wg_url)
                print(files)
                for f in files:
                    # Keep every file whose name contains the requested document number
                    if doc in f.lower() or original in f:
                        print(f)
                        doc_url = f"{wg_url}/{f}"
                        urls.append(doc_url)
        # One match: return it; several matches: return the next-to-last one; none: not-found message
        return urls[0] if len(urls) == 1 else urls[-2] if len(urls) > 1 else f"Document {doc_id} not found"

class SpecFinder:
    """Finds published ETSI technical specifications on www.etsi.org/deliver."""

    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def get_spec_path(self, doc_id: str):
        # Split an id such as "103 666-2" into its number and optional part, then build the delivery path
        if "-" in doc_id:
            position, part = doc_id.split("-")
        else:
            position, part = doc_id, None
        position = position.replace(" ", "")
        if part:
            if len(part) == 1:
                part = "0" + part  # part numbers are zero-padded to two digits in the folder name
        spec_folder = position + part if part is not None else position
        # Specifications are grouped into folders covering ranges of 100, e.g. 103600_103699
        return f"{int(position) - (int(position)%100)}_{int(position) - (int(position)%100) + 99}/{spec_folder}"
    def get_docs_from_url(self, url):
        # List the entries of an etsi.org delivery folder, dropping the first anchor in the listing
        try:
            response = requests.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            docs = [item.get_text() for item in soup.find_all("a")][1:]
            return docs
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []
    def search_document(self, doc_id: str):
        # Example: 103 666[-2 opt]
        original = doc_id
        url = f"{self.main_url}/{self.get_spec_path(original)}/"
        print(url)
        # Take the last release folder in the listing and return the first PDF inside it
        releases = self.get_docs_from_url(url)
        files = self.get_docs_from_url(url + releases[-1])
        for f in files:
            if f.endswith(".pdf"):
                return url + releases[-1] + "/" + f
        return f"Specification {doc_id} not found"
async def main_menu():
    # Serve the front-end landing page
    return FileResponse(os.path.join("templates", "index.html"))

finder_doc = DocFinder()
finder_spec = SpecFinder()

def find_document(request: DocRequest):
    start_time = time.time()
    # Ids that start with a digit are specifications; otherwise they are SET/SCP contribution documents
    finder = finder_spec if request.doc_id[0].isnumeric() else finder_doc
    print(finder)
    result = finder.search_document(request.doc_id)
    if "not found" not in result and "Could not" not in result and "Unable" not in result:
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time
        ) if not isinstance(result, list) else result
    else:
        raise HTTPException(status_code=404, detail=result)

def find_documents_batch(request: BatchDocRequest):
    start_time = time.time()
    results = {}
    missing = []
    for doc_id in request.doc_ids:
        finder = finder_doc if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id)
        if "not found" not in result and "Could not" not in result and "Unable" not in result:
            results[doc_id] = result
        else:
            missing.append(doc_id)
    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )

def search_specification_by_keywords(request: KeywordRequest):
    start_time = time.time()
    boolSensitiveCase = request.case_sensitive
    search_mode = request.search_mode
    spec_type = request.spec_type
    # When the search is case-insensitive, lower-case both the keywords and the compared text
    keywords = [string.lower() if not boolSensitiveCase else string for string in request.keywords.split(",")]
    print(keywords)
    unique_specs = set()
    results = []

    if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode!")

    for spec in spec_metadatas:
        valid = False
        if spec['id'] in unique_specs: continue
        if spec.get('type', None) is None or (spec_type is not None and spec["type"] != spec_type): continue

        if search_mode == "deep":
            contents = []
            doc = get_document(spec["id"], spec["title"])
            docValid = len(doc) > 1

        if request.mode == "and":
            # All keywords must appear in the metadata string (and, in deep mode, in a section's content)
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}"
            if all(keyword in (string.lower() if not boolSensitiveCase else string) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if all(keyword in (section_content.lower() if not boolSensitiveCase else section_content) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})
        elif request.mode == "or":
            # At least one keyword must appear
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}"
            if any(keyword in (string.lower() if not boolSensitiveCase else string) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if any(keyword in (section_content.lower() if not boolSensitiveCase else section_content) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})

        if valid:
            spec_content = spec
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
        else:
            unique_specs.add(spec['id'])

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")

def bm25_search_specification(request: BM25KeywordRequest):
    start_time = time.time()
    spec_type = request.spec_type
    threshold = request.threshold
    query = request.keywords
    results_out = []

    # Score every document in the corpus against the query
    query_tokens = bm25s.tokenize(query)
    results, scores = bm25_index.retrieve(query_tokens, k=len(bm25_index.corpus))
    print("BM25 raw scores:", scores)

    def calculate_boosted_score(metadata, score, query):
        # Boost the raw BM25 score when the query mentions the spec id or shares words with the title
        title = set(metadata['title'].lower().split())
        q = set(query.lower().split())
        spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
        booster = len(q & title) * 0.5
        return score + spec_id_presence + booster
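    # Worked example of the boosting rule (hypothetical metadata and query):
    #   metadata = {'id': '103666', 'title': 'Smart Secure Platform'}, query = "103666 secure platform requirements"
    #   -> the id appears in the query (+0.5) and two title words are shared (+1.0), so the raw score is raised by 1.5
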
    # Keep only the best-scoring corpus entry per specification
    spec_scores = {}
    spec_indices = {}
    spec_details = {}
    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]
        boosted_score = calculate_boosted_score(doc['metadata'], score, query)
        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_indices[spec] = i
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc
            }

    def normalize_scores(scores_dict):
        # Min-max scale the boosted scores into [0, 1]
        if not scores_dict:
            return {}
        scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
        scaler = MinMaxScaler()
        normalized_scores = scaler.fit_transform(scores_array).flatten()
        normalized_dict = {}
        for i, spec in enumerate(scores_dict.keys()):
            normalized_dict[spec] = normalized_scores[i]
        return normalized_dict
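    # Example of the min-max normalization above (hypothetical boosted scores):
    #   {'TS-A': 2.0, 'TS-B': 5.0, 'TS-C': 8.0} -> {'TS-A': 0.0, 'TS-B': 0.5, 'TS-C': 1.0}
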
    normalized_scores = normalize_scores(spec_scores)
    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    # Rank specifications by normalized score and cut off below the requested threshold (given in percent)
    unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)
    for rank, spec in enumerate(unique_specs, 1):
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if metadata.get('type', None) is None or (spec_type is not None and metadata["type"] != spec_type):
            continue
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if len(results_out) > 0:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")
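
# A minimal sketch for running the service locally, assuming uvicorn is available;
# the host and port below are illustrative, not taken from the original deployment.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)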