"""FastAPI service that locates 3GPP documents and specifications.

The module exposes three POST endpoints (``/search-spec``, ``/find`` and
``/batch``) plus a static landing page.  Look-ups are performed by scraping
the directory listings served under ``https://www.3gpp.org/ftp`` and the
3GPP status report page.
"""
from io import StringIO
import json
import os
import time
from datetime import datetime
import traceback
import warnings
from typing import Any, Dict, List, Literal, Optional, Tuple

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

load_dotenv()
# NOTE(review): this silences *all* warnings, including the ones urllib3 emits
# for the `verify=False` requests below — confirm that is intended.
warnings.filterwarnings("ignore")

app = FastAPI(
    title="3GPP Document Finder API",
    description="API to find 3GPP documents based on TSG document IDs",
)
app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is what the deployment needs.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Browser-like User-Agent sent when fetching the status report page.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)

# Substrings that mark a finder result as an error message rather than a URL.
_ERROR_MARKERS = ("not found", "Could not", "Unable")


class DocRequest(BaseModel):
    """Body of ``POST /find``."""

    doc_id: str
    release: Optional[int] = None


class DocResponse(BaseModel):
    """Successful answer of ``POST /find``."""

    doc_id: str
    url: str
    search_time: float


class BatchDocRequest(BaseModel):
    """Body of ``POST /batch``."""

    doc_ids: List[str]
    release: Optional[int] = None


class BatchDocResponse(BaseModel):
    """Answer of ``POST /batch``: resolved URLs plus the IDs that failed."""

    results: Dict[str, str]
    missing: List[str]
    search_time: float


class KeywordRequest(BaseModel):
    """Body of ``POST /search-spec``."""

    keywords: str
    release: Optional[str] = None
    version: Optional[str] = None
    wg: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None
    mode: Optional[Literal["and", "or"]] = "and"


class KeywordResponse(BaseModel):
    """Successful answer of ``POST /search-spec``."""

    results: List[Dict[str, str]]
    search_time: float


