"""FastAPI service that locates 3GPP TSG documents and specifications.

The module exposes endpoints to resolve a document identifier to its URL on
the 3GPP server, to search the indexed specifications by keyword (plain
substring matching or BM25 ranking), and to resolve several identifiers in
one call.
"""

import io
import json
import os
import re
import subprocess
import time
import traceback
import uuid
import warnings
import zipfile
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Literal, Optional

import bm25s
import nltk
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from nltk.stem import WordNetLemmatizer
from pydantic import BaseModel
from sklearn.preprocessing import MinMaxScaler

nltk.download("wordnet")
load_dotenv()
warnings.filterwarnings("ignore")

app = FastAPI(title="3GPP Document Finder API",
              description="API to find 3GPP documents based on TSG document IDs")
app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
_DOC_SUFFIXES = ("doc", "docx")
_BM25_INDEX_DIR = "3gpp_bm25_docs"
# Table-of-contents entry: a clause number with 1 to 5 levels, a tab, a title.
_TOC_ENTRY_RE = re.compile(r"^\d+(?:\.\d+){0,4}\t[\ \S]+")
_FAILURE_MARKERS = ("not found", "Could not", "Unable")


def _convert_doc_to_lines(doc_bytes: bytes, ext: str) -> List[str]:
    """Convert Word document bytes to a list of non-empty, stripped text lines.

    The bytes are written to a uniquely named file under /tmp, converted with
    headless LibreOffice, and both temporary files are removed afterwards,
    including when the conversion fails.
    """
    temp_id = str(uuid.uuid4())
    input_path = f"/tmp/{temp_id}.{ext}"
    output_path = f"/tmp/{temp_id}.txt"
    try:
        print("Ecriture")
        with open(input_path, "wb") as f:
            f.write(doc_bytes)
        print("Convertissement")
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)
        print("Ecriture TXT")
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    finally:
        for path in (input_path, output_path):
            if os.path.exists(path):
                os.remove(path)


def _is_cover(file_name: str) -> bool:
    """Tell whether an archive member is a cover page (and log it if so)."""
    if "cover" in file_name.lower():
        print("COVER !")
        return True
    return False


def _first_doc_lines(archive: zipfile.ZipFile) -> Optional[List[str]]:
    """Return the text of the first non-cover .doc/.docx member, or None."""
    for member in archive.namelist():
        if member.endswith(_DOC_SUFFIXES) and not _is_cover(member):
            return _convert_doc_to_lines(archive.read(member), member.split(".")[-1])
    return None


def get_text(specification: str, version: str):
    """Download a specification archive and return its text as a list of lines.

    Args:
        specification: Specification number, e.g. "23.501".
        version: Version suffix used in the archive file name.

    Returns:
        The non-empty, stripped lines of the first non-cover Word document
        found in the archive (or in a ZIP nested inside it).

    Raises:
        Exception: If the download fails or the archive has no usable document.
    """
    doc_id = specification
    series = doc_id.split(".")[0]
    # NOTE(review): certificate verification is disabled for this request.
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": _USER_AGENT}
    )
    if response.status_code != 200:
        raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")

    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith("zip"):
                print("Another ZIP !")
                # Open the nested archive under its own name so the outer one
                # stays usable for the remaining members.
                with zipfile.ZipFile(io.BytesIO(zf.read(file_name))) as inner:
                    lines = _first_doc_lines(inner)
                if lines is not None:
                    return lines
            elif file_name.endswith(_DOC_SUFFIXES):
                if _is_cover(file_name):
                    continue
                return _convert_doc_to_lines(zf.read(file_name), file_name.split(".")[-1])

    raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")


