Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form,BackgroundTasks | |
| from fastapi.responses import FileResponse | |
| import shutil,os,uuid,asyncio | |
| from zipfile import ZipFile | |
| from typing import List | |
# FastAPI application object for this module.
app = FastAPI()
# Shared API key that clients must send in the ``api_key`` form field.
# Read from the SECRET_KEY environment variable so the real value does not
# have to live in source control. The "123" fallback preserves the previous
# hard-coded value; remove it once the variable is set in every deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "123")
def cleanup_directory(path: str):
    """Recursively delete the directory tree rooted at ``path``.

    Any error raised by ``shutil.rmtree`` (for example when ``path`` does
    not exist) propagates to the caller.
    """
    # ignore_errors=False is rmtree's default; spelled out to make the
    # "failures are not swallowed" contract explicit.
    shutil.rmtree(path, ignore_errors=False)
def upload_files(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...),
    api_key: str = Form(...),
    percentage: int = Form(...),
    job_description: str = Form(...),
):
    """Store uploaded PDF resumes, shortlist them, and return a zip archive.

    The uploads are written to a fresh ``data/resumes/<uuid>`` directory,
    handed to ``app.compression`` together with the job description, and the
    paths it returns are packed into a zip that is sent back to the client.
    The working directory is removed after the response has been sent, or
    immediately if processing fails.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm how it is registered with ``app``.

    Args:
        background_tasks: FastAPI task queue used to schedule cleanup once
            the response has been delivered.
        files: Uploaded files; between 5 and 900 of them, each with a
            filename ending in ``.pdf``.
        api_key: Must equal ``SECRET_KEY``.
        percentage: Integer in ``[0, 100]``, forwarded unchanged to
            ``compression`` (presumably the share of resumes to keep;
            confirm against that function).
        job_description: Text attached to every resume entry passed to
            ``compression``.

    Returns:
        A ``FileResponse`` streaming the zip of shortlisted files.

    Raises:
        HTTPException: 401 for a wrong API key, 400 for invalid input,
            500 if saving, shortlisting, or zipping fails.
    """
    if api_key != SECRET_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized")
    if not (5 <= len(files) <= 900):
        # Message now states the bounds the check actually enforces.
        raise HTTPException(
            status_code=400,
            detail="Number of files must be between 5 and 900.",
        )
    if percentage < 0 or percentage > 100:
        raise HTTPException(
            status_code=400,
            detail="Percentage must be between 0 and 100.",
        )

    # Validate all files are PDFs. ``filename`` can be None for a part sent
    # without one; treat that as "not a PDF" instead of crashing.
    for file in files:
        if not (file.filename or "").endswith(".pdf"):
            raise HTTPException(status_code=400, detail="All files must be PDF.")

    main_path = f"data/resumes/{uuid.uuid4()}"
    os.makedirs(main_path, exist_ok=True)

    try:
        pdfs_paths = []
        for file in files:
            # The filename is client-controlled: keep only its final path
            # component so it cannot escape ``main_path`` (e.g. "../x.pdf").
            safe_name = os.path.basename(file.filename)
            file_location = f"{main_path}/{safe_name}"
            with open(file_location, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            pdfs_paths.append(file_location)

        pdfs_path_with_description = [
            {"pdf_path": pdf_path, "job_des": job_description}
            for pdf_path in pdfs_paths
        ]

        # Imported at call time, as in the original code.
        # NOTE(review): an earlier revision ran this through asyncio.run;
        # it is called synchronously here, so confirm ``compression`` is a
        # plain (non-async) function.
        from app import compression

        short_listed_files_paths = compression(pdfs_path_with_description, percentage)

        zip_filename = f"{main_path}/{uuid.uuid4()}.zip"
        with ZipFile(zip_filename, "w") as zipf:
            for file_path in short_listed_files_paths:
                zipf.write(file_path, os.path.basename(file_path))

        # The zip lives inside main_path, so cleanup must wait until the
        # response body has been sent.
        background_tasks.add_task(cleanup_directory, main_path)
        return FileResponse(
            zip_filename,
            media_type="application/zip",
            # Offer just the archive name to the client, not the server path.
            filename=os.path.basename(zip_filename),
        )
    except Exception as e:
        # Boundary handler: drop the partial working directory and report
        # the failure as a 500, keeping the original error as the cause.
        cleanup_directory(main_path)
        raise HTTPException(status_code=500, detail=str(e)) from e