"""FastAPI service that locates 3GPP documents (TSG tdocs and specifications) on the 3GPP FTP server."""

import json
import os
import time
import warnings
from datetime import datetime
from io import StringIO
from typing import Dict, List, Literal, Optional

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

load_dotenv()

# Suppress warnings, notably urllib3's InsecureRequestWarning triggered by the
# verify=False requests below.
warnings.filterwarnings("ignore")

app = FastAPI(
    title="3GPP Document Finder API",
    description="API to find 3GPP documents based on TSG document IDs",
)

app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
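
# Note: browsers reject credentialed cross-origin requests when the allowed
# origin is the wildcard "*"; list explicit origins above if cookies or
# Authorization headers must be sent cross-origin.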


class DocRequest(BaseModel):
    doc_id: str
    release: Optional[int] = None


class DocResponse(BaseModel):
    doc_id: str
    url: str
    search_time: float


class BatchDocRequest(BaseModel):
    doc_ids: List[str]
    release: Optional[int] = None


class BatchDocResponse(BaseModel):
    results: Dict[str, str]
    missing: List[str]
    search_time: float


class KeywordRequest(BaseModel):
    keywords: str
    release: Optional[str] = None
    version: Optional[str] = None
    wg: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None
    mode: Optional[Literal["and", "or"]] = "and"


class KeywordResponse(BaseModel):
    results: List[Dict[str, str]]
    search_time: float
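
# Illustrative payloads for the request models above (all values hypothetical):
#   DocRequest:      {"doc_id": "S2-2401234"}
#   BatchDocRequest: {"doc_ids": ["S2-2401234", "C1-240567"], "release": 18}
#   KeywordRequest:  {"keywords": "network slicing", "spec_type": "TS", "mode": "and"}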


class TsgDocFinder:
    """Finds TSG working-group documents (tdocs) by crawling the 3GPP FTP tree."""

    def __init__(self):
        self.main_ftp_url = "https://www.3gpp.org/ftp"
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()

    def load_indexer(self):
        """Load the existing index if available."""
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            return data["docs"], data["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        """Save the updated index."""
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            today = datetime.today()
            output = {"docs": self.indexer, "last_indexed_date": today.strftime("%d/%m/%Y-%H:%M:%S")}
            json.dump(output, f, indent=4, ensure_ascii=False)
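
    # The index file written above has this shape (illustrative entry):
    # {
    #     "docs": {"S2-2401234": "https://www.3gpp.org/ftp/tsg_sa/WG2_Arch/.../S2-2401234.zip"},
    #     "last_indexed_date": "01/01/2025-12:00:00"
    # }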

    def get_workgroup(self, doc):
        """Parse a tdoc ID into TSG and working-group folder names."""
        # The first letter selects the TSG: C* -> CT, S* -> SA; others are unsupported.
        main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
        if main_tsg is None:
            return None, None, None
        # A digit in second position selects the working group (e.g. "2" -> WG2);
        # otherwise the tdoc belongs to the plenary (e.g. "SP-..." -> TSG_SA).
        workgroup = f"WG{int(doc[1])}" if doc[1].isnumeric() else main_tsg.upper()
        return main_tsg, workgroup, doc
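
    # Illustrative parses (hypothetical IDs):
    #   "S2-2401234" -> ("tsg_sa", "WG2", "S2-2401234")
    #   "SP-240456"  -> ("tsg_sa", "TSG_SA", "SP-240456")   # plenary document
    #   "R1-2401234" -> (None, None, None)                  # RAN tdocs unsupported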

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup."""
        response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")

        # Workgroup folders carry suffixes (e.g. "WG2_Arch"), so match against
        # the directory listing instead of building the path blindly.
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"

        return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"

    def get_docs_from_url(self, url):
        """Get the list of documents/directories from a URL."""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str, release=None):
        """Search for a specific document by its ID."""
        original_id = doc_id

        # Serve from the on-disk index when possible.
        if original_id in self.indexer:
            return self.indexer[original_id]

        for doc in self.indexer:
            if doc.startswith(original_id):
                return self.indexer[doc]

        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return f"Could not parse document ID: {doc_id}"

        print(f"Searching for {original_id} (parsed as {doc}) in {main_tsg}/{workgroup}...")

        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        if not wg_url:
            return f"Could not find workgroup for {doc_id}"

        meeting_folders = self.get_docs_from_url(wg_url)

        for folder in meeting_folders:
            meeting_url = f"{wg_url}/{folder}"
            meeting_contents = self.get_docs_from_url(meeting_url)
            # Meetings keep tdocs in a "Docs" or "TDocs" folder (case varies in listings).
            lower_contents = [x.lower() for x in meeting_contents]
            key = "docs" if "docs" in lower_contents else "tdocs" if "tdocs" in lower_contents else None
            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                print(f"Checking {docs_url}...")
                files = self.get_docs_from_url(docs_url)

                for file in files:
                    if doc.lower() in file.lower() or original_id in file:
                        doc_url = f"{docs_url}/{file}"
                        self.indexer[original_id] = doc_url
                        self.save_indexer()
                        return doc_url

                # Some meetings nest the archives one level deeper, in a "zip" folder.
                if "zip" in files:
                    zip_url = f"{docs_url}/zip"
                    print(f"Checking {zip_url}...")
                    zip_files = self.get_docs_from_url(zip_url)

                    for file in zip_files:
                        if doc.lower() in file.lower() or original_id in file:
                            doc_url = f"{zip_url}/{file}"
                            self.indexer[original_id] = doc_url
                            self.save_indexer()
                            return doc_url

        return f"Document {doc_id} not found"
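
    # Illustrative call (hypothetical ID):
    #   TsgDocFinder().search_document("S2-2401234")
    # returns either a direct URL to the tdoc archive or a "not found" message.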


class SpecDocFinder:
    """Finds specification archives under the 3GPP /Specs/archive tree."""

    def __init__(self):
        # Version digits in archive file names are base-36: 0-9 then a-z,
        # so e.g. release 18 appears as the letter "i".
        self.chars = "0123456789abcdefghijklmnopqrstuvwxyz"

    def search_document(self, doc_id, release=None):
        # Specifications are grouped by series, e.g. 23.501 lives in 23_series.
        series = doc_id.split(".")[0].zfill(2)
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}"

        response = requests.get(url, verify=False)
        soup = BeautifulSoup(response.text, "html.parser")
        items = soup.find_all("tr")[1:]  # skip the table header row

        if release is None:
            # No release requested: the listing is sorted, so the last row
            # is the most recent version.
            try:
                item = items[-1].find("a")
            except Exception as e:
                return f"Unable to find specification {doc_id}: {e}"
            return item.get("href")

        # A release was requested: match its base-36 digit, which is the first
        # character of the version field in the file name.
        for item in items:
            link = item.find("a")
            if f"{doc_id.replace('.', '')}-{self.chars[int(release)]}" in link.get_text():
                return link.get("href")
        return f"Specification {doc_id} not found"
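
    # File-name encoding example: "23501-i30.zip" is spec 23.501 version 18.3.0,
    # since chars.index("i") == 18, chars.index("3") == 3, chars.index("0") == 0.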


finder_tsg = TsgDocFinder()
finder_spec = SpecDocFinder()


@app.get("/")
async def main_menu():
    return FileResponse(os.path.join("templates", "index.html"))


@app.post("/search-spec", response_model=KeywordResponse)
def search_spec(request: KeywordRequest):
    start_time = time.time()
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )
    response = requests.get(
        "https://www.3gpp.org/dynareport?code=status-report.htm",
        headers={"User-Agent": user_agent},
        verify=False,
    )
    # The page is already fetched, so parse the tables from the in-memory HTML.
    dfs = pd.read_html(StringIO(response.text), encoding="utf-8")
    dfs = [df.replace({np.nan: None}) for df in dfs]

    # Keep the first five columns (spec_num, title, vers, type, WG once the
    # non-breaking spaces in the headers are normalised below).
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs: List[pd.DataFrame] = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    specifications = []

    for df in extracted_dfs:
        for _, row in df.iterrows():
            specifications.append(dict(zip(columns, row.to_list())))

    kws = [kw.lower() for kw in request.keywords.split(" ")]
    results = []

    for spec in specifications:
        # Keyword match on the title, ANDed or ORed depending on the request.
        if request.mode == "and":
            if not all(kw in spec["title"].lower() for kw in kws):
                continue
        elif request.mode == "or":
            if not any(kw in spec["title"].lower() for kw in kws):
                continue

        release = request.release
        version = request.version
        working_group = request.wg
        spec_type = request.spec_type

        # Skip rows with missing fields, then apply the optional filters.
        if spec.get("vers") is None or (release is not None and spec["vers"].split(".")[0] != str(release)):
            continue
        if version is not None and spec["vers"] != version:
            continue
        if spec.get("WG") is None or (working_group is not None and spec["WG"] != working_group):
            continue
        if spec_type is not None and spec["type"] != spec_type:
            continue

        results.append({
            "id": str(spec["spec_num"]),
            "title": spec["title"],
            "type": "Technical Specification" if spec["type"] == "TS" else "Technical Report",
            "release": str(spec["vers"].split(".")[0]),
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
        })

    if results:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time,
        )
    raise HTTPException(status_code=404, detail="Specification not found")
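
# Illustrative request (hypothetical values; assumes the default uvicorn port):
#   curl -X POST http://localhost:8000/search-spec \
#        -H "Content-Type: application/json" \
#        -d '{"keywords": "network slicing", "spec_type": "TS", "mode": "and"}'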


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    start_time = time.time()
    # Tdoc IDs start with a letter (e.g. "S2-..."); spec numbers start with a digit (e.g. "23.501").
    finder = finder_tsg if request.doc_id[0].isalpha() else finder_spec

    result = finder.search_document(request.doc_id, request.release)

    if "not found" not in result and "Could not" not in result and "Unable" not in result:
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time,
        )
    raise HTTPException(status_code=404, detail=result)
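
# Illustrative request (hypothetical tdoc ID; assumes the default uvicorn port):
#   curl -X POST http://localhost:8000/find \
#        -H "Content-Type: application/json" \
#        -d '{"doc_id": "S2-2401234"}'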


@app.post("/batch", response_model=BatchDocResponse)
def find_documents_batch(request: BatchDocRequest):
    start_time = time.time()

    results = {}
    missing = []

    for doc_id in request.doc_ids:
        finder = finder_tsg if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id, request.release)
        if "not found" not in result and "Could not" not in result and "Unable" not in result:
            results[doc_id] = result
        else:
            missing.append(doc_id)

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time,
    )
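
# Illustrative request (hypothetical IDs; assumes the default uvicorn port):
#   curl -X POST http://localhost:8000/batch \
#        -H "Content-Type: application/json" \
#        -d '{"doc_ids": ["S2-2401234", "23.501"]}'
#
# One way to run the service locally, assuming this module is saved as main.py
# and uvicorn is installed:
#   uvicorn main:app --host 0.0.0.0 --port 8000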