Omar ID EL MOUMEN
Remove indexer options + add scope extractor functions + implement scope extraction in single-document search
dab8149
from io import StringIO
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
import os
import pymupdf as fitz
import uuid
import zipfile
import io
import subprocess
import re
import time
from datetime import datetime
from dotenv import load_dotenv
import warnings
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Any, Dict, List, Literal, Optional

load_dotenv()
warnings.filterwarnings("ignore")
app = FastAPI(title="3GPP Document Finder API",
              description="API to find 3GPP documents based on TSG document IDs")
app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def get_pdf_bytes(specification: str, version: str):
    """Download the spec archive from the 3GPP FTP, convert the Word file to PDF
    (requires LibreOffice on the host) and return the PDF as a BytesIO buffer."""
    doc_id = specification
    series = doc_id.split(".")[0]
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
    )
    if response.status_code != 200:
        raise Exception("ZIP download failed")
    zip_bytes = io.BytesIO(response.content)
    with zipfile.ZipFile(zip_bytes) as zf:
        for file_name in zf.namelist():
            if file_name.endswith("doc") or file_name.endswith("docx"):
                ext = file_name.split(".")[-1]
                doc_bytes = zf.read(file_name)
                temp_id = str(uuid.uuid4())
                input_path = f"/tmp/{temp_id}.{ext}"
                output_path = f"/tmp/{temp_id}.pdf"
                with open(input_path, "wb") as f:
                    f.write(doc_bytes)
                # Headless LibreOffice conversion writes <temp_id>.pdf into /tmp
                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "pdf",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)
                with open(output_path, "rb") as f:
                    pdf_data = f.read()
                os.remove(input_path)
                os.remove(output_path)
                return io.BytesIO(pdf_data)
    raise Exception("No .doc/.docx file found in the ZIP")
def get_scope(specification: str, version: str):
    """Extract the text of clause 1 (Scope) from the specification PDF."""
    pdf_bytes = get_pdf_bytes(specification, version)
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    page_num = 0  # fall back to the first page if no "Scope" entry is found in the TOC
    for content in doc.get_toc():
        if "scope" in content[1].lower():
            page_num = content[2] - 1
            break
    doc = doc[page_num:]
    pdf_full_text = " ".join(page.get_text("text") for page in doc)
    pdf_postprocess_text = re.sub(r"\s+", " ", pdf_full_text)
    # Mark the boundaries of the scope clause, then keep the text between them
    pdf_postprocess_text = pdf_postprocess_text.replace("1 Scope", " !-! ")
    pdf_postprocess_text = pdf_postprocess_text.replace("2 Reference", " !-! ")
    # The replaced glyph was lost in extraction; "\uf0b7" (the Word/Symbol bullet) is an
    # assumption about the character the original code substituted with "- "
    pdf_postprocess_text = pdf_postprocess_text.replace("\uf0b7", "- ")
    return pdf_postprocess_text.split(" !-! ")[1]
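# Hedged usage note (not part of the original commit): get_scope() expects the spec
# number and the base-36 version tag used in the archive file names, so a call such as
# get_scope("23.501", "j30") would fetch 23501-j30.zip, convert the embedded Word file
# to PDF and return the text between the "1 Scope" and "2 References" headings. The
# values shown here are illustrative, not taken from the original code.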
class DocRequest(BaseModel):
    doc_id: str
    release: Optional[int] = None

class DocResponse(BaseModel):
    doc_id: str
    url: str
    scope: Optional[str] = None
    search_time: float

class BatchDocRequest(BaseModel):
    doc_ids: List[str]
    release: Optional[int] = None

class BatchDocResponse(BaseModel):
    results: Dict[str, str]
    missing: List[str]
    search_time: float

class KeywordRequest(BaseModel):
    keywords: str
    release: Optional[str] = None
    wg: Optional[str] = None
    spec_type: Optional[Literal["TS", "TR"]] = None
    mode: Optional[Literal["and", "or"]] = "and"

class KeywordResponse(BaseModel):
    results: List[Dict[str, str]]
    search_time: float
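# Hedged example payloads (illustrative values only, not taken from the original commit):
#   single document lookup -> {"doc_id": "23.501", "release": 18}
#   keyword search         -> {"keywords": "network slicing", "release": "18",
#                              "spec_type": "TS", "mode": "and"}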
class TsgDocFinder:
    def __init__(self):
        self.main_ftp_url = "https://www.3gpp.org/ftp"
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()

    def load_indexer(self):
        """Load existing index if available"""
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["docs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        """Save the updated index"""
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            today = datetime.today()
            output = {"docs": self.indexer, "last_indexed_date": today.strftime("%d/%m/%Y-%H:%M:%S")}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc):
        """Derive the TSG and working group folders from the document ID prefix"""
        main_tsg = "tsg_ct" if doc[0] == "C" else "tsg_sa" if doc[0] == "S" else None
        if main_tsg is None:
            return None, None, None
        workgroup = f"WG{int(doc[1])}" if doc[1].isnumeric() else main_tsg.upper()
        return main_tsg, workgroup, doc

    def find_workgroup_url(self, main_tsg, workgroup):
        """Find the URL for the specific workgroup"""
        response = requests.get(f"{self.main_ftp_url}/{main_tsg}", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/{workgroup}"

    def get_docs_from_url(self, url):
        """Get list of documents/directories from a URL"""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str, release=None):
        """Search for a specific document by its ID"""
        original_id = doc_id

        # Check if already indexed
        if original_id in self.indexer:
            return self.indexer[original_id]
        for doc in self.indexer:
            if doc.startswith(original_id):
                return self.indexer[doc]

        # Parse the document ID
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        if not main_tsg:
            return f"Could not parse document ID: {doc_id}"
        print(f"Searching for {original_id} (parsed as {doc}) in {main_tsg}/{workgroup}...")

        # Find the workgroup URL
        wg_url = self.find_workgroup_url(main_tsg, workgroup)
        if not wg_url:
            return f"Could not find workgroup for {doc_id}"

        # Search in the workgroup directories
        meeting_folders = self.get_docs_from_url(wg_url)
        for folder in meeting_folders:
            meeting_url = f"{wg_url}/{folder}"
            meeting_contents = self.get_docs_from_url(meeting_url)
            key = "docs" if "docs" in [x.lower() for x in meeting_contents] else "tdocs" if "tdocs" in [x.lower() for x in meeting_contents] else None
            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                print(f"Checking {docs_url}...")
                files = self.get_docs_from_url(docs_url)

                # Check for the document in the main Docs folder
                # (case-insensitive: the parsed ID is upper-case, the listing may not be)
                for file in files:
                    if doc.lower() in file.lower() or original_id in file:
                        doc_url = f"{docs_url}/{file}"
                        self.indexer[original_id] = doc_url
                        return doc_url

                # Check in ZIP subfolder if it exists
                if "zip" in files:
                    zip_url = f"{docs_url}/zip"
                    print(f"Checking {zip_url}...")
                    zip_files = self.get_docs_from_url(zip_url)
                    for file in zip_files:
                        if doc.lower() in file.lower() or original_id in file:
                            doc_url = f"{zip_url}/{file}"
                            self.indexer[original_id] = doc_url
                            self.save_indexer()
                            return doc_url

        return f"Document {doc_id} not found"
class SpecDocFinder:
    def __init__(self):
        # Base-36 alphabet used by 3GPP to encode version digits in archive file names
        self.chars = "0123456789abcdefghijklmnopqrstuvwxyz"

    def search_document(self, doc_id, release=None):
        series = doc_id.split(".")[0]
        while len(series) < 2:
            series = "0" + series
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}"
        response = requests.get(url, verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        items = soup.find_all("tr")[1:]
        version_found = None
        if release is None:
            # No release requested: take the last (latest) entry in the archive listing
            try:
                item = items[-1].find("a")
            except Exception as e:
                print(e)
                return f"Unable to find specification {doc_id}"
            a, b, c = [_ for _ in item.get_text().split("-")[-1].replace(".zip", "")]
            version = f"{self.chars.index(a)}.{self.chars.index(b)}.{self.chars.index(c)}"
            version_found = (version, item.get("href"))
            _, spec_url = version_found
            return spec_url
        else:
            # A release was requested: return the first archive whose major version matches
            for item in items:
                x = item.find("a")
                if f"{doc_id.replace('.', '')}-{self.chars[int(release)]}" in x.get_text():
                    a, b, c = [_ for _ in x.get_text().split("-")[-1].replace(".zip", "")]
                    version = f"{self.chars.index(a)}.{self.chars.index(b)}.{self.chars.index(c)}"
                    version_found = (version, x.get("href"))
                    _, spec_url = version_found
                    return spec_url
            # Nothing matched the requested release
            return f"Specification {doc_id} not found"
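# Hedged note (not part of the original commit): archive file names encode each version
# digit with one character of the base-36 alphabet above, so a name like 23501-i30.zip
# corresponds to TS 23.501 version 18.3.0 (chars[18] == "i"). This is why
# search_document() maps a requested release number to self.chars[int(release)] when
# filtering the listing.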
finder_tsg = TsgDocFinder()
finder_spec = SpecDocFinder()

async def main_menu():
    return FileResponse(os.path.join("templates", "index.html"))
def search_spec(request: KeywordRequest):
    """Search the 3GPP specification status report by keywords, with optional
    release / working group / spec-type filters, and build download URLs."""
    chars = "0123456789abcdefghijklmnopqrstuvwxyz"
    start_time = time.time()
    response = requests.get(
        'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False,
    )
    dfs = pd.read_html(StringIO(response.text), encoding="utf-8")
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs: List[pd.DataFrame] = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    # Flatten every table row into a dict keyed by the normalised column names
    specifications = []
    for df in extracted_dfs:
        for index, row in df.iterrows():
            doc = row.to_list()
            doc_dict = dict(zip(columns, doc))
            specifications.append(doc_dict)

    kws = [_.lower() for _ in request.keywords.split(" ")]
    results = []
    for spec in specifications:
        # Keyword matching on the title: all keywords ("and") or any keyword ("or")
        if request.mode == "and":
            if not all(kw in spec["title"].lower() for kw in kws):
                continue
        elif request.mode == "or":
            if not any(kw in spec["title"].lower() for kw in kws):
                continue
        release = request.release
        working_group = request.wg
        spec_type = request.spec_type
        if spec.get('vers', None) is None or (release is not None and spec["vers"].split(".")[0] != str(release)):
            continue
        if spec.get('WG', None) is None or (working_group is not None and spec["WG"] != working_group):
            continue
        if spec_type is not None and spec["type"] != spec_type:
            continue

        doc_id = str(spec["spec_num"])
        series = doc_id.split(".")[0]
        a, b, c = str(spec["vers"]).split(".")
        print(spec["vers"])
        # Version digits up to 35 are encoded as single base-36 characters in the archive
        # file name; larger digits use two-digit zero-padded fields instead
        if not (int(a) > 35 or int(b) > 35 or int(c) > 35):
            spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{chars[int(a)]}{chars[int(b)]}{chars[int(c)]}.zip"
        else:
            x, y, z = str(a), str(b), str(c)
            while len(x) < 2:
                x = "0" + x
            while len(y) < 2:
                y = "0" + y
            while len(z) < 2:
                z = "0" + z
            spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{x}{y}{z}.zip"

        results.append({
            "id": str(spec["spec_num"]),
            "title": spec["title"],
            "type": "Technical Specification" if spec["type"] == "TS" else "Technical Report",
            "release": str(spec["vers"].split(".")[0]),
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": spec_url
        })

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specification not found")
def find_document(request: DocRequest):
    start_time = time.time()
    # TDoc IDs start with a letter (e.g. "S2-..."), specification numbers do not
    finder = finder_tsg if request.doc_id[0].isalpha() else finder_spec
    print(finder)
    result = finder.search_document(request.doc_id, request.release)

    if "not found" not in result and "Could not" not in result and "Unable" not in result:
        version = result.split("/")[-1].replace(".zip", "").split("-")[-1]
        # Only specification lookups carry a scope section to extract
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time
        ) if isinstance(finder, TsgDocFinder) else DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time,
            scope=get_scope(request.doc_id, version)
        )
    else:
        raise HTTPException(status_code=404, detail=result)
def find_documents_batch(request: BatchDocRequest):
    start_time = time.time()
    results = {}
    missing = []

    for doc_id in request.doc_ids:
        finder = finder_tsg if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id)
        if "not found" not in result and "Could not" not in result and "Unable" not in result:
            results[doc_id] = result
        else:
            missing.append(doc_id)

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )
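
# Hedged local smoke test (not part of the original commit): running the module directly
# exercises the specification path end-to-end -- archive lookup, ZIP download, LibreOffice
# conversion and scope extraction. "23.501" is an illustrative spec number, not a value
# taken from the original code, and the call needs network access plus LibreOffice.
if __name__ == "__main__":
    demo = find_document(DocRequest(doc_id="23.501"))
    print(demo.url)
    print((demo.scope or "")[:300])
    print(f"search took {demo.search_time:.1f}s")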