from threading import Thread from copy import deepcopy from uuid import uuid4 from time import sleep import json import time import re import os def syncmethod(func): def function(*args, **kwargs): th = Thread(target=func, args=args, kwargs=kwargs) th.start() return function @syncmethod def interval(fc, time): while True: sleep(time) fc() class FileManager: def __init__(self): super(FileManager, self).__init__() self.data_file = "collector_data.json" # self.base_url = "http://localhost:7860" self.base_url = "https://astraos-nx-storage.hf.space" if not os.path.exists(self.data_file): self.writeJson() else: self.checkFiles() def formatPath(self, path): new_path = re.sub(r'\s+|\s', '-', path) new_path = re.sub(r'\!|\?|\#|\*|Ç|ç|ê|Ê|ã|Ã', '', new_path) return new_path def rename(self, path, new_path): os.rename(path, new_path) def readJson(self, path): with open(path, 'r') as json_data: return json.load(json_data) def writeJson(self, data={}): data_file = open(self.data_file, "w") json.dump(data, data_file, indent=2) data_file.close() def start(self, gap=5): interval(self.checkFiles, gap * 60) def checkFiles(self): data_file = self.readJson(self.data_file) for idx, file_infor in enumerate(deepcopy(data_file)): if file_infor["expiresAt"] < time.time(): try: os.remove(file_infor["path"]) except Exception as e: print(e) del data_file[idx] self.writeJson(data=data_file) def reValidate(self, path, expiresAt=60): data_file = self.readJson(self.data_file) for file_infor in data_file: if file_infor["path"] == path: file_infor["expiresAt"] = time.time() + (expiresAt * 60) self.writeJson(data=data_file) def addPath(self, path, expiresAt=60): new_path = self.formatPath(path) self.rename(path, new_path) data_file = self.readJson(self.data_file) fc = lambda file_infor: file_infor["path"] != new_path data_file = list(filter(fc, data_file)) new_file_infor = { "path": new_path, "filename": new_path.split("/")[-1], "id": str(uuid4()), "expiresAt": time.time() + (expiresAt * 60) # + 30 minutes } data_file.append(new_file_infor) self.writeJson(data=data_file) return new_file_infor def hasFile(self, path): path = self.formatPath(path) data_file = self.readJson(self.data_file) for file_infor in data_file: if file_infor["path"] == path: self.reValidate(path) return True def saveFile(self, data, filename): filename = self.formatPath(filename) file_path = f"fl/{str(uuid4())}_{filename}" with open(file_path, "wb") as file: file.write(data) return self.addPath(file_path) def getFileObjByPath(self, path): path = self.formatPath(path) data_file = self.readJson(self.data_file) for file_infor in data_file: if file_infor["path"] == path: self.reValidate(path) return file_infor def getFileObjById(self, uid): data_file = self.readJson(self.data_file) for file_infor in data_file: if file_infor["id"] == uid: self.reValidate(file_infor["path"]) return file_infor def getFileById(self, uid): file_path = self.getFilePathById(uid) if not file_path: return with open(file_path, "rb") as file: file_data = file.read() return file_data def getFileUrlById(self, uid): file_path = self.getFilePathById(uid) if not file_path: return return self.getFileUrlByPath(file_path) def getFileUrlByPath(self, path): return f"{self.base_url}/file=./{path}"