# Page-view header captured with the file (not part of the program):
# Spaces: Sleeping | File size: 2,792 Bytes | fa525b5 | line-number gutter 1-79
from fastapi import FastAPI, File, UploadFile, HTTPException, Form,BackgroundTasks
from fastapi.responses import FileResponse
import shutil,os,uuid,asyncio
from zipfile import ZipFile
from typing import List
app = FastAPI()
import logging
# Root-logger setup: records at ERROR level and above are written to app.log,
# each line prefixed with a second-resolution timestamp and the level name.
_LOG_SETTINGS = {
    "filename": 'app.log',
    "level": logging.ERROR,
    "format": '%(asctime)s %(levelname)s: %(message)s',
    "datefmt": '%Y-%m-%d %H:%M:%S',
}
logging.basicConfig(**_LOG_SETTINGS)
# Shared secret that clients must send as the `api_key` form field.
# Read from the SECRET_KEY environment variable, as the original comment
# intended. The literal fallback only preserves the previous behaviour for
# deployments that have not set the variable yet.
# NOTE(review): "123" is not a safe secret; set SECRET_KEY in production.
SECRET_KEY = os.environ.get("SECRET_KEY", "123")
def cleanup_directory(path: str) -> None:
    """Recursively delete the directory at *path*.

    A directory that is already gone is treated as success, so the function
    can safely be called more than once for the same path (for example from
    both an error handler and a background task). Any other failure, such as
    a permission error, still propagates to the caller.

    Args:
        path: Directory to remove together with everything inside it.
    """
    try:
        shutil.rmtree(path)
    except FileNotFoundError:
        # Nothing left to clean up; do not let this mask the caller's error.
        return
def _destination_for(directory: str, filename: str) -> str:
    """Return a collision-free path inside *directory* for an uploaded file name."""
    # The name is chosen by the client: keep only its final component so that
    # values such as "../../x.pdf" cannot escape *directory*.
    safe_name = os.path.basename(filename.replace("\\", "/"))
    stem, ext = os.path.splitext(safe_name)
    candidate = f"{directory}/{safe_name}"
    counter = 1
    # Two uploads may share a name; suffix later ones instead of overwriting.
    while os.path.exists(candidate):
        candidate = f"{directory}/{stem}_{counter}{ext}"
        counter += 1
    return candidate


@app.post("/uploadfiles/")
def upload_files(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...),
    api_key: str = Form(...),
    percentage: int = Form(...),
    job_description: str = Form(...),
):
    """Store uploaded PDFs, run `compression` on them and return a zip of the result.

    The uploads are saved under a fresh ``data/resumes/<uuid>`` directory,
    paired with *job_description* and handed to ``app.compression`` together
    with *percentage*. The paths it returns are zipped and sent back; the
    working directory is deleted after the response has been sent, or
    immediately if anything fails.

    Args:
        background_tasks: FastAPI task queue used to schedule the cleanup.
        files: Uploaded files; between 5 and 900, each named ``*.pdf``.
        api_key: Must equal the module-level ``SECRET_KEY``.
        percentage: Integer in ``[0, 100]``, forwarded to ``compression``
            (presumably the share of files to shortlist - confirm there).
        job_description: Text attached to every PDF as ``job_des``.

    Returns:
        A ``FileResponse`` streaming the generated zip archive.

    Raises:
        HTTPException: 401 for a wrong API key, 400 for an invalid file count,
            percentage or file name, 500 if saving, processing or zipping fails.
    """
    # Function-scope import so the block needs no change to the file's imports.
    from hmac import compare_digest

    # Constant-time comparison avoids leaking the key through response timing.
    if not compare_digest(api_key.encode("utf-8"), SECRET_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorized")
    if not (5 <= len(files) <= 900):
        # Message now states the bounds that are actually enforced.
        raise HTTPException(status_code=400, detail="Number of files must be between 5 and 900.")
    if percentage < 0 or percentage > 100:
        raise HTTPException(status_code=400, detail="Percentage must be between 0 and 100.")
    # Validate all files are PDFs (a part without a file name is rejected too).
    for file in files:
        if not file.filename or not file.filename.endswith(".pdf"):
            raise HTTPException(status_code=400, detail="All files must be PDF.")

    main_path = f"data/resumes/{uuid.uuid4()}"
    os.makedirs(main_path, exist_ok=True)
    try:
        # Saving happens inside the try so a failed write cannot leave the
        # working directory behind.
        pdfs_paths = []
        for file in files:
            file_location = _destination_for(main_path, file.filename)
            with open(file_location, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            pdfs_paths.append(file_location)

        pdfs_path_with_description = [
            {"pdf_path": pdf_path, "job_des": job_description} for pdf_path in pdfs_paths
        ]
        # Imported at call time, as in the original code.
        from app import compression
        short_listed_files_paths = compression(pdfs_path_with_description, percentage)

        zip_filename = f"{main_path}/{uuid.uuid4()}.zip"
        with ZipFile(zip_filename, "w") as zipf:
            for file_path in short_listed_files_paths:
                zipf.write(file_path, os.path.basename(file_path))

        # Return the file response and clean up after sending.
        background_tasks.add_task(cleanup_directory, main_path)
        return FileResponse(
            zip_filename,
            media_type="application/zip",
            # Offer only the archive name to the client, not the server path.
            filename=os.path.basename(zip_filename),
        )
    except Exception as e:
        # logging.exception records the traceback along with the message.
        logging.exception("Error occurred: %s", e)
        cleanup_directory(main_path)
        raise HTTPException(status_code=500, detail=str(e)) from e