# ETSIDocFinder / app.py
import io
import json
import os
import re
import subprocess
import time
import traceback
import uuid
import warnings
import zipfile
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

warnings.filterwarnings("ignore")
app = FastAPI(title="ETSI Document Finder API",
              description="API to find ETSI documents based on TSG document IDs")
app.mount("/static", StaticFiles(directory="static"), name="static")
origins = [
"*",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# def get_text(specification: str, version: str):
# """Récupère les bytes du PDF à partir d'une spécification et d'une version."""
# doc_id = specification
# series = doc_id.split(".")[0]
# response = requests.get(
# f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
# verify=False,
# headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
# )
# if response.status_code != 200:
# raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")
# zip_bytes = io.BytesIO(response.content)
# with zipfile.ZipFile(zip_bytes) as zf:
# for file_name in zf.namelist():
# if file_name.endswith("zip"):
# print("Another ZIP !")
# zip_bytes = io.BytesIO(zf.read(file_name))
# zf = zipfile.ZipFile(zip_bytes)
# for file_name2 in zf.namelist():
# if file_name2.endswith("doc") or file_name2.endswith("docx"):
# if "cover" in file_name2.lower():
# print("COVER !")
# continue
# ext = file_name2.split(".")[-1]
# doc_bytes = zf.read(file_name2)
# temp_id = str(uuid.uuid4())
# input_path = f"/tmp/{temp_id}.{ext}"
# output_path = f"/tmp/{temp_id}.txt"
# with open(input_path, "wb") as f:
# f.write(doc_bytes)
# subprocess.run([
# "libreoffice",
# "--headless",
# "--convert-to", "txt",
# "--outdir", "/tmp",
# input_path
# ], check=True)
# with open(output_path, "r") as f:
# txt_data = [line.strip() for line in f if line.strip()]
# os.remove(input_path)
# os.remove(output_path)
# return txt_data
# elif file_name.endswith("doc") or file_name.endswith("docx"):
# if "cover" in file_name.lower():
# print("COVER !")
# continue
# ext = file_name.split(".")[-1]
# doc_bytes = zf.read(file_name)
# temp_id = str(uuid.uuid4())
# input_path = f"/tmp/{temp_id}.{ext}"
# output_path = f"/tmp/{temp_id}.txt"
# print("Ecriture")
# with open(input_path, "wb") as f:
# f.write(doc_bytes)
# print("Convertissement")
# subprocess.run([
# "libreoffice",
# "--headless",
# "--convert-to", "txt",
# "--outdir", "/tmp",
# input_path
# ], check=True)
# print("Ecriture TXT")
# with open(output_path, "r", encoding="utf-8") as f:
# txt_data = [line.strip() for line in f if line.strip()]
# os.remove(input_path)
# os.remove(output_path)
# return txt_data
# raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")
# def get_scope(specification: str, version: str):
# try:
# spec_text = get_text(specification, version)
# scp_i = 0
# nxt_i = 0
# for x in range(len(spec_text)):
# text = spec_text[x]
# if re.search(r"scope$", text, flags=re.IGNORECASE):
# scp_i = x
# nxt_i = scp_i + 10
# if re.search(r"references$", text, flags=re.IGNORECASE):
# nxt_i = x
# return re.sub(r"\s+", " ", " ".join(spec_text[scp_i+1:nxt_i])) if len(spec_text[scp_i+1:nxt_i]) >= 2 else "Not found"
# except Exception as e:
# traceback.print_exception(e)
# return "Not found (error)"
class DocRequest(BaseModel):
doc_id: str
class DocResponse(BaseModel):
doc_id: str
url: str
scope: Optional[str] = None
search_time: float
# class BatchDocRequest(BaseModel):
# doc_ids: List[str]
# release: Optional[int] = None
# class BatchDocResponse(BaseModel):
# results: Dict[str, str]
# missing: List[str]
# search_time: float
# class KeywordRequest(BaseModel):
# keywords: str
# release: Optional[str] = None
# wg: Optional[str] = None
# spec_type: Optional[Literal["TS", "TR"]] = None
# mode: Optional[Literal["and", "or"]] = "and"
# class KeywordResponse(BaseModel):
# results: List[Dict[str, str]]
# search_time: float
class DocFinder:
    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        self.session = requests.Session()
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()
        # Authenticate against the ETSI OnLine (EOL) portal so that docbox content is reachable
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )
def load_indexer(self):
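        """Load the existing index if available."""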
if os.path.exists(self.indexer_file):
with open(self.indexer_file, "r", encoding="utf-8") as f:
x = json.load(f)
return x["docs"], x["last_indexed_date"]
return {}, None
def save_indexer(self):
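        """Save the updated index along with the indexing timestamp."""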
today = datetime.today()
self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
with open(self.indexer_file, "w", encoding="utf-8") as f:
output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
json.dump(output, f, indent=4, ensure_ascii=False)
def get_workgroup(self, doc: str):
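        """Derive the TSG and workgroup folder name from an ETSI SET/SCP document ID."""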
main_tsg = "SET-WG-R" if any(doc.startswith(kw) for kw in ["SETREQ", "SCPREQ"]) else "SET-WG-T" if any(doc.startswith(kw) for kw in ["SETTEC", "SCPTEC"]) else "SET" if any(doc.startswith(kw) for kw in ["SET", "SCP"]) else None
if main_tsg is None:
return None, None, None
regex = re.search(r'\(([^)]+)\)', doc)
workgroup = "20" + regex.group(1)
return main_tsg, workgroup, doc
def find_workgroup_url(self, main_tsg, workgroup):
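        """Find the URL for the specific workgroup."""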
response = self.session.get(f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS", verify=False)
soup = BeautifulSoup(response.text, 'html.parser')
for item in soup.find_all("tr"):
link = item.find("a")
if link and workgroup in link.get_text():
return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"
def get_docs_from_url(self, url):
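        """Get the list of documents/directories at a URL."""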
try:
response = self.session.get(url, verify=False, timeout=15)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def search_document(self, doc_id: str):
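        """Resolve a document ID to its docbox URL, checking the local index before searching live."""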
original = doc_id
if original in self.indexer:
return self.indexer[original]
for doc in self.indexer:
if doc.startswith(original):
return self.indexer[doc]
main_tsg, workgroup, doc = self.get_workgroup(doc_id)
if main_tsg:
wg_url = self.find_workgroup_url(main_tsg, workgroup)
if wg_url:
files = self.get_docs_from_url(wg_url)
for f in files:
                    if doc.lower() in f.lower() or original in f:
doc_url = f"{wg_url}/{f}"
self.indexer[original] = doc_url
self.save_indexer()
return doc_url
return f"Document {doc_id} not found"
# class DocFinder:
# def __init__(self):
# self.main_ftp_url = "https://www.3gpp.org/ftp"
# self.indexer_file = "indexed_docs.json"
# self.indexer, self.last_indexer_date = self.load_indexer()
# def load_indexer(self):
# """Load existing index if available"""
# if os.path.exists(self.indexer_file):
# with open(self.indexer_file, "r", encoding="utf-8") as f:
# x = json.load(f)
# return x["docs"], x["last_indexed_date"]
# return {}, None
# def save_indexer(self):
# """Save the updated index"""
# today = datetime.today()
# self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
# with open(self.indexer_file, "w", encoding="utf-8") as f:
# output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
# json.dump(output, f, indent=4, ensure_ascii=False)
# def get_workgroup(self, doc):
# main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
# if main_tsg is None:
# return None, None, None
# workgroup = f"WG{int(doc[1])}" if doc[1].isnumeric() else main_tsg.upper()
# return main_tsg, workgroup, doc
# def find_workgroup_url(self, main_tsg, workgroup):
# """Find the URL for the specific workgroup"""
# response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False)
# soup = BeautifulSoup(response.text, 'html.parser')
# for item in soup.find_all("tr"):
# link = item.find("a")
# if link and workgroup in link.get_text():
# return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"
# return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"
# def get_docs_from_url(self, url):
# """Get list of documents/directories from a URL"""
# try:
# response = requests.get(url, verify=False, timeout=10)
# soup = BeautifulSoup(response.text, "html.parser")
# return [item.get_text() for item in soup.select("tr td a")]
# except Exception as e:
# print(f"Error accessing {url}: {e}")
# return []
# def search_document(self, doc_id: str, release=None):
# original_id = doc_id
# if original_id in self.indexer:
# return self.indexer[original_id]
# for doc in self.indexer:
# if doc.startswith(original_id):
# return self.indexer[doc]
# # 2. "Classic" live search (TSG/CT)
# main_tsg, workgroup, doc = self.get_workgroup(doc_id)
# if main_tsg:
# wg_url = self.find_workgroup_url(main_tsg, workgroup)
# if wg_url:
# meeting_folders = self.get_docs_from_url(wg_url)
# for folder in meeting_folders:
# meeting_url = f"{wg_url}/{folder}"
# meeting_contents = self.get_docs_from_url(meeting_url)
# key = "docs" if "docs" in [x.lower() for x in meeting_contents] else "tdocs" if "tdocs" in [x.lower() for x in meeting_contents] else None
# if key is not None:
# docs_url = f"{meeting_url}/{key}"
# files = self.get_docs_from_url(docs_url)
# for file in files:
# if doc in file.lower() or original_id in file:
# doc_url = f"{docs_url}/{file}"
# self.indexer[original_id] = doc_url
# self.save_indexer()
# return doc_url
# # ZIP subfolder
# if "zip" in [x for x in files]:
# zip_url = f"{docs_url}/zip"
# zip_files = self.get_docs_from_url(zip_url)
# for file in zip_files:
# if doc in file.lower() or original_id in file:
# doc_url = f"{zip_url}/{file}"
# self.indexer[original_id] = doc_url
# self.save_indexer()
# return doc_url
# # 3. Last resort: try /ftp/workshop (live search)
# workshop_url = f"{self.main_ftp_url}/workshop"
# meetings = self.get_docs_from_url(workshop_url)
# for meeting in meetings:
# if meeting in ['./', '../']:
# continue
# meeting_url = f"{workshop_url}/{meeting}"
# contents = self.get_docs_from_url(meeting_url)
# for sub in contents:
# if sub.lower() in ['docs', 'tdocs']:
# docs_url = f"{meeting_url}/{sub}"
# files = self.get_docs_from_url(docs_url)
# for file in files:
# if doc_id.lower() in file.lower() or original_id in file:
# doc_url = f"{docs_url}/{file}"
# self.indexer[original_id] = doc_url
# self.save_indexer()
# return doc_url
# if "zip" in [x.lower() for x in files]:
# zip_url = f"{docs_url}/zip"
# zip_files = self.get_docs_from_url(zip_url)
# for file in zip_files:
# if doc_id.lower() in file.lower() or original_id in file:
# doc_url = f"{zip_url}/{file}"
# self.indexer[original_id] = doc_url
# self.save_indexer()
# return doc_url
# return f"Document {doc_id} not found"
@app.get("/")
async def main_menu():
return FileResponse(os.path.join("templates", "index.html"))
# @app.post("/search-spec", response_model=KeywordResponse)
# def search_spec(request: KeywordRequest):
# start_time = time.time()
# kws = [_.lower() for _ in request.keywords.split(" ")]
# results = []
# for string, spec in finder_spec.indexer_specs.items():
# if request.mode == "and":
# if not all(kw in string.lower() for kw in kws):
# continue
# elif request.mode == "or":
# if not any(kw in string.lower() for kw in kws):
# continue
# release = request.release
# working_group = request.wg
# spec_type = request.spec_type
# if spec.get('version', None) is None or (release is not None and spec["version"].split(".")[0] != str(release)):
# continue
# if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
# continue
# if spec_type is not None and spec["type"] != spec_type:
# continue
# results.append(spec)
# if len(results) > 0:
# return KeywordResponse(
# results=results,
# search_time=time.time() - start_time
# )
# else:
# raise HTTPException(status_code=404, detail="Specifications not found")
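# Single shared DocFinder instance; its on-disk index (indexed_docs.json) acts as a cache across requests.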
finder = DocFinder()
@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
start_time = time.time()
result = finder.search_document(request.doc_id)
if "not found" not in result and "Could not" not in result and "Unable" not in result:
return DocResponse(
doc_id=request.doc_id,
url=result,
search_time=time.time() - start_time
)
else:
raise HTTPException(status_code=404, detail=result)
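# Example request against the /find endpoint (illustrative; the doc_id below is a hypothetical SET TDoc ID
# and the host/port assume a default local uvicorn run):
#   curl -X POST http://localhost:8000/find \
#        -H "Content-Type: application/json" \
#        -d '{"doc_id": "SETTEC(24)000123"}'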
# @app.post("/batch", response_model=BatchDocResponse)
# def find_documents_batch(request: BatchDocRequest):
# start_time = time.time()
# results = {}
# missing = []
# for doc_id in request.doc_ids:
# finder = finder_tsg if doc_id[0].isalpha() else finder_spec
# result = finder.search_document(doc_id)
# if "not found" not in result and "Could not" not in result and "Unable" not in result:
# results[doc_id] = result
# else:
# missing.append(doc_id)
# return BatchDocResponse(
# results=results,
# missing=missing,
# search_time=time.time() - start_time
# )