def get_scope(specification: str, version: str):
    """Return the text of the "Scope" clause of a specification.

    The clause is taken as the lines between the last line ending in "scope"
    and the last line ending in "references" (case-insensitive).

    Returns:
        The whitespace-normalised scope text, "Not found" when the extracted
        span has two or more lines, or "Not found (error)" on any exception.
    """
    try:
        spec_text = get_text(specification, version)
        scp_i = 0
        nxt_i = 0
        for idx, text in enumerate(spec_text):
            if re.search(r"scope$", text, flags=re.IGNORECASE):
                scp_i = idx
                nxt_i = scp_i + 10
            if re.search(r"references$", text, flags=re.IGNORECASE):
                nxt_i = idx

        scope_lines = spec_text[scp_i + 1:nxt_i]
        # NOTE(review): the span is only returned when it has fewer than two
        # lines; this looks inverted but is kept as-is pending confirmation.
        if len(scope_lines) < 2:
            return re.sub(r"\s+", " ", " ".join(scope_lines))
        return "Not found"
    except Exception as e:
        traceback.print_exception(e)
        return "Not found (error)"


def get_spec_content(specification: str, version: str):
    """Split a specification into a mapping of chapter title to chapter text.

    The table of contents is read from the lines between the first two lines
    containing "Foreword"; each entry is then located in the text and the
    lines up to the next located entry become that chapter's content.

    Returns:
        A dict mapping "number title" to the chapter text; empty when the
        table of contents yields no locatable chapter.

    Raises:
        ValueError: If the text has fewer than two "Foreword" lines, so the
            table of contents cannot be delimited.
    """
    text = get_text(specification, version)

    forewords = []
    for idx, line in enumerate(text):
        if "Foreword" in line:
            forewords.append(idx)
            if len(forewords) >= 2:
                break
    if len(forewords) < 2:
        raise ValueError(
            f"Table of contents not found for {specification}-{version}")

    toc_start, body_start = forewords
    chapters = []
    for line in text[toc_start:body_start]:
        if _TOC_ENTRY_RE.match(line):
            # Keep "number<TAB>title" and drop the trailing page number.
            chapters.append("\t".join(line.split("\t")[:2]))

    real_toc_indexes = {}
    for chapter in chapters:
        try:
            real_toc_indexes[chapter] = text.index(chapter)
        except ValueError:
            # The heading text differs from the TOC entry: fall back to the
            # first body line that carries the same clause number.
            number = chapter.split("\t")[0] + "\t"
            for line in text[body_start:]:
                if number in line:
                    real_toc_indexes[line] = text.index(line)
                    break

    toc = list(real_toc_indexes.keys())
    index_toc = list(real_toc_indexes.values())
    if not toc:
        return {}

    document = {}
    for title, start, end in zip(toc, index_toc, index_toc[1:]):
        document[title.replace("\t", " ")] = re.sub(
            r"[\ \t]+", " ", "\n".join(text[start + 1:end]))
    # The last chapter runs to the end of the document.
    document[toc[-1].replace("\t", " ")] = re.sub(
        r"\s+", " ", " ".join(text[index_toc[-1] + 1:]))
    return document


def caseSensitive(string: str, sensitive: bool):
    """Return `string` unchanged when `sensitive` is true, lower-cased otherwise."""
    return string if sensitive else string.lower()


def _is_lookup_failure(result: str) -> bool:
    """Tell whether a finder result is an error message rather than a URL."""
    return any(marker in result for marker in _FAILURE_MARKERS)


class DocRequest(BaseModel):
    """Request body for resolving a single document identifier."""
    doc_id: str
    release: Optional[int] = None


class DocResponse(BaseModel):
    """Resolved document: URL, version, optional scope and lookup duration."""
    doc_id: str
    url: str
    version: str
    scope: Optional[str] = None
    search_time: float


class BatchDocRequest(BaseModel):
    """Request body for resolving several document identifiers."""
    doc_ids: List[str]
    release: Optional[int] = None


class BatchDocResponse(BaseModel):
    """Batch result: found URLs by identifier, missing identifiers, duration."""
    results: Dict[str, str]
    missing: List[str]
    search_time: float


