Spaces:
Runtime error
Runtime error
File size: 4,574 Bytes
56b6519 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import ssl
import hashlib
import requests
import os
import shutil
import zipfile
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from transformers import pipeline
from inferencer import inferencer
from cvss_inferencer import inferencer as cvss_inferencer
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Permitir solicitudes desde cualquier origen
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"], # Métodos HTTP permitidos
allow_headers=["*"], # Cabeceras permitidas
)
LOCAL_FILE_PATH = Path("/app/modelo_cwe.zip")
REMOTE_FILE_URL = "https://drive.usercontent.google.com/download?id=1OtRNObv-Il2B5nDnpzMSGj_yBJAlskuS&export=download&confirm="
CWE_MODEL_FOLDER_PATH = Path("/app/modelo_cwe")
APP_PATH = Path("/app")
BYTE_RANGE = (0, 10485759) # 10 MB
def calculate_local_checksum(file_path, byte_limit):
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
chunk = f.read(byte_limit)
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
def calculate_remote_checksum(url, byte_range):
headers = {'Range': f'bytes={byte_range[0]}-{byte_range[1]}'}
try:
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise Exception(f"Error al obtener el archivo remoto: {str(e)}") from e
sha256_hash = hashlib.sha256()
for chunk in response.iter_content(chunk_size=8192):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
class VulnerabilityRequest(BaseModel):
vuln: str
@app.post("/classify")
async def classify_vulnerability(vuln_request: VulnerabilityRequest):
vuln = vuln_request.vuln
result = inferencer(vuln)
return {"result": result}
@app.post("/cvss")
async def classify_vulnerability(vuln_request: VulnerabilityRequest):
vuln = vuln_request.vuln
result = cvss_inferencer(vuln)
return {"result": result}
@app.get("/")
async def read_root():
example_vuln = (
"Los dispositivos de CPU Siemens SIMATIC S7-300 permiten a los atacantes remotos causar una denegación de servicio "
"(transición de modo de defecto) a través de paquetes elaborados en (1) puerto TCP 102 o (2) Profibus."
)
result = inferencer(example_vuln)
return {"example_vuln": example_vuln, "result": result}
@app.get("/check_cwe_update")
async def check_cwe_update():
try:
local_checksum = calculate_local_checksum(LOCAL_FILE_PATH, byte_limit=BYTE_RANGE[1] + 1)
remote_checksum = calculate_remote_checksum(REMOTE_FILE_URL, BYTE_RANGE)
match = local_checksum == remote_checksum
return {"checksum_match": match}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/update_cwe_model")
async def update_cwe_model():
backup_file_path = f"{LOCAL_FILE_PATH}.bak"
try:
if os.path.exists(LOCAL_FILE_PATH):
if os.path.exists(backup_file_path):
os.remove(backup_file_path)
shutil.move(LOCAL_FILE_PATH, backup_file_path)
response = requests.get(REMOTE_FILE_URL, stream=True)
if response.status_code == 200:
with open(LOCAL_FILE_PATH, "wb") as local_zip:
for chunk in response.iter_content(chunk_size=8192):
local_zip.write(chunk)
else:
if os.path.exists(backup_file_path):
shutil.move(backup_file_path, LOCAL_FILE_PATH)
raise HTTPException(status_code=500, detail="Failed to download CWE model")
if os.path.exists(CWE_MODEL_FOLDER_PATH):
shutil.rmtree(CWE_MODEL_FOLDER_PATH)
with zipfile.ZipFile(LOCAL_FILE_PATH, 'r') as zip_ref:
zip_ref.extractall(APP_PATH)
if os.path.exists(backup_file_path):
os.remove(backup_file_path)
return {"status": "success", "message": "Updated CWE model successfully"}
except Exception as e:
if os.path.exists(backup_file_path):
shutil.move(backup_file_path, LOCAL_FILE_PATH)
raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
ssl_keyfile="ssl/key.pem",
ssl_certfile="ssl/cert.pem",
ssl_version=ssl.PROTOCOL_TLS_SERVER
)
|