# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# flake8: noqa: F821

import fcntl
import os
import select
import time
from subprocess import Popen, PIPE


class TaskPool(object):
    # Run a series of subprocesses. Try to keep up to a certain number going in
    # parallel at any given time. Enforce time limits.
    #
    # This is implemented using non-blocking I/O, and so is Unix-specific.
    #
    # We assume that, if a task closes its standard error, then it's safe to
    # wait for it to terminate. So an ill-behaved task that closes its standard
    # error and then hangs will hang us, as well. However, as it takes special
    # effort to close one's standard error, this seems unlikely to be a
    # problem in practice.

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            self.pipe = None
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass
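    # For example (a sketch; the 'FATAL' marker is purely illustrative and not
    # something this module defines), a Task subclass could request early
    # termination from one of its output handlers:
    #
    #   def onStderr(self, text):
    #       if 'FATAL' in text:
    #           raise TaskPool.TerminateTask()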

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = next(self.pending, None)

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    task = self.next_pending
                    p = Popen(task.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    task.start(p, time.time() + self.timeout)
                    running.add(task)
                    self.next_pending = next(self.pending, None)

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                       + [t.pipe.stderr for t in running])
                (readable, w, x) = select.select(stdouts_and_stderrs, [], [],
                                                 secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if len(output):
                            try:
                                t.onStdout(output.decode('utf-8'))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if len(output):
                            try:
                                t.onStderr(output.decode('utf-8'))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None


def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res
    except (AttributeError, ValueError):
        pass

    # Windows
    try:
        res = int(os.environ['NUMBER_OF_PROCESSORS'])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    return 1
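
# Typical use (a sketch; 'MyTask' and 'inputs' are hypothetical names, not part
# of this module): subclass TaskPool.Task, hand instances to a TaskPool, and
# run them with a parallelism level based on get_cpu_count().
#
#   pool = TaskPool([MyTask(item) for item in inputs],
#                   job_limit=get_cpu_count(), timeout=300)
#   pool.run_all()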


if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)

            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]

            def onStdout(self, text):
                print('%d stdout: %r' % (self.n, text))

            def onStderr(self, text):
                print('%d stderr: %r' % (self.n, text))

            def onFinished(self, returncode):
                print('%d (rc=%d)' % (self.n, returncode))
                sorted.append(self.n)

            def onTimeout(self):
                print('%d timed out' % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

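    # With a 15-second timeout, the 21- and 34-second tasks are expected to
    # miss their deadlines and be reported via onTimeout, so they should be
    # absent from the list printed below.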
    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))