class KeywordRequest2(BaseModel):
    """Request body for the BM25 specification search."""
    keywords: Optional[str] = ""
    threshold: Optional[int] = 60
    release: Optional[str] = None
    working_group: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None


class KeywordRequest(BaseModel):
    """Request body for the substring-based specification search."""
    keywords: Optional[str] = ""
    search_mode: Literal["quick", "deep"]
    case_sensitive: Optional[bool] = False
    release: Optional[str] = None
    working_group: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None
    mode: Optional[Literal["and", "or"]] = "and"


class KeywordResponse(BaseModel):
    """Keyword search result: matching specifications and search duration."""
    results: List[Dict[str, Any]]
    search_time: float


class TsgDocFinder:
    """Resolve TSG document identifiers to URLs, backed by a JSON index file."""

    def __init__(self):
        self.main_ftp_url = "https://www.3gpp.org/ftp"
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()

    def load_indexer(self):
        """Load existing index if available"""
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["docs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        """Save the updated index"""
        # The timestamp must be computed before it is formatted.
        today = datetime.today()
        self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc):
        """Derive (main TSG folder, workgroup name, doc) from a document id.

        Returns (None, None, None) when the id is empty or does not start
        with "C" or "S".
        """
        if not doc:
            return None, None, None
        main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
        if main_tsg is None:
            return None, None, None
        has_wg_digit = len(doc) > 1 and doc[1].isdecimal()
        workgroup = f"WG{int(doc[1])}" if has_wg_digit else main_tsg.upper()
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup"""
        response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')

        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"

        return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"

    def get_docs_from_url(self, url):
        """Get list of documents/directories from a URL"""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _match_in_listing(self, base_url, files, doc_id):
        """Return the URL of the first listed file whose name contains doc_id.

        The comparison is case-insensitive.
        """
        needle = doc_id.lower()
        for file in files:
            if needle in file.lower():
                return f"{base_url}/{file}"
        return None

    def _search_docs_folder(self, docs_url, doc_id):
        """Look for doc_id in a documents folder, then in its "zip" subfolder."""
        files = self.get_docs_from_url(docs_url)
        found = self._match_in_listing(docs_url, files, doc_id)
        if found is None and "zip" in [x.lower() for x in files]:
            zip_url = f"{docs_url}/zip"
            found = self._match_in_listing(zip_url, self.get_docs_from_url(zip_url), doc_id)
        return found

    def _remember(self, doc_id, doc_url):
        """Record a resolved URL in the index, persist it, and return it."""
        self.indexer[doc_id] = doc_url
        self.save_indexer()
        return doc_url

    def search_document(self, doc_id: str, release=None):
        """Resolve a TSG document id to its URL.

        The index is consulted first (exact key, then key prefix); on a miss
        the working-group meeting folders and then the workshop folders are
        crawled. Hits found by crawling are stored in the index.

        Returns:
            The document URL, or "Document <id> not found".
        """
        if not doc_id:
            # An empty id would prefix-match the first indexed document.
            return f"Document {doc_id} not found"

        # 1. Index lookup.
        if doc_id in self.indexer:
            return self.indexer[doc_id]
        for indexed_id in self.indexer:
            if indexed_id.startswith(doc_id):
                return self.indexer[indexed_id]

        # 2. Live search in the working-group meeting folders (TSG/CT).
        main_tsg, workgroup, _ = self.get_workgroup(doc_id)
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            if wg_url:
                for folder in self.get_docs_from_url(wg_url):
                    meeting_url = f"{wg_url}/{folder}"
                    lowered = [x.lower() for x in self.get_docs_from_url(meeting_url)]
                    key = "docs" if "docs" in lowered else "tdocs" if "tdocs" in lowered else None
                    if key is None:
                        continue
                    doc_url = self._search_docs_folder(f"{meeting_url}/{key}", doc_id)
                    if doc_url is not None:
                        return self._remember(doc_id, doc_url)

        # 3. Last resort: live search under /ftp/workshop.
        workshop_url = f"{self.main_ftp_url}/workshop"
        for meeting in self.get_docs_from_url(workshop_url):
            if meeting in ['./', '../']:
                continue
            meeting_url = f"{workshop_url}/{meeting}"
            for sub in self.get_docs_from_url(meeting_url):
                if sub.lower() not in ['docs', 'tdocs']:
                    continue
                doc_url = self._search_docs_folder(f"{meeting_url}/{sub}", doc_id)
                if doc_url is not None:
                    return self._remember(doc_id, doc_url)

        return f"Document {doc_id} not found"


class SpecDocFinder:
    """Resolve specification numbers to archive URLs and serve indexed content."""

    def __init__(self):
        # Alphabet used by the single-character version fields in file names.
        self.chars = "0123456789abcdefghijklmnopqrstuvwxyz"
        self.indexer_file = "indexed_specifications.json"
        self.doc_zip = "indexed_docs_content.zip"
        self.indexer_specs, self.indexer_scopes, self.last_indexer_date = self.load_indexer()
        self.indexer_documents = self.load_documents()

    def load_indexer(self):
        """Load existing index if available"""
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["specs"], x["scopes"], x["last_indexed_date"]
        return {}, {}, None

    def load_documents(self):
        """Load the pre-indexed document contents from the ZIP archive.

        Returns the first JSON member that decodes successfully, or an empty
        dict when the archive is missing or holds no valid JSON.
        """
        if os.path.exists(self.doc_zip):
            # Passing the path lets ZipFile own (and close) the file handle.
            with zipfile.ZipFile(self.doc_zip) as zf:
                for file_name in zf.namelist():
                    if not file_name.endswith(".json"):
                        continue
                    doc_bytes = zf.read(file_name)
                    try:
                        doc_data = json.loads(doc_bytes.decode("utf-8"))
                        print("Documents loaded successfully !")
                        return doc_data
                    except json.JSONDecodeError as e:
                        print(f"Error while decoding the JSON file {file_name}: {e}")
        print("Failed !")
        return {}

    def get_document(self, spec, version):
        """Return the indexed content of `spec`, fetching it live on a miss."""
        doc = self.indexer_documents.get(spec)
        if doc:
            return doc
        return get_spec_content(spec, version)

    def get_section(self, doc, chapter):
        """Return the text of `chapter` from a document mapping."""
        return doc[chapter]

    def save_indexer(self):
        """Save the updated index"""
        # The timestamp must be computed before it is formatted.
        today = datetime.today()
        self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            output = {"specs": self.indexer_specs, "scopes": self.indexer_scopes,
                      "last_indexed_date": self.last_indexer_date}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def search_document(self, doc_id, release=None):
        """Resolve a specification number to the URL of its archive.

        The index is consulted first (substring match on the key, regardless
        of `release`). Otherwise the archive listing is fetched: without a
        release the last listed file is returned, with a release the last
        file whose version starts with that release is returned.

        Returns:
            The archive URL, "Unable to find specification <id>" when the
            listing has no entry, or "Specification <id> not found" when no
            file matches the release.
        """
        for key in self.indexer_specs.keys():
            if str(doc_id) in key:
                return self.indexer_specs[key]['url']

        series = doc_id.split(".")[0].rjust(2, "0")
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}"
        response = requests.get(url, verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')

        # Skip the header row and any row that carries no link.
        links = []
        for row in soup.find_all("tr")[1:]:
            link = row.find("a")
            if link is not None:
                links.append(link)

        if release is None:
            if not links:
                return f"Unable to find specification {doc_id}"
            return links[-1].get("href")

        try:
            release_char = self.chars[int(release)]
        except (ValueError, IndexError, TypeError):
            return f"Specification {doc_id} not found"

        prefix = f"{doc_id.replace('.', '')}-{release_char}"
        spec_url = None
        for link in links:
            # No early exit: the last match in the listing wins.
            if prefix in link.get_text():
                spec_url = link.get("href")
        return spec_url if spec_url is not None else f"Specification {doc_id} not found"


finder_tsg = TsgDocFinder()
finder_spec = SpecDocFinder()
lemmatizer = WordNetLemmatizer()

if os.path.exists("bm25s.zip"):
    with zipfile.ZipFile("bm25s.zip", 'r') as zip_ref:
        zip_ref.extractall(".")
# Keep the name defined even when no index is available so the endpoint can
# answer with a clear error instead of a NameError.
bm25_engine = (
    bm25s.BM25.load(_BM25_INDEX_DIR, load_corpus=True)
    if os.path.exists(_BM25_INDEX_DIR) else None
)


def _passes_filters(spec, release, spec_type, working_group) -> bool:
    """Apply the release / type / working-group filters to spec metadata.

    Entries lacking any of the three fields are always rejected.
    """
    if spec.get('version', None) is None or (
            release is not None and spec["version"].split(".")[0] != str(release)):
        return False
    if spec.get('type', None) is None or (
            spec_type is not None and spec["type"] != spec_type):
        return False
    if spec.get('working_group', None) is None or (
            working_group is not None and spec["working_group"] != working_group):
        return False
    return True


def _boosted_score(metadata, score, query):
    """Add 0.5 to the score when the query equals the spec id, and 0.5 when
    it equals the lemmatized, lower-cased title."""
    title = {lemmatizer.lemmatize(metadata['title']).lower()}
    q = {query.lower()}
    spec_id_presence = 0.5 if len(q & {metadata['id']}) > 0 else 0
    booster = len(q & title) * 0.5
    return score + spec_id_presence + booster


def _normalize_scores(scores_dict):
    """Min-max scale the values of a score mapping, keeping its keys."""
    if not scores_dict:
        return {}
    scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
    scaled = MinMaxScaler().fit_transform(scores_array).flatten()
    return {spec: scaled[i] for i, spec in enumerate(scores_dict.keys())}


@app.get("/")
async def main_menu():
    """Serve the HTML front page."""
    return FileResponse(os.path.join("templates", "index.html"))


@app.post("/search-spec/experimental", response_model=KeywordResponse)
def search_spec_bm25(request: KeywordRequest2):
    """Rank specifications against the query with BM25.

    Each specification keeps its best boosted score; scores are min-max
    normalised and results below `threshold` percent are dropped.

    Raises:
        HTTPException: 503 when the BM25 index is not loaded, 404 when no
            specification passes the filters and threshold.
    """
    start_time = time.time()
    if bm25_engine is None:
        raise HTTPException(status_code=503, detail="BM25 index is not available")

    release = request.release
    working_group = request.working_group
    spec_type = request.spec_type
    threshold = request.threshold if request.threshold is not None else 60
    query = lemmatizer.lemmatize(request.keywords or "")

    query_tokens = bm25s.tokenize(query)
    results, scores = bm25_engine.retrieve(query_tokens, k=len(bm25_engine.corpus))

    spec_scores = {}
    spec_details = {}
    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]
        boosted_score = _boosted_score(doc['metadata'], score, query)
        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc
            }

    normalized_scores = _normalize_scores(spec_scores)
    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    ranked_specs = sorted(normalized_scores.keys(),
                          key=lambda x: normalized_scores[x], reverse=True)

    results_out = []
    for spec in ranked_specs:
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if not _passes_filters(metadata, release, spec_type, working_group):
            continue
        # Scores are sorted in descending order, so nothing further qualifies.
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if len(results_out) > 0:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time
        )
    raise HTTPException(status_code=404, detail="Specifications not found")


