from io import StringIO import numpy as np import pandas as pd import requests from bs4 import BeautifulSoup import json import os import pymupdf as fitz import uuid import zipfile import io import subprocess import os import re import time from datetime import datetime from dotenv import load_dotenv import warnings from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import Any, Dict, List, Literal, Optional load_dotenv() warnings.filterwarnings("ignore") app = FastAPI(title="3GPP Document Finder API", description="API to find 3GPP documents based on TSG document IDs") app.mount("/static", StaticFiles(directory="static"), name="static") origins = [ "*", ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def get_pdf_bytes(specification: str, version: str): doc_id = specification series = doc_id.split(".")[0] response = requests.get(f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip", verify=False) if response.status_code != 200: raise Exception("Téléchargement du ZIP échoué") zip_bytes = io.BytesIO(response.content) with zipfile.ZipFile(zip_bytes) as zf: for file_name in zf.namelist(): if file_name.endswith("doc") or file_name.endswith("docx"): ext = file_name.split(".")[-1] doc_bytes = zf.read(file_name) temp_id = str(uuid.uuid4()) input_path = f"/tmp/{temp_id}.{ext}" output_path = f"/tmp/{temp_id}.pdf" with open(input_path, "wb") as f: f.write(doc_bytes) subprocess.run([ "libreoffice", "--headless", "--convert-to", "pdf", "--outdir", "/tmp", input_path ], check=True) with open(output_path, "rb") as f: pdf_data = f.read() os.remove(input_path) os.remove(output_path) return io.BytesIO(pdf_data) raise Exception("Aucun fichier .doc/.docx trouvé dans le ZIP") def get_scope(specification: str, version: str): pdf_bytes = get_pdf_bytes(specification, version) doc = fitz.open(stream=pdf_bytes, filetype="pdf") for content in doc.get_toc(): if "scope" in content[1].lower(): page_num = content[2] - 1 break doc = doc[page_num:] pdf_full_text = " ".join(page.get_text("text") for page in doc) pdf_postprocess_text = re.sub(r"\s+", " ", pdf_full_text) pdf_postprocess_text = pdf_postprocess_text.replace("1 Scope", " !-! ") pdf_postprocess_text = pdf_postprocess_text.replace("2 Reference", " !-! ") pdf_postprocess_text = pdf_postprocess_text.replace("", "- ") return pdf_postprocess_text.split(" !-! ")[1] class DocRequest(BaseModel): doc_id: str release: Optional[int] = None class DocResponse(BaseModel): doc_id: str url: str scope: Optional[str] = None search_time: float class BatchDocRequest(BaseModel): doc_ids: List[str] release: Optional[int] = None class BatchDocResponse(BaseModel): results: Dict[str, str] missing: List[str] search_time: float class KeywordRequest(BaseModel): keywords: str release: Optional[str] = None wg: Optional[str] = None spec_type: Optional[Literal["TS", "TR"]] = None mode: Optional[Literal["and", "or"]] = "and" class KeywordResponse(BaseModel): results: List[Dict[str, str]] search_time: float class TsgDocFinder: def __init__(self): self.main_ftp_url = "https://www.3gpp.org/ftp" self.indexer_file = "indexed_docs.json" self.indexer, self.last_indexer_date = self.load_indexer() def load_indexer(self): """Load existing index if available""" if os.path.exists(self.indexer_file): with open(self.indexer_file, "r", encoding="utf-8") as f: x = json.load(f) return x["docs"], x["last_indexed_date"] return {}, None def save_indexer(self): """Save the updated index""" with open(self.indexer_file, "w", encoding="utf-8") as f: today = datetime.today() output = {"docs": self.indexer, "last_indexed_date": today.strftime("%d/%m/%Y-%H:%M:%S")} json.dump(output, f, indent=4, ensure_ascii=False) def get_workgroup(self, doc): main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None if main_tsg is None: return None, None, None workgroup = f"WG{int(doc[1])}" if doc[1].isnumeric() else main_tsg.upper() return main_tsg, workgroup, doc def find_workgroup_url(self, main_tsg, workgroup): """Find the URL for the specific workgroup""" response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False) soup = BeautifulSoup(response.text, 'html.parser') for item in soup.find_all("tr"): link = item.find("a") if link and workgroup in link.get_text(): return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}" return f"{self.main_ftp_url}/{main_tsg}/{workgroup}" def get_docs_from_url(self, url): """Get list of documents/directories from a URL""" try: response = requests.get(url, verify=False, timeout=10) soup = BeautifulSoup(response.text, "html.parser") return [item.get_text() for item in soup.select("tr td a")] except Exception as e: print(f"Error accessing {url}: {e}") return [] def search_document(self, doc_id: str, release = None): """Search for a specific document by its ID""" original_id = doc_id # Check if already indexed if original_id in self.indexer: return self.indexer[original_id] for doc in self.indexer: if doc.startswith(original_id): return self.indexer[doc] # Parse the document ID main_tsg, workgroup, doc = self.get_workgroup(doc_id) if not main_tsg: return f"Could not parse document ID: {doc_id}" print(f"Searching for {original_id} (parsed as {doc}) in {main_tsg}/{workgroup}...") # Find the workgroup URL wg_url = self.find_workgroup_url(main_tsg, workgroup) if not wg_url: return f"Could not find workgroup for {doc_id}" # Search in the workgroup directories meeting_folders = self.get_docs_from_url(wg_url) for folder in meeting_folders: meeting_url = f"{wg_url}/{folder}" meeting_contents = self.get_docs_from_url(meeting_url) key = "docs" if "docs" in [x.lower() for x in meeting_contents] else "tdocs" if "tdocs" in [x.lower() for x in meeting_contents] else None if key is not None: docs_url = f"{meeting_url}/{key}" print(f"Checking {docs_url}...") files = self.get_docs_from_url(docs_url) # Check for the document in the main Docs folder for file in files: if doc in file.lower() or original_id in file: doc_url = f"{docs_url}/{file}" self.indexer[original_id] = doc_url return doc_url # Check in ZIP subfolder if it exists if "zip" in [x for x in files]: zip_url = f"{docs_url}/zip" print(f"Checking {zip_url}...") zip_files = self.get_docs_from_url(zip_url) for file in zip_files: if doc in file.lower() or original_id in file: doc_url = f"{zip_url}/{file}" self.indexer[original_id] = doc_url self.save_indexer() return doc_url return f"Document {doc_id} not found" class SpecDocFinder: def __init__(self): self.chars = "0123456789abcdefghijklmnopqrstuvwxyz" def search_document(self, doc_id, release = None): series = doc_id.split(".")[0] while len(series) < 2: series = "0" + series url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}" response = requests.get(url, verify=False) soup = BeautifulSoup(response.text, 'html.parser') items = soup.find_all("tr")[1:] version_found = None if release is None: try: item = items[-1].find("a") except Exception as e: print(e) return f"Unable to find specification {doc_id}" a, b, c = [_ for _ in item.get_text().split("-")[-1].replace(".zip", "")] version = f"{self.chars.index(a)}.{self.chars.index(b)}.{self.chars.index(c)}" version_found = (version, item.get("href")) _, spec_url = version_found return spec_url if version_found is not None else f"Specification {doc_id} not found" else: for item in items: x = item.find("a") if f"{doc_id.replace('.', '')}-{self.chars[int(release)]}" in x.get_text(): a, b, c = [_ for _ in x.get_text().split("-")[-1].replace(".zip", "")] version = f"{self.chars.index(a)}.{self.chars.index(b)}.{self.chars.index(c)}" version_found = (version, x.get("href")) _, spec_url = version_found return spec_url if version_found is not None else f"Specification {doc_id} not found" finder_tsg = TsgDocFinder() finder_spec = SpecDocFinder() @app.get("/") async def main_menu(): return FileResponse(os.path.join("templates", "index.html")) @app.post("/search-spec", response_model=KeywordResponse) def search_spec(request: KeywordRequest): chars = "0123456789abcdefghijklmnopqrstuvwxyz" start_time = time.time() response = requests.get(f'https://www.3gpp.org/dynareport?code=status-report.htm', headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}, verify=False) dfs = pd.read_html(StringIO(response.text), storage_options={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}, encoding="utf-8") for x in range(len(dfs)): dfs[x] = dfs[x].replace({np.nan: None}) columns_needed = [0, 1, 2, 3, 4] extracted_dfs: List[pd.DataFrame] = [df.iloc[:, columns_needed] for df in dfs] columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns] specifications = [] for df in extracted_dfs: for index, row in df.iterrows(): doc = row.to_list() doc_dict = dict(zip(columns, doc)) specifications.append(doc_dict) kws = [_.lower() for _ in request.keywords.split(" ")] results = [] for spec in specifications: if request.mode == "and": if not all(kw in spec["title"].lower() for kw in kws): continue elif request.mode == "or": if not any(kw in spec["title"].lower() for kw in kws): continue release = request.release working_group = request.wg spec_type = request.spec_type if spec.get('vers', None) is None or (release is not None and spec["vers"].split(".")[0] != str(release)): continue if spec.get('WG', None) is None or (working_group is not None and spec["WG"] != working_group): continue if spec_type is not None and spec["type"] != spec_type: continue doc_id = str(spec["spec_num"]) series = doc_id.split(".")[0] a, b, c = str(spec["vers"]).split(".") print(spec["vers"]) if not (int(a) > 35 or int(b) > 35 or int(c) > 35): spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{chars[int(a)]}{chars[int(b)]}{chars[int(c)]}.zip" else: x,y,z = str(a), str(b), str(c) while len(x) < 2: x = "0" + x while len(y) < 2: y = "0" + y while len(z) < 2: z = "0" + z spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{x}{y}{z}.zip" results.append({ "id": str(spec["spec_num"]), "title": spec["title"], "type": "Technical Specification" if spec["type"] == "TS" else "Technical Report", "release": str(spec["vers"].split(".")[0]), "version": str(spec["vers"]), "working_group": spec["WG"], "url": spec_url }) if len(results) > 0: return KeywordResponse( results=results, search_time=time.time() - start_time ) else: raise HTTPException(status_code=404, detail="Specification not found") @app.post("/find", response_model=DocResponse) def find_document(request: DocRequest): start_time = time.time() finder = finder_tsg if request.doc_id[0].isalpha() else finder_spec print(finder) result = finder.search_document(request.doc_id, request.release) if "not found" not in result and "Could not" not in result and "Unable" not in result: version = result.split("/")[-1].replace(".zip", "").split("-")[-1] return DocResponse( doc_id=request.doc_id, url=result, search_time=time.time() - start_time ) if isinstance(finder, TsgDocFinder) else DocResponse( doc_id=request.doc_id, url=result, search_time=time.time() - start_time, scope=get_scope(request.doc_id, version) ) else: raise HTTPException(status_code=404, detail=result) @app.post("/batch", response_model=BatchDocResponse) def find_documents_batch(request: BatchDocRequest): start_time = time.time() results = {} missing = [] for doc_id in request.doc_ids: finder = finder_tsg if doc_id[0].isalpha() else finder_spec result = finder.search_document(doc_id) if "not found" not in result and "Could not" not in result and "Unable" not in result: results[doc_id] = result else: missing.append(doc_id) return BatchDocResponse( results=results, missing=missing, search_time=time.time() - start_time )