class TsgDocFinder:
    """Find TSG documents by crawling the 3GPP FTP directory listings.

    Resolved URLs are cached in ``self.indexer`` and persisted to
    ``self.indexer_file`` so later look-ups can skip the crawl.
    """

    def __init__(self):
        self.main_ftp_url = "https://www.3gpp.org/ftp"
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()

    def load_indexer(self) -> Tuple[Dict[str, str], Optional[str]]:
        """Load existing index if available.

        Returns:
            ``(docs, last_indexed_date)`` read from the index file, or
            ``({}, None)`` when the file is missing or unreadable.
        """
        if not os.path.exists(self.indexer_file):
            return {}, None
        try:
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            return data["docs"], data["last_indexed_date"]
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            # The index is only a cache: start empty rather than refusing to
            # start the application because of a damaged file.
            print(f"Ignoring unreadable index {self.indexer_file}: {e}")
            return {}, None

    def save_indexer(self) -> None:
        """Save the updated index together with the current timestamp."""
        output = {
            "docs": self.indexer,
            "last_indexed_date": datetime.today().strftime("%d/%m/%Y-%H:%M:%S"),
        }
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc):
        """Derive the TSG and workgroup folder names from a document ID.

        Args:
            doc: Document ID; its first character selects the TSG ("C" or
                "S") and a decimal second character selects the workgroup.

        Returns:
            ``(main_tsg, workgroup, doc)``, or ``(None, None, None)`` when the
            ID is empty or does not start with "C" or "S".
        """
        if not doc:
            return None, None, None
        main_tsg = {"C": "tsg_ct", "S": "tsg_sa"}.get(doc[0])
        if main_tsg is None:
            return None, None, None
        # Slicing (instead of indexing) keeps one-character IDs from raising.
        second = doc[1:2]
        workgroup = f"WG{int(second)}" if second.isdecimal() else main_tsg.upper()
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup.

        Falls back to ``<ftp>/<main_tsg>/<workgroup>`` when the listing cannot
        be fetched or contains no link whose text includes ``workgroup``.
        """
        tsg_url = f"{self.main_ftp_url}/{main_tsg}"
        fallback = f"{tsg_url}/{workgroup}"
        try:
            response = requests.get(tsg_url, verify=False, timeout=10)
        except requests.RequestException as e:
            print(f"Error accessing {tsg_url}: {e}")
            return fallback

        soup = BeautifulSoup(response.text, "html.parser")
        for row in soup.find_all("tr"):
            link = row.find("a")
            if link and workgroup in link.get_text():
                return f"{tsg_url}/{link.get_text()}"
        return fallback

    def get_docs_from_url(self, url):
        """Get list of documents/directories from a URL.

        Returns an empty list (after printing the error) when the page cannot
        be fetched or parsed.
        """
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def _remember(self, doc_id, doc_url):
        """Cache ``doc_url`` for ``doc_id``, persist the index, return the URL."""
        self.indexer[doc_id] = doc_url
        self.save_indexer()
        return doc_url

    @staticmethod
    def _match_file(files, needle):
        """Return the first entry containing ``needle`` (already lower-cased)."""
        return next((name for name in files if needle in name.lower()), None)

    def search_document(self, doc_id: str, release=None):
        """Search for a specific document by its ID.

        Args:
            doc_id: TSG document ID.
            release: Accepted for signature parity with
                ``SpecDocFinder.search_document``; not used here.

        Returns:
            The document URL, or an error message starting with
            "Could not ..." / ending with "not found".
        """
        original_id = doc_id
        if not original_id:
            # An empty ID would prefix-match every cached entry below.
            return f"Could not parse document ID: {doc_id}"

        # Check if already indexed (exact key first, then prefix match)
        if original_id in self.indexer:
            return self.indexer[original_id]
        for indexed_id, indexed_url in self.indexer.items():
            if indexed_id.startswith(original_id):
                return indexed_url

        # Parse the document ID
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return f"Could not parse document ID: {doc_id}"

        print(f"Searching for {original_id} (parsed as {doc}) in {main_tsg}/{workgroup}...")

        # Find the workgroup URL
        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        if not wg_url:
            return f"Could not find workgroup for {doc_id}"

        # File names are compared case-insensitively on both sides.
        needle = doc.lower()

        # Search in the workgroup directories
        for folder in self.get_docs_from_url(wg_url):
            meeting_url = f"{wg_url}/{folder}"
            entries = {name.lower() for name in self.get_docs_from_url(meeting_url)}
            key = "docs" if "docs" in entries else "tdocs" if "tdocs" in entries else None
            if key is None:
                continue

            docs_url = f"{meeting_url}/{key}"
            print(f"Checking {docs_url}...")
            files = self.get_docs_from_url(docs_url)

            # Check for the document in the main Docs folder
            match = self._match_file(files, needle)
            if match is not None:
                return self._remember(original_id, f"{docs_url}/{match}")

            # Check in ZIP subfolder if it exists (detected case-insensitively,
            # like the docs/tdocs folders above)
            if any(name.lower() == "zip" for name in files):
                zip_url = f"{docs_url}/zip"
                print(f"Checking {zip_url}...")
                match = self._match_file(self.get_docs_from_url(zip_url), needle)
                if match is not None:
                    return self._remember(original_id, f"{zip_url}/{match}")

        return f"Document {doc_id} not found"


class SpecDocFinder:
    """Find specification archives under ``/ftp/Specs/archive``."""

    def __init__(self):
        # Alphabet used to turn a release number into the single character
        # that follows the dash in archive file names.
        self.chars = "0123456789abcdefghijklmnopqrstuvwxyz"

    def search_document(self, doc_id, release=None):
        """Return the archive link of a specification.

        Args:
            doc_id: Specification number; the part before the first "." names
                the series folder.
            release: When given, only files whose name contains
                ``<doc_id without dots>-<release character>`` are considered
                and the last such row wins.  When ``None`` the link of the
                last listed row is returned.

        Returns:
            The ``href`` of the selected link, or an error message containing
            "Unable" or "not found".
        """
        series = doc_id.split(".")[0].rjust(2, "0")
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}"
        try:
            response = requests.get(url, verify=False, timeout=10)
        except requests.RequestException as e:
            return f"Unable to find specification {doc_id} : {e}"

        soup = BeautifulSoup(response.text, "html.parser")
        # The first row is skipped; rows without a usable link are ignored.
        anchors = (row.find("a") for row in soup.find_all("tr")[1:])
        links = [a for a in anchors if a is not None and a.get("href")]

        if release is None:
            try:
                latest = links[-1]
            except IndexError as e:
                return f"Unable to find specification {doc_id} : {e}"
            return latest.get("href")

        not_found = f"Specification {doc_id} not found"
        try:
            release_index = int(release)
        except (TypeError, ValueError):
            return not_found
        if not 0 <= release_index < len(self.chars):
            return not_found

        wanted = f"{doc_id.replace('.', '')}-{self.chars[release_index]}"
        spec_url = None
        for link in links:
            if wanted in link.get_text():
                spec_url = link.get("href")  # keep going: the last match wins
        return spec_url if spec_url is not None else not_found


finder_tsg = TsgDocFinder()
finder_spec = SpecDocFinder()


def _is_error_message(result) -> bool:
    """Tell whether a finder result is an error message instead of a URL."""
    return any(marker in result for marker in _ERROR_MARKERS)


def _select_finder(doc_id):
    """Pick the finder for ``doc_id``; ``None`` when the ID is empty."""
    if not doc_id:
        return None
    return finder_tsg if doc_id[0].isalpha() else finder_spec


@app.get("/")
async def main_menu():
    """Serve the landing page."""
    return FileResponse(os.path.join("templates", "index.html"))


@app.post("/search-spec", response_model=KeywordResponse)
def search_spec(request: KeywordRequest):
    """Search the 3GPP status report for specifications by title keywords.

    Titles are matched case-insensitively against the whitespace-separated
    keywords (all of them in "and" mode, any of them in "or" mode) and then
    filtered by the optional release, version, working group and type.

    Raises:
        HTTPException: 404 when no specification matches.
    """
    start_time = time.time()
    response = requests.get(
        "https://www.3gpp.org/dynareport?code=status-report.htm",
        headers={"User-Agent": _USER_AGENT},
        verify=False,
        timeout=30,
    )
    dfs = pd.read_html(
        StringIO(response.text),
        storage_options={"User-Agent": _USER_AGENT},
        encoding="utf-8",
    )

    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs: List[pd.DataFrame] = [
        df.replace({np.nan: None}).iloc[:, columns_needed] for df in dfs
    ]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    specifications = []
    for df in extracted_dfs:
        for _, row in df.iterrows():
            specifications.append(dict(zip(columns, row.to_list())))

    # split() without an argument drops the empty tokens that repeated spaces
    # would create; an empty token is contained in every title and would make
    # "or" mode match everything.
    kws = [kw.lower() for kw in request.keywords.split()]
    matcher = {"and": all, "or": any}.get(request.mode)

    release = request.release
    version = request.version
    working_group = request.wg
    spec_type = request.spec_type

    results = []
    for spec in specifications:
        title = spec.get("title")
        if not isinstance(title, str):
            continue  # missing titles cannot be matched nor returned
        if kws and matcher is not None:
            lowered = title.lower()
            if not matcher(kw in lowered for kw in kws):
                continue

        vers = spec.get("vers")
        wg = spec.get("WG")
        if vers is None or wg is None:
            continue
        vers = str(vers)
        if release is not None and vers.split(".")[0] != str(release):
            continue
        if version is not None and vers != version:
            continue
        if working_group is not None and wg != working_group:
            continue
        if spec_type is not None and spec.get("type") != spec_type:
            continue

        results.append({
            "id": str(spec["spec_num"]),
            "title": title,
            "type": "Technical Specification" if spec.get("type") == "TS" else "Technical Report",
            "release": vers.split(".")[0],
            "version": vers,
            "working_group": wg,
        })

    if not results:
        raise HTTPException(status_code=404, detail="Specification not found")
    return KeywordResponse(results=results, search_time=time.time() - start_time)


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    """Resolve a single document or specification ID to its URL.

    IDs starting with a letter go to the TSG finder, all others to the
    specification finder.

    Raises:
        HTTPException: 404 with the finder's message when nothing is found.
    """
    start_time = time.time()
    finder = _select_finder(request.doc_id)
    if finder is None:
        raise HTTPException(
            status_code=404,
            detail=f"Could not parse document ID: {request.doc_id}",
        )
    print(finder)
    result = finder.search_document(request.doc_id, request.release)
    print(result)

    if _is_error_message(result):
        raise HTTPException(status_code=404, detail=result)
    return DocResponse(
        doc_id=request.doc_id,
        url=result,
        search_time=time.time() - start_time,
    )


@app.post("/batch", response_model=BatchDocResponse)
def find_documents_batch(request: BatchDocRequest):
    """Resolve several IDs at once.

    Returns the URLs that were found under ``results`` and every ID that
    could not be resolved under ``missing``.
    """
    start_time = time.time()
    results = {}
    missing = []

    for doc_id in request.doc_ids:
        finder = _select_finder(doc_id)
        if finder is None:
            missing.append(doc_id)
            continue
        # Forward the requested release, as the single-document endpoint does.
        result = finder.search_document(doc_id, request.release)
        if _is_error_message(result):
            missing.append(doc_id)
        else:
            results[doc_id] = result

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time,
    )