from io import StringIO

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
import os
import traceback
import uuid
import zipfile
import io
import subprocess
import re
import time
from datetime import datetime
import warnings

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Any, Dict, List, Literal, Optional

warnings.filterwarnings("ignore")
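# Note: the warnings filter above also hides urllib3's InsecureRequestWarning, which
# every verify=False request in this module would otherwise emit on each call.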

app = FastAPI(
    title="3GPP Document Finder API",
    description="API to find 3GPP documents based on TSG document IDs",
)

app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# def get_text(specification: str, version: str):
#     """Retrieve the text of a specification/version by downloading its ZIP archive and converting the .doc/.docx inside."""
#     doc_id = specification
#     series = doc_id.split(".")[0]
#     response = requests.get(
#         f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
#         verify=False,
#         headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
#     )
#     if response.status_code != 200:
#         raise Exception(f"ZIP download failed for {specification}-{version}")
#     zip_bytes = io.BytesIO(response.content)
#     with zipfile.ZipFile(zip_bytes) as zf:
#         for file_name in zf.namelist():
#             if file_name.endswith("zip"):
#                 print("Another ZIP !")
#                 zip_bytes = io.BytesIO(zf.read(file_name))
#                 zf = zipfile.ZipFile(zip_bytes)
#                 for file_name2 in zf.namelist():
#                     if file_name2.endswith("doc") or file_name2.endswith("docx"):
#                         if "cover" in file_name2.lower():
#                             print("COVER !")
#                             continue
#                         ext = file_name2.split(".")[-1]
#                         doc_bytes = zf.read(file_name2)
#                         temp_id = str(uuid.uuid4())
#                         input_path = f"/tmp/{temp_id}.{ext}"
#                         output_path = f"/tmp/{temp_id}.txt"
#                         with open(input_path, "wb") as f:
#                             f.write(doc_bytes)
#                         subprocess.run([
#                             "libreoffice",
#                             "--headless",
#                             "--convert-to", "txt",
#                             "--outdir", "/tmp",
#                             input_path
#                         ], check=True)
#                         with open(output_path, "r") as f:
#                             txt_data = [line.strip() for line in f if line.strip()]
#                         os.remove(input_path)
#                         os.remove(output_path)
#                         return txt_data
#             elif file_name.endswith("doc") or file_name.endswith("docx"):
#                 if "cover" in file_name.lower():
#                     print("COVER !")
#                     continue
#                 ext = file_name.split(".")[-1]
#                 doc_bytes = zf.read(file_name)
#                 temp_id = str(uuid.uuid4())
#                 input_path = f"/tmp/{temp_id}.{ext}"
#                 output_path = f"/tmp/{temp_id}.txt"
#                 print("Writing")
#                 with open(input_path, "wb") as f:
#                     f.write(doc_bytes)
#                 print("Converting")
#                 subprocess.run([
#                     "libreoffice",
#                     "--headless",
#                     "--convert-to", "txt",
#                     "--outdir", "/tmp",
#                     input_path
#                 ], check=True)
#                 print("Writing TXT")
#                 with open(output_path, "r", encoding="utf-8") as f:
#                     txt_data = [line.strip() for line in f if line.strip()]
#                 os.remove(input_path)
#                 os.remove(output_path)
#                 return txt_data
#     raise Exception(f"No .doc/.docx file found in the ZIP for {specification}-{version}")
# def get_scope(specification: str, version: str):
#     try:
#         spec_text = get_text(specification, version)
#         scp_i = 0
#         nxt_i = 0
#         for x in range(len(spec_text)):
#             text = spec_text[x]
#             if re.search(r"scope$", text, flags=re.IGNORECASE):
#                 scp_i = x
#                 nxt_i = scp_i + 10
#             if re.search(r"references$", text, flags=re.IGNORECASE):
#                 nxt_i = x
#         return re.sub(r"\s+", " ", " ".join(spec_text[scp_i+1:nxt_i])) if len(spec_text[scp_i+1:nxt_i]) >= 2 else "Not found"
#     except Exception as e:
#         traceback.print_exception(e)
#         return "Not found (error)"

class DocRequest(BaseModel):
    doc_id: str


class DocResponse(BaseModel):
    doc_id: str
    url: str
    scope: Optional[str] = None
    search_time: float
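
# Illustrative shape of a successful DocResponse (all values below are hypothetical,
# not taken from a real lookup):
#   {
#       "doc_id": "SET(25)000001",
#       "url": "https://docbox.etsi.org/SET/SET/05-CONTRIBUTIONS/2025/SET(25)000001.zip",
#       "scope": null,
#       "search_time": 0.42
#   }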

# class BatchDocRequest(BaseModel):
#     doc_ids: List[str]
#     release: Optional[int] = None
# class BatchDocResponse(BaseModel):
#     results: Dict[str, str]
#     missing: List[str]
#     search_time: float
# class KeywordRequest(BaseModel):
#     keywords: str
#     release: Optional[str] = None
#     wg: Optional[str] = None
#     spec_type: Optional[Literal["TS", "TR"]] = None
#     mode: Optional[Literal["and", "or"]] = "and"
# class KeywordResponse(BaseModel):
#     results: List[Dict[str, str]]
#     search_time: float

class DocFinder:
    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        self.session = requests.Session()
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )

    def load_indexer(self):
        """Load the existing index if available."""
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["docs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        """Save the updated index with a timestamp."""
        today = datetime.today()
        self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc: str):
        """Map a document ID to its TSG folder and working-group folder."""
        if any(doc.startswith(kw) for kw in ["SETREQ", "SCPREQ"]):
            main_tsg = "SET-WG-R"
        elif any(doc.startswith(kw) for kw in ["SETTEC", "SCPTEC"]):
            main_tsg = "SET-WG-T"
        elif any(doc.startswith(kw) for kw in ["SET", "SCP"]):
            main_tsg = "SET"
        else:
            main_tsg = None
        if main_tsg is None:
            return None, None, None
        regex = re.search(r'\(([^)]+)\)', doc)
        if regex is None:
            return None, None, None
        workgroup = "20" + regex.group(1)
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup."""
        response = self.session.get(f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        """Get the list of documents/directories from a URL."""
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        """Return the URL of a document, checking the cached index first and falling back to a live search."""
        original = doc_id
        if original in self.indexer:
            return self.indexer[original]
        for doc in self.indexer:
            if doc.startswith(original):
                return self.indexer[doc]
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            if wg_url:
                files = self.get_docs_from_url(wg_url)
                for f in files:
                    if doc in f.lower() or original in f:
                        doc_url = f"{wg_url}/{f}"
                        self.indexer[original] = doc_url
                        self.save_indexer()
                        return doc_url
        return f"Document {doc_id} not found"
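
# Minimal usage sketch (the document ID below is hypothetical; the expected format is
# inferred from get_workgroup(): a SET/SCP prefix plus a year suffix in parentheses):
#   finder = DocFinder()
#   url = finder.search_document("SETTEC(25)000042")
#   print(url)  # either a docbox.etsi.org URL or "Document ... not found"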

# class DocFinder:
#     def __init__(self):
#         self.main_ftp_url = "https://www.3gpp.org/ftp"
#         self.indexer_file = "indexed_docs.json"
#         self.indexer, self.last_indexer_date = self.load_indexer()
#     def load_indexer(self):
#         """Load existing index if available"""
#         if os.path.exists(self.indexer_file):
#             with open(self.indexer_file, "r", encoding="utf-8") as f:
#                 x = json.load(f)
#                 return x["docs"], x["last_indexed_date"]
#         return {}, None
#     def save_indexer(self):
#         """Save the updated index"""
#         today = datetime.today()
#         self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
#         with open(self.indexer_file, "w", encoding="utf-8") as f:
#             output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
#             json.dump(output, f, indent=4, ensure_ascii=False)
#     def get_workgroup(self, doc):
#         main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
#         if main_tsg is None:
#             return None, None, None
#         workgroup = f"WG{int(doc[1])}" if doc[1].isnumeric() else main_tsg.upper()
#         return main_tsg, workgroup, doc
#     def find_workgroup_url(self, main_tsg, workgroup):
#         """Find the URL for the specific workgroup"""
#         response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False)
#         soup = BeautifulSoup(response.text, 'html.parser')
#         for item in soup.find_all("tr"):
#             link = item.find("a")
#             if link and workgroup in link.get_text():
#                 return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"
#         return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"
#     def get_docs_from_url(self, url):
#         """Get list of documents/directories from a URL"""
#         try:
#             response = requests.get(url, verify=False, timeout=10)
#             soup = BeautifulSoup(response.text, "html.parser")
#             return [item.get_text() for item in soup.select("tr td a")]
#         except Exception as e:
#             print(f"Error accessing {url}: {e}")
#             return []
#     def search_document(self, doc_id: str, release=None):
#         original_id = doc_id
#         if original_id in self.indexer:
#             return self.indexer[original_id]
#         for doc in self.indexer:
#             if doc.startswith(original_id):
#                 return self.indexer[doc]
#         # 2. Live "classic" search (TSG/CT)
#         main_tsg, workgroup, doc = self.get_workgroup(doc_id)
#         if main_tsg:
#             wg_url = self.find_workgroup_url(main_tsg, workgroup)
#             if wg_url:
#                 meeting_folders = self.get_docs_from_url(wg_url)
#                 for folder in meeting_folders:
#                     meeting_url = f"{wg_url}/{folder}"
#                     meeting_contents = self.get_docs_from_url(meeting_url)
#                     key = "docs" if "docs" in [x.lower() for x in meeting_contents] else "tdocs" if "tdocs" in [x.lower() for x in meeting_contents] else None
#                     if key is not None:
#                         docs_url = f"{meeting_url}/{key}"
#                         files = self.get_docs_from_url(docs_url)
#                         for file in files:
#                             if doc in file.lower() or original_id in file:
#                                 doc_url = f"{docs_url}/{file}"
#                                 self.indexer[original_id] = doc_url
#                                 return doc_url
#                         # ZIP subfolder
#                         if "zip" in [x.lower() for x in files]:
#                             zip_url = f"{docs_url}/zip"
#                             zip_files = self.get_docs_from_url(zip_url)
#                             for file in zip_files:
#                                 if doc in file.lower() or original_id in file:
#                                     doc_url = f"{zip_url}/{file}"
#                                     self.indexer[original_id] = doc_url
#                                     self.save_indexer()
#                                     return doc_url
#         # 3. Last resort: try /ftp/workshop (live search)
#         workshop_url = f"{self.main_ftp_url}/workshop"
#         meetings = self.get_docs_from_url(workshop_url)
#         for meeting in meetings:
#             if meeting in ['./', '../']:
#                 continue
#             meeting_url = f"{workshop_url}/{meeting}"
#             contents = self.get_docs_from_url(meeting_url)
#             for sub in contents:
#                 if sub.lower() in ['docs', 'tdocs']:
#                     docs_url = f"{meeting_url}/{sub}"
#                     files = self.get_docs_from_url(docs_url)
#                     for file in files:
#                         if doc_id.lower() in file.lower() or original_id in file:
#                             doc_url = f"{docs_url}/{file}"
#                             self.indexer[original_id] = doc_url
#                             self.save_indexer()
#                             return doc_url
#                     if "zip" in [x.lower() for x in files]:
#                         zip_url = f"{docs_url}/zip"
#                         zip_files = self.get_docs_from_url(zip_url)
#                         for file in zip_files:
#                             if doc_id.lower() in file.lower() or original_id in file:
#                                 doc_url = f"{zip_url}/{file}"
#                                 self.indexer[original_id] = doc_url
#                                 self.save_indexer()
#                                 return doc_url
#         return f"Document {doc_id} not found"

async def main_menu():
    return FileResponse(os.path.join("templates", "index.html"))

# @app.post("/search-spec", response_model=KeywordResponse)
# def search_spec(request: KeywordRequest):
#     start_time = time.time()
#     kws = [_.lower() for _ in request.keywords.split(" ")]
#     results = []
#     for string, spec in finder_spec.indexer_specs.items():
#         if request.mode == "and":
#             if not all(kw in string.lower() for kw in kws):
#                 continue
#         elif request.mode == "or":
#             if not any(kw in string.lower() for kw in kws):
#                 continue
#         release = request.release
#         working_group = request.wg
#         spec_type = request.spec_type
#         if spec.get('version', None) is None or (release is not None and spec["version"].split(".")[0] != str(release)):
#             continue
#         if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
#             continue
#         if spec_type is not None and spec["type"] != spec_type:
#             continue
#         results.append(spec)
#     if len(results) > 0:
#         return KeywordResponse(
#             results=results,
#             search_time=time.time() - start_time
#         )
#     else:
#         raise HTTPException(status_code=404, detail="Specifications not found")

finder = DocFinder()


def find_document(request: DocRequest):
    start_time = time.time()
    result = finder.search_document(request.doc_id)
    if "not found" not in result and "Could not" not in result and "Unable" not in result:
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail=result)

# @app.post("/batch", response_model=BatchDocResponse)
# def find_documents_batch(request: BatchDocRequest):
#     start_time = time.time()
#     results = {}
#     missing = []
#     for doc_id in request.doc_ids:
#         finder = finder_tsg if doc_id[0].isalpha() else finder_spec
#         result = finder.search_document(doc_id)
#         if "not found" not in result and "Could not" not in result and "Unable" not in result:
#             results[doc_id] = result
#         else:
#             missing.append(doc_id)
#     return BatchDocResponse(
#         results=results,
#         missing=missing,
#         search_time=time.time() - start_time
#     )