Spaces:
Build error
Build error
import time | |
import threading | |
import subprocess | |
from typing import List, Union | |
class Channel: | |
def __init__(self, source, destination, sync_deletions=False, every=60, exclude: Union[str, List, None] = None): | |
self.source = source | |
self.destination = destination | |
self.event = threading.Event() | |
self.syncing_thread = threading.Thread(target=self._sync, args=()) | |
self.sync_deletions = sync_deletions | |
self.every = every | |
if not exclude: exclude = [] | |
if isinstance(exclude,str): exclude = [exclude] | |
self.exclude = exclude | |
self.command = ['rsync', '-aP'] | |
def alive(self): | |
if self.syncing_thread.is_alive(): return True | |
else: return False | |
def _sync(self): | |
command = self.command | |
for exclusion in self.exclude: | |
command.append(f'--exclude={exclusion}') | |
command.extend([f'{self.source}/', f'{self.destination}/']) | |
if self.sync_deletions: command.append('--delete') | |
while not self.event.is_set(): | |
subprocess.run(command) | |
time.sleep(self.every) | |
def copy(self): | |
command = self.command | |
for exclusion in self.exclude: | |
command.append(f'--exclude={exclusion}') | |
command.extend([f'{self.source}/', f'{self.destination}/']) | |
if self.sync_deletions: command.append('--delete') | |
subprocess.run(command) | |
return True | |
def start(self): | |
if self.syncing_thread.is_alive(): | |
self.event.set() | |
self.syncing_thread.join() | |
if self.event.is_set(): self.event.clear() | |
if self.syncing_thread._started.is_set(): self.syncing_thread = threading.Thread(target=self._sync, args=()) | |
self.syncing_thread.start() | |
return self.alive() | |
def stop(self): | |
if self.alive(): | |
self.event.set() | |
self.syncing_thread.join() | |
while self.alive(): | |
if not self.alive(): break | |
return not self.alive() |