Joshua Lochner commited on
Commit
0e18e8c
·
1 Parent(s): 086ca93

Remove unused utilities

Browse files
Files changed (1) hide show
  1. src/utils.py +0 -166
src/utils.py CHANGED
@@ -1,10 +1,5 @@
1
  import re
2
- import os
3
- import signal
4
  import logging
5
- from time import sleep, time
6
- from multiprocessing import JoinableQueue, Event, Process
7
- from queue import Empty
8
 
9
  logger = logging.getLogger(__name__)
10
 
@@ -13,167 +8,6 @@ def re_findall(pattern, string):
13
  return [m.groupdict() for m in re.finditer(pattern, string)]
14
 
15
 
16
- class Task:
17
- def __init__(self, function, *args, **kwargs) -> None:
18
- self.function = function
19
- self.args = args
20
- self.kwargs = kwargs
21
-
22
- def run(self):
23
- return self.function(*self.args, **self.kwargs)
24
-
25
-
26
- class CallbackGenerator:
27
- def __init__(self, generator, callback):
28
- self.generator = generator
29
- self.callback = callback
30
-
31
- def __iter__(self):
32
- if self.callback is not None and callable(self.callback):
33
- for t in self.generator:
34
- self.callback(t)
35
- yield t
36
- else:
37
- yield from self.generator
38
-
39
-
40
- def start_worker(q: JoinableQueue, stop_event: Event): # TODO make class?
41
- logger.info('Starting worker...')
42
- while True:
43
- if stop_event.is_set():
44
- logger.info('Worker exiting because of stop_event')
45
- break
46
- # We set a timeout so we loop past 'stop_event' even if the queue is empty
47
- try:
48
- task = q.get(timeout=.01)
49
- except Empty:
50
- # Run next iteration of loop
51
- continue
52
-
53
- # Exit if end of queue
54
- if task is None:
55
- logger.info('Worker exiting because of None on queue')
56
- q.task_done()
57
- break
58
-
59
- try:
60
- task.run() # Do the task
61
- except: # Will also catch KeyboardInterrupt
62
- logger.exception(f'Failed to process task {task}', )
63
- # Can implement some kind of retry handling here
64
- finally:
65
- q.task_done()
66
-
67
-
68
- class InterruptibleTaskPool:
69
-
70
- # https://the-fonz.gitlab.io/posts/python-multiprocessing/
71
- def __init__(self,
72
- tasks=None,
73
- num_workers=None,
74
-
75
- callback=None, # Fired on start
76
- max_queue_size=1,
77
- grace_period=2,
78
- kill_period=30,
79
- ):
80
-
81
- self.tasks = CallbackGenerator(
82
- [] if tasks is None else tasks, callback)
83
- self.num_workers = os.cpu_count() if num_workers is None else num_workers
84
-
85
- self.max_queue_size = max_queue_size
86
- self.grace_period = grace_period
87
- self.kill_period = kill_period
88
-
89
- # The JoinableQueue has an internal counter that increments when an item is put on the queue and
90
- # decrements when q.task_done() is called. This allows us to wait until it's empty using .join()
91
- self.queue = JoinableQueue(maxsize=self.max_queue_size)
92
- # This is a process-safe version of the 'panic' variable shown above
93
- self.stop_event = Event()
94
-
95
- # n_workers: Start this many processes
96
- # max_queue_size: If queue exceeds this size, block when putting items on the queue
97
- # grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
98
- # kill_period: Send SIGKILL to processes if they don't exit after this many seconds
99
-
100
- # self.on_task_complete = on_task_complete
101
- # self.raise_after_interrupt = raise_after_interrupt
102
-
103
- def __enter__(self):
104
- self.start()
105
- return self
106
-
107
- def __exit__(self, exc_type, exc_value, exc_traceback):
108
- pass
109
-
110
- def start(self) -> None:
111
- def handler(signalname):
112
- """
113
- Python 3.9 has `signal.strsignal(signalnum)` so this closure would not be needed.
114
- Also, 3.8 includes `signal.valid_signals()` that can be used to create a mapping for the same purpose.
115
- """
116
- def f(signal_received, frame):
117
- raise KeyboardInterrupt(f'{signalname} received')
118
- return f
119
-
120
- # This will be inherited by the child process if it is forked (not spawned)
121
- signal.signal(signal.SIGINT, handler('SIGINT'))
122
- signal.signal(signal.SIGTERM, handler('SIGTERM'))
123
-
124
- procs = []
125
-
126
- for i in range(self.num_workers):
127
- # Make it a daemon process so it is definitely terminated when this process exits,
128
- # might be overkill but is a nice feature. See
129
- # https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.daemon
130
- p = Process(name=f'Worker-{i:02d}', daemon=True,
131
- target=start_worker, args=(self.queue, self.stop_event))
132
- procs.append(p)
133
- p.start()
134
-
135
- try:
136
- # Put tasks on queue
137
- for task in self.tasks:
138
- logger.info(f'Put task {task} on queue')
139
- self.queue.put(task)
140
-
141
- # Put exit tasks on queue
142
- for i in range(self.num_workers):
143
- self.queue.put(None)
144
-
145
- # Wait until all tasks are processed
146
- self.queue.join()
147
-
148
- except KeyboardInterrupt:
149
- logger.warning('Caught KeyboardInterrupt! Setting stop event...')
150
- # raise # TODO add option
151
- finally:
152
- self.stop_event.set()
153
- t = time()
154
- # Send SIGINT if process doesn't exit quickly enough, and kill it as last resort
155
- # .is_alive() also implicitly joins the process (good practice in linux)
156
- while True:
157
- alive_procs = [p for p in procs if p.is_alive()]
158
- if not alive_procs:
159
- break
160
- if time() > t + self.grace_period:
161
- for p in alive_procs:
162
- os.kill(p.pid, signal.SIGINT)
163
- logger.warning(f'Sending SIGINT to {p}')
164
- elif time() > t + self.kill_period:
165
- for p in alive_procs:
166
- logger.warning(f'Sending SIGKILL to {p}')
167
- # Queues and other inter-process communication primitives can break when
168
- # process is killed, but we don't care here
169
- p.kill()
170
- sleep(.01)
171
-
172
- sleep(.1)
173
- for p in procs:
174
- logger.info(f'Process status: {p}')
175
-
176
-
177
  def jaccard(x1, x2, y1, y2):
178
  # Calculate jaccard index
179
  intersection = max(0, min(x2, y2)-max(x1, y1))
 
1
  import re
 
 
2
  import logging
 
 
 
3
 
4
  logger = logging.getLogger(__name__)
5
 
 
8
  return [m.groupdict() for m in re.finditer(pattern, string)]
9
 
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  def jaccard(x1, x2, y1, y2):
12
  # Calculate jaccard index
13
  intersection = max(0, min(x2, y2)-max(x1, y1))