@app.post("/search-spec", response_model=KeywordResponse)
def search_spec(request: KeywordRequest):
    """Search the indexed specifications by comma-separated keywords.

    In "quick" mode keywords are matched against the index key of each
    specification; in "deep" mode they are also matched against the text of
    each chapter, and matching chapters are returned under "contains".

    Raises:
        HTTPException: 400 for a deep search without keywords, 404 when
            nothing matches.
    """
    start_time = time.time()
    booleanLowered = request.case_sensitive
    search_mode = request.search_mode
    release = request.release
    working_group = request.working_group
    spec_type = request.spec_type
    kws = [caseSensitive(_, booleanLowered) for _ in (request.keywords or "").split(",")]
    print(kws)
    unique_specs = set()
    results = []

    has_keywords = kws != [""]
    deep = search_mode == "deep"
    if not has_keywords and deep:
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode !")

    # "and" requires every keyword, "or" any keyword.
    matcher = {"and": all, "or": any}.get(request.mode)

    for string, spec in finder_spec.indexer_specs.items():
        put = False
        if spec['id'] in unique_specs:
            continue
        if not _passes_filters(spec, release, spec_type, working_group):
            continue

        contents = []
        doc = None
        if not has_keywords:
            put = True
        elif matcher is not None:
            if deep:
                version = finder_spec.search_document(spec['id'], spec['release']).split("/")[-1].replace(".zip", "").split("-")[-1]
                doc = finder_spec.get_document(spec['id'], version)
            if matcher(kw in caseSensitive(string, booleanLowered) for kw in kws):
                put = True
            if deep and not isinstance(doc, str):
                for chapter in list(doc.keys())[1:]:
                    # NOTE(review): the "annex" test looks at the chapter
                    # text while the other two look at its title; kept as-is.
                    if "references" in chapter.lower() or "void" in chapter.lower() or "annex" in doc[chapter].lower():
                        continue
                    if matcher(kw in caseSensitive(doc[chapter], booleanLowered) for kw in kws):
                        put = True
                        contents.append(chapter)

        if put:
            # Copy so that "contains" never leaks into the shared index.
            spec_content = dict(spec)
            if deep:
                spec_content["contains"] = {chap: doc[chap] for chap in contents}
            results.append(spec_content)
        else:
            unique_specs.add(spec['id'])

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    raise HTTPException(status_code=404, detail="Specifications not found")


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    """Resolve one document id to its URL and version.

    Identifiers starting with a letter are handled as TSG documents, others
    as specifications; for specifications the scope is added as well.

    Raises:
        HTTPException: 400 for an empty id, 404 when the lookup fails.
    """
    start_time = time.time()
    if not request.doc_id:
        raise HTTPException(status_code=400, detail="doc_id must not be empty")

    finder = finder_tsg if request.doc_id[0].isalpha() else finder_spec
    result = finder.search_document(request.doc_id, request.release)
    if _is_lookup_failure(result):
        raise HTTPException(status_code=404, detail=result)

    version = result.split("/")[-1].replace(".zip", "").split("-")[-1]
    if isinstance(finder, TsgDocFinder):
        return DocResponse(
            doc_id=request.doc_id,
            version=version,
            url=result,
            search_time=time.time() - start_time
        )

    if request.doc_id in finder.indexer_scopes:
        scope = finder.indexer_scopes[request.doc_id]
    else:
        scope = get_scope(request.doc_id, version)
    return DocResponse(
        doc_id=request.doc_id,
        version=version,
        url=result,
        search_time=time.time() - start_time,
        scope=scope
    )


@app.post("/batch", response_model=BatchDocResponse)
def find_documents_batch(request: BatchDocRequest):
    """Resolve several document ids; unresolved ones are listed as missing."""
    start_time = time.time()
    results = {}
    missing = []

    for doc_id in request.doc_ids:
        if not doc_id:
            missing.append(doc_id)
            continue
        finder = finder_tsg if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id)
        if _is_lookup_failure(result):
            missing.append(doc_id)
        else:
            results[doc_id] = result

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )