Spaces:
Runtime error
Runtime error
| from threading import Thread | |
| from copy import deepcopy | |
| from uuid import uuid4 | |
| from time import sleep | |
| import json | |
| import time | |
| import re | |
| import os | |
| def syncmethod(func): | |
| def function(*args, **kwargs): | |
| th = Thread(target=func, args=args, kwargs=kwargs) | |
| th.start() | |
| return function | |
| def interval(fc, time): | |
| while True: | |
| sleep(time) | |
| fc() | |
| class FileManager: | |
| def __init__(self): | |
| super(FileManager, self).__init__() | |
| self.data_file = "collector_data.json" | |
| # self.base_url = "http://localhost:7860" | |
| self.base_url = "https://astraos-nx-storage.hf.space" | |
| if not os.path.exists(self.data_file): self.writeJson() | |
| else: self.checkFiles() | |
| def formatPath(self, path): | |
| new_path = re.sub(r'\s+|\s', '-', path) | |
| new_path = re.sub(r'\!|\?|\#|\*|Ç|ç|ê|Ê|ã|Ã', '', new_path) | |
| return new_path | |
| def rename(self, path, new_path): | |
| os.rename(path, new_path) | |
| def readJson(self, path): | |
| with open(path, 'r') as json_data: | |
| return json.load(json_data) | |
| def writeJson(self, data={}): | |
| data_file = open(self.data_file, "w") | |
| json.dump(data, data_file, indent=2) | |
| data_file.close() | |
| def start(self, gap=5): | |
| interval(self.checkFiles, gap * 60) | |
| def checkFiles(self): | |
| data_file = self.readJson(self.data_file) | |
| for idx, file_infor in enumerate(deepcopy(data_file)): | |
| if file_infor["expiresAt"] < time.time(): | |
| try: | |
| os.remove(file_infor["path"]) | |
| except Exception as e: | |
| print(e) | |
| del data_file[idx] | |
| self.writeJson(data=data_file) | |
| def reValidate(self, path, expiresAt=60): | |
| data_file = self.readJson(self.data_file) | |
| for file_infor in data_file: | |
| if file_infor["path"] == path: | |
| file_infor["expiresAt"] = time.time() + (expiresAt * 60) | |
| self.writeJson(data=data_file) | |
| def addPath(self, path, expiresAt=60): | |
| new_path = self.formatPath(path) | |
| self.rename(path, new_path) | |
| data_file = self.readJson(self.data_file) | |
| fc = lambda file_infor: file_infor["path"] != new_path | |
| data_file = list(filter(fc, data_file)) | |
| new_file_infor = { | |
| "path": new_path, | |
| "filename": new_path.split("/")[-1], | |
| "id": str(uuid4()), | |
| "expiresAt": time.time() + (expiresAt * 60) # + 30 minutes | |
| } | |
| data_file.append(new_file_infor) | |
| self.writeJson(data=data_file) | |
| return new_file_infor | |
| def hasFile(self, path): | |
| path = self.formatPath(path) | |
| data_file = self.readJson(self.data_file) | |
| for file_infor in data_file: | |
| if file_infor["path"] == path: | |
| self.reValidate(path) | |
| return True | |
| def saveFile(self, data, filename): | |
| filename = self.formatPath(filename) | |
| file_path = f"fl/{str(uuid4())}_{filename}" | |
| with open(file_path, "wb") as file: | |
| file.write(data) | |
| return self.addPath(file_path) | |
| def getFileObjByPath(self, path): | |
| path = self.formatPath(path) | |
| data_file = self.readJson(self.data_file) | |
| for file_infor in data_file: | |
| if file_infor["path"] == path: | |
| self.reValidate(path) | |
| return file_infor | |
| def getFileObjById(self, uid): | |
| data_file = self.readJson(self.data_file) | |
| for file_infor in data_file: | |
| if file_infor["id"] == uid: | |
| self.reValidate(file_infor["path"]) | |
| return file_infor | |
| def getFileById(self, uid): | |
| file_path = self.getFilePathById(uid) | |
| if not file_path: return | |
| with open(file_path, "rb") as file: | |
| file_data = file.read() | |
| return file_data | |
| def getFileUrlById(self, uid): | |
| file_path = self.getFilePathById(uid) | |
| if not file_path: return | |
| return self.getFileUrlByPath(file_path) | |
| def getFileUrlByPath(self, path): | |
| return f"{self.base_url}/file=./{path}" | |