from __future__ import unicode_literals

import multiprocessing
import threading
import traceback
from six.moves.queue import Empty
from collections import namedtuple
from multiprocessing import Process, current_process, Queue

from mozlog import structuredlog, capture

# Special value used as a sentinel in various commands
Stop = object()


def release_mozlog_lock():
    try:
        from mozlog.structuredlog import StructuredLogger
        try:
            StructuredLogger._lock.release()
        except threading.ThreadError:
            pass
    except ImportError:
        pass


class MessageLogger(object):
    def __init__(self, message_func):
        self.send_message = message_func

    def _log_data(self, action, **kwargs):
        self.send_message("log", action, kwargs)

    def process_output(self, process, data, command):
        self._log_data("process_output", process=process, data=data, command=command)


def _log_func(level_name):
    def log(self, message):
        self._log_data(level_name.lower(), message=message)
    log.__doc__ = """Log a message with level %s

:param message: The string message to log
""" % level_name
    log.__name__ = str(level_name).lower()
    return log

# Create all the methods on MessageLogger for the log levels
for level_name in structuredlog.log_levels:
    setattr(MessageLogger, level_name.lower(), _log_func(level_name))
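# For illustration: each generated level method funnels through _log_data, so
# calling logger.info("hello") on a MessageLogger ends up invoking
# send_message("log", "info", {"message": "hello"}).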


class TestRunner(object):
    def __init__(self, logger, command_queue, result_queue, executor):
        """Class implementing the main loop for running tests.

        This class delegates the job of actually running a test to the executor
        that is passed in.

        :param logger: Structured logger
        :param command_queue: multiprocessing.Queue used to send commands to the
                              process
        :param result_queue: multiprocessing.Queue used to send results to the
                             parent TestRunnerManager process
        :param executor: TestExecutor object that will actually run a test.
        """
        self.command_queue = command_queue
        self.result_queue = result_queue

        self.executor = executor
        self.name = current_process().name
        self.logger = logger

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.teardown()

    def setup(self):
        self.logger.debug("Executor setup")
        try:
            self.executor.setup(self)
        except Exception:
            # The caller is responsible for logging the exception if required
            self.send_message("init_failed")
        else:
            self.send_message("init_succeeded")
        self.logger.debug("Executor setup done")

    def teardown(self):
        self.executor.teardown()
        self.send_message("runner_teardown")
        self.result_queue = None
        self.command_queue = None
        self.browser = None

    def run(self):
        """Main loop accepting commands over the pipe and triggering
        the associated methods"""
        self.setup()
        commands = {"run_test": self.run_test,
                    "reset": self.reset,
                    "stop": self.stop,
                    "wait": self.wait}
        while True:
            command, args = self.command_queue.get()
            try:
                rv = commands[command](*args)
            except Exception:
                self.send_message("error",
                                  "Error running command %s with arguments %r:\n%s" %
                                  (command, args, traceback.format_exc()))
            else:
                if rv is Stop:
                    break

    def stop(self):
        return Stop

    def reset(self):
        self.executor.reset()

    def run_test(self, test):
        try:
            return self.executor.run_test(test)
        except Exception:
            self.logger.critical(traceback.format_exc())
            raise

    def wait(self):
        self.executor.wait()
        self.send_message("wait_finished")

    def send_message(self, command, *args):
        self.result_queue.put((command, args))
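
# For illustration, the message protocol between manager and runner: the parent
# puts ("run_test", (test,)), ("reset", ()), ("wait", ()) or ("stop", ()) tuples
# on command_queue, and the runner answers on result_queue with tuples such as
# ("init_succeeded", ()), ("init_failed", ()), ("wait_finished", ()),
# ("error", (message,)), ("runner_teardown", ()) and the ("log", (action, kwargs))
# messages produced by MessageLogger. The manager's event loop also expects a
# ("test_ended", (test, results)) message, which is expected to arrive through
# the same result queue once the executor finishes a test.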


def start_runner(runner_command_queue, runner_result_queue,
                 executor_cls, executor_kwargs,
                 executor_browser_cls, executor_browser_kwargs,
                 capture_stdio, stop_flag):
    """Launch a TestRunner in a new process"""

    def send_message(command, *args):
        runner_result_queue.put((command, args))

    def handle_error(e):
        logger.critical(traceback.format_exc())
        stop_flag.set()

    # Ensure that when we start this in a new process we have the global lock
    # in the logging module unlocked
    release_mozlog_lock()

    logger = MessageLogger(send_message)

    with capture.CaptureIO(logger, capture_stdio):
        try:
            browser = executor_browser_cls(**executor_browser_kwargs)
            executor = executor_cls(browser, **executor_kwargs)
            with TestRunner(logger, runner_command_queue, runner_result_queue, executor) as runner:
                try:
                    runner.run()
                except KeyboardInterrupt:
                    stop_flag.set()
                except Exception as e:
                    handle_error(e)
        except Exception as e:
            handle_error(e)
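
# For illustration, start_runner is the target of the child process created in
# TestRunnerManager.start_test_runner below, roughly:
#
#     proc = Process(target=start_runner,
#                    args=(remote_queue, command_queue,
#                          executor_cls, executor_kwargs,
#                          executor_browser_cls, executor_browser_kwargs,
#                          capture_stdio, child_stop_flag),
#                    name="TestRunner-1")
#     proc.start()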


manager_count = 0


def next_manager_number():
    global manager_count
    local = manager_count = manager_count + 1
    return local


class BrowserManager(object):
    def __init__(self, logger, browser, command_queue, no_timeout=False):
        self.logger = logger
        self.browser = browser
        self.no_timeout = no_timeout
        self.browser_settings = None
        self.last_test = None

        self.started = False

        self.init_timer = None
        self.command_queue = command_queue

    def update_settings(self, test):
        browser_settings = self.browser.settings(test)
        restart_required = ((self.browser_settings is not None and
                             self.browser_settings != browser_settings) or
                            (self.last_test != test and test.expected() == "CRASH"))
        self.browser_settings = browser_settings
        self.last_test = test
        return restart_required

    def init(self, group_metadata):
        """Launch the browser that is being tested,
        and the TestRunner process that will run the tests."""
        # It seems that this lock is helpful to prevent some race that otherwise
        # sometimes stops the spawned processes initialising correctly, and
        # leaves this thread hung
        if self.init_timer is not None:
            self.init_timer.cancel()

        self.logger.debug("Init called, starting browser and runner")

        if not self.no_timeout:
            self.init_timer = threading.Timer(self.browser.init_timeout,
                                              self.init_timeout)
        try:
            if self.init_timer is not None:
                self.init_timer.start()
            self.logger.debug("Starting browser with settings %r" % self.browser_settings)
            self.browser.start(group_metadata=group_metadata, **self.browser_settings)
            self.browser_pid = self.browser.pid()
        except Exception:
            self.logger.warning("Failure during init %s" % traceback.format_exc())
            if self.init_timer is not None:
                self.init_timer.cancel()
            self.logger.error(traceback.format_exc())
            succeeded = False
        else:
            succeeded = True
            self.started = True

        return succeeded

    def send_message(self, command, *args):
        self.command_queue.put((command, args))

    def init_timeout(self):
        # This is called from a separate thread, so we send a message to the
        # main loop so we get back onto the manager thread
        self.logger.debug("init_failed called from timer")
        self.send_message("init_failed")

    def after_init(self):
        """Callback when we have started the browser, started the remote
        control connection, and we are ready to start testing."""
        if self.init_timer is not None:
            self.init_timer.cancel()

    def stop(self, force=False):
        self.browser.stop(force=force)
        self.started = False

    def cleanup(self):
        if self.init_timer is not None:
            self.init_timer.cancel()

    def check_crash(self, test_id):
        return self.browser.check_crash(process=self.browser_pid, test=test_id)

    def is_alive(self):
        return self.browser.is_alive()


class _RunnerManagerState(object):
    before_init = namedtuple("before_init", [])
    initializing = namedtuple("initializing_browser",
                              ["test", "test_group", "group_metadata", "failure_count"])
    running = namedtuple("running", ["test", "test_group", "group_metadata"])
    restarting = namedtuple("restarting", ["test", "test_group", "group_metadata"])
    error = namedtuple("error", [])
    stop = namedtuple("stop", [])


RunnerManagerState = _RunnerManagerState()
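
# The manager thread moves through these states roughly as
# before_init -> initializing -> running, with restarting -> initializing
# whenever the browser or runner needs to be relaunched, ending in stop or
# error. The dispatch tables in TestRunnerManager.run and
# TestRunnerManager.wait_event below are keyed on these state classes.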


class TestRunnerManager(threading.Thread):
    def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
                 pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None,
                 capture_stdio=True):
        """Thread that owns a single TestRunner process and any processes required
        by the TestRunner (e.g. the Firefox binary).

        TestRunnerManagers are responsible for launching the browser process and the
        runner process, and for logging the test progress. The actual test running
        is done by the TestRunner. In particular they:

        * Start the binary of the program under test
        * Start the TestRunner
        * Tell the TestRunner to start a test, if any
        * Log that the test started
        * Log the test results
        * Take any remedial action required e.g. restart crashed or hung
          processes
        """
        self.suite_name = suite_name

        self.test_source = test_source_cls(test_queue)

        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs

        self.executor_cls = executor_cls
        self.executor_kwargs = executor_kwargs

        # Flags used to shut down this thread if we get a sigint
        self.parent_stop_flag = stop_flag
        self.child_stop_flag = multiprocessing.Event()

        self.rerun = rerun
        self.run_count = 0
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info

        self.manager_number = next_manager_number()

        self.command_queue = Queue()
        self.remote_queue = Queue()

        self.test_runner_proc = None

        threading.Thread.__init__(self, name="TestRunnerManager-%i" % self.manager_number)
        # This is started in the actual new thread
        self.logger = None

        self.test_count = 0
        self.unexpected_count = 0

        # This may not really be what we want
        self.daemon = True

        self.max_restarts = 5

        self.browser = None

        self.capture_stdio = capture_stdio

    def run(self):
        """Main loop for the TestRunnerManager.

        TestRunnerManagers generally receive commands from their
        TestRunner updating them on the status of a test. They
        may also have a stop flag set by the main thread indicating
        that the manager should shut down the next time the event loop
        spins."""
        self.logger = structuredlog.StructuredLogger(self.suite_name)
        with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
            self.browser = BrowserManager(self.logger,
                                          browser,
                                          self.command_queue,
                                          no_timeout=self.debug_info is not None)
            dispatch = {
                RunnerManagerState.before_init: self.start_init,
                RunnerManagerState.initializing: self.init,
                RunnerManagerState.running: self.run_test,
                RunnerManagerState.restarting: self.restart_runner
            }

            self.state = RunnerManagerState.before_init()
            end_states = (RunnerManagerState.stop,
                          RunnerManagerState.error)

            try:
                while not isinstance(self.state, end_states):
                    f = dispatch.get(self.state.__class__)
                    while f:
                        self.logger.debug("Dispatch %s" % f.__name__)
                        if self.should_stop():
                            return
                        new_state = f()
                        if new_state is None:
                            break
                        self.state = new_state
                        self.logger.debug("new state: %s" % self.state.__class__.__name__)
                        if isinstance(self.state, end_states):
                            return
                        f = dispatch.get(self.state.__class__)

                    new_state = None
                    while new_state is None:
                        new_state = self.wait_event()
                        if self.should_stop():
                            return
                    self.state = new_state
                    self.logger.debug("new state: %s" % self.state.__class__.__name__)
            except Exception:
                self.logger.error(traceback.format_exc())
                raise
            finally:
                self.logger.debug("TestRunnerManager main loop terminating, starting cleanup")
                clean = isinstance(self.state, RunnerManagerState.stop)
                self.stop_runner(force=not clean)
                self.teardown()
        self.logger.debug("TestRunnerManager main loop terminated")

    def wait_event(self):
        dispatch = {
            RunnerManagerState.before_init: {},
            RunnerManagerState.initializing:
            {
                "init_succeeded": self.init_succeeded,
                "init_failed": self.init_failed,
            },
            RunnerManagerState.running:
            {
                "test_ended": self.test_ended,
                "wait_finished": self.wait_finished,
            },
            RunnerManagerState.restarting: {},
            RunnerManagerState.error: {},
            RunnerManagerState.stop: {},
            None: {
                "runner_teardown": self.runner_teardown,
                "log": self.log,
                "error": self.error
            }
        }
        try:
            command, data = self.command_queue.get(True, 1)
            self.logger.debug("Got command: %r" % command)
        except IOError:
            self.logger.error("Got IOError from poll")
            return RunnerManagerState.restarting(0)
        except Empty:
            if (self.debug_info and self.debug_info.interactive and
                self.browser.started and not self.browser.is_alive()):
                self.logger.debug("Debugger exited")
                return RunnerManagerState.stop()

            if (isinstance(self.state, RunnerManagerState.running) and
                not self.test_runner_proc.is_alive()):
                if not self.command_queue.empty():
                    # We got a new message so process that
                    return

                # If we got to here the runner presumably shut down
                # unexpectedly
                self.logger.info("Test runner process shut down")

                if self.state.test is not None:
                    # This could happen if the test runner crashed for some other
                    # reason
                    # Need to consider the unlikely case where one test causes the
                    # runner process to repeatedly die
                    self.logger.critical("Last test did not complete")
                    return RunnerManagerState.error()
                self.logger.warning("More tests found, but runner process died, restarting")
                return RunnerManagerState.restarting(0)
        else:
            f = (dispatch.get(self.state.__class__, {}).get(command) or
                 dispatch.get(None, {}).get(command))
            if not f:
                self.logger.warning("Got command %s in state %s" %
                                    (command, self.state.__class__.__name__))
                return
            return f(*data)

    def should_stop(self):
        return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()

    def start_init(self):
        test, test_group, group_metadata = self.get_next_test()
        if test is None:
            return RunnerManagerState.stop()
        else:
            return RunnerManagerState.initializing(test, test_group, group_metadata, 0)

    def init(self):
        assert isinstance(self.state, RunnerManagerState.initializing)
        if self.state.failure_count > self.max_restarts:
            self.logger.critical("Max restarts exceeded")
            return RunnerManagerState.error()

        self.browser.update_settings(self.state.test)

        result = self.browser.init(self.state.group_metadata)
        if result is Stop:
            return RunnerManagerState.error()
        elif not result:
            return RunnerManagerState.initializing(self.state.test,
                                                   self.state.test_group,
                                                   self.state.group_metadata,
                                                   self.state.failure_count + 1)
        else:
            self.executor_kwargs["group_metadata"] = self.state.group_metadata
            self.start_test_runner()

    def start_test_runner(self):
        # Note that we need to be careful to start the browser before the
        # test runner to ensure that any state set when the browser is started
        # can be passed in to the test runner.
        assert isinstance(self.state, RunnerManagerState.initializing)
        assert self.command_queue is not None
        assert self.remote_queue is not None
        self.logger.info("Starting runner")
        executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()

        args = (self.remote_queue,
                self.command_queue,
                self.executor_cls,
                self.executor_kwargs,
                executor_browser_cls,
                executor_browser_kwargs,
                self.capture_stdio,
                self.child_stop_flag)
        self.test_runner_proc = Process(target=start_runner,
                                        args=args,
                                        name="TestRunner-%i" % self.manager_number)
        self.test_runner_proc.start()
        self.logger.debug("Test runner started")
        # Now we wait for either an init_succeeded event or an init_failed event

    def init_succeeded(self):
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.after_init()
        return RunnerManagerState.running(self.state.test,
                                          self.state.test_group,
                                          self.state.group_metadata)

    def init_failed(self):
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.check_crash(None)
        self.browser.after_init()
        self.stop_runner(force=True)
        return RunnerManagerState.initializing(self.state.test,
                                               self.state.test_group,
                                               self.state.group_metadata,
                                               self.state.failure_count + 1)

    def get_next_test(self, test_group=None):
        test = None
        while test is None:
            while test_group is None or len(test_group) == 0:
                test_group, group_metadata = self.test_source.group()
                if test_group is None:
                    self.logger.info("No more tests")
                    return None, None, None
            test = test_group.popleft()
        self.run_count = 0
        return test, test_group, group_metadata

    def run_test(self):
        assert isinstance(self.state, RunnerManagerState.running)
        assert self.state.test is not None

        if self.browser.update_settings(self.state.test):
            self.logger.info("Restarting browser for new test environment")
            return RunnerManagerState.restarting(self.state.test,
                                                 self.state.test_group,
                                                 self.state.group_metadata)

        self.logger.test_start(self.state.test.id)
        if self.rerun > 1:
            self.logger.info("Run %d/%d" % (self.run_count, self.rerun))
            self.send_message("reset")
        self.run_count += 1
        self.send_message("run_test", self.state.test)

    def test_ended(self, test, results):
        """Handle the end of a test.

        Output the result of each subtest, and the result of the overall
        harness to the logs.
        """
        assert isinstance(self.state, RunnerManagerState.running)
        assert test == self.state.test
        # Write the result of each subtest
        file_result, test_results = results
        subtest_unexpected = False
        for result in test_results:
            if test.disabled(result.name):
                continue
            expected = test.expected(result.name)
            known_intermittent = test.known_intermittent(result.name)
            is_unexpected = expected != result.status and result.status not in known_intermittent

            if is_unexpected:
                self.unexpected_count += 1
                self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)
                subtest_unexpected = True
            self.logger.test_status(test.id,
                                    result.name,
                                    result.status,
                                    message=result.message,
                                    expected=expected,
                                    known_intermittent=known_intermittent,
                                    stack=result.stack)

        # We have a couple of status codes that are used internally, but not exposed to the
        # user. These are used to indicate that some possibly-broken state was reached
        # and we should restart the runner before the next test.
        # INTERNAL-ERROR indicates a Python exception was caught in the harness
        # EXTERNAL-TIMEOUT indicates we had to forcibly kill the browser from the harness
        # because the test didn't return a result after reaching the test-internal timeout
        status_subns = {"INTERNAL-ERROR": "ERROR",
                        "EXTERNAL-TIMEOUT": "TIMEOUT"}
        expected = test.expected()
        known_intermittent = test.known_intermittent()
        status = status_subns.get(file_result.status, file_result.status)

        if self.browser.check_crash(test.id) and status != "CRASH":
            self.logger.info("Found a crash dump; should change status from %s to CRASH but this causes instability" % (status,))

        self.test_count += 1
        is_unexpected = expected != status and status not in known_intermittent
        if is_unexpected:
            self.unexpected_count += 1
            self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)

        if "assertion_count" in file_result.extra:
            assertion_count = file_result.extra.pop("assertion_count")
            if assertion_count > 0:
                self.logger.assertion_count(test.id,
                                            int(assertion_count),
                                            test.min_assertion_count,
                                            test.max_assertion_count)

        file_result.extra["test_timeout"] = test.timeout * self.executor_kwargs['timeout_multiplier']

        self.logger.test_end(test.id,
                             status,
                             message=file_result.message,
                             expected=expected,
                             known_intermittent=known_intermittent,
                             extra=file_result.extra,
                             stack=file_result.stack)

        restart_before_next = (test.restart_after or
                               file_result.status in ("CRASH", "EXTERNAL-TIMEOUT", "INTERNAL-ERROR") or
                               ((subtest_unexpected or is_unexpected) and
                                self.restart_on_unexpected))

        if (not file_result.status == "CRASH" and
            self.pause_after_test or
            (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))):
            self.logger.info("Pausing until the browser exits")
            self.send_message("wait")
        else:
            return self.after_test_end(test, restart_before_next)
def wait_finished(self):
647
assert isinstance(self.state, RunnerManagerState.running)
648
self.logger.debug("Wait finished")
649
650
# The browser should be stopped already, but this ensures we do any
651
# post-stop processing
652
return self.after_test_end(self.state.test, True)
653
654
def after_test_end(self, test, restart):
655
assert isinstance(self.state, RunnerManagerState.running)
656
if self.run_count == self.rerun:
657
test, test_group, group_metadata = self.get_next_test()
658
if test is None:
659
return RunnerManagerState.stop()
660
if test_group != self.state.test_group:
661
# We are starting a new group of tests, so force a restart
662
restart = True
663
else:
664
test_group = self.state.test_group
665
group_metadata = self.state.group_metadata
666
if restart:
667
return RunnerManagerState.restarting(test, test_group, group_metadata)
668
else:
669
return RunnerManagerState.running(test, test_group, group_metadata)
670
671
def restart_runner(self):
672
"""Stop and restart the TestRunner"""
673
assert isinstance(self.state, RunnerManagerState.restarting)
674
self.stop_runner()
675
return RunnerManagerState.initializing(self.state.test, self.state.test_group, self.state.group_metadata, 0)
676
677
def log(self, action, kwargs):
678
getattr(self.logger, action)(**kwargs)
679
680
def error(self, message):
681
self.logger.error(message)
682
self.restart_runner()
683
684
def stop_runner(self, force=False):
685
"""Stop the TestRunner and the browser binary."""
686
if self.test_runner_proc is None:
687
return
688
689
if self.test_runner_proc.is_alive():
690
self.send_message("stop")
691
try:
692
self.browser.stop(force=force)
693
self.ensure_runner_stopped()
694
finally:
695
self.cleanup()
696
697
def teardown(self):
698
self.logger.debug("TestRunnerManager teardown")
699
self.test_runner_proc = None
700
self.command_queue.close()
701
self.remote_queue.close()
702
self.command_queue = None
703
self.remote_queue = None
704

    def ensure_runner_stopped(self):
        self.logger.debug("ensure_runner_stopped")
        if self.test_runner_proc is None:
            return

        self.logger.debug("waiting for runner process to end")
        self.test_runner_proc.join(10)
        self.logger.debug("After join")
        if self.test_runner_proc.is_alive():
            # This might leak a file handle from the queue
            self.logger.warning("Forcibly terminating runner process")
            self.test_runner_proc.terminate()

            # Multiprocessing queues are backed by operating system pipes. If
            # the pipe in the child process had buffered data at the time of
            # forced termination, the queue is no longer in a usable state
            # (subsequent attempts to retrieve items may block indefinitely).
            # Discard the potentially-corrupted queue and create a new one.
            self.command_queue.close()
            self.command_queue = Queue()
            self.remote_queue.close()
            self.remote_queue = Queue()
        else:
            self.logger.debug("Runner process exited with code %i" % self.test_runner_proc.exitcode)

    def runner_teardown(self):
        self.ensure_runner_stopped()
        return RunnerManagerState.stop()

    def send_message(self, command, *args):
        self.remote_queue.put((command, args))

    def cleanup(self):
        self.logger.debug("TestRunnerManager cleanup")
        if self.browser:
            self.browser.cleanup()
        while True:
            try:
                cmd, data = self.command_queue.get_nowait()
            except Empty:
                break
            else:
                if cmd == "log":
                    self.log(*data)
                elif cmd == "runner_teardown":
                    # It's OK for the "runner_teardown" message to be left in
                    # the queue during cleanup, as we will already have tried
                    # to stop the TestRunner in `stop_runner`.
                    pass
                else:
                    self.logger.warning("Command left in command_queue during cleanup: %r, %r" % (cmd, data))
        while True:
            try:
                cmd, data = self.remote_queue.get_nowait()
                self.logger.warning("Command left in remote_queue during cleanup: %r, %r" % (cmd, data))
            except Empty:
                break


def make_test_queue(tests, test_source_cls, **test_source_kwargs):
    queue = test_source_cls.make_queue(tests, **test_source_kwargs)

    # There is a race condition that means sometimes we continue
    # before the tests have been written to the underlying pipe.
    # Polling the pipe for data here avoids that
    queue._reader.poll(10)
    assert not queue.empty()
    return queue


class ManagerGroup(object):
    def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
                 browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs,
                 rerun=1,
                 pause_after_test=False,
                 pause_on_unexpected=False,
                 restart_on_unexpected=True,
                 debug_info=None,
                 capture_stdio=True):
        """Main thread object that owns all the TestRunnerManager threads."""
        self.suite_name = suite_name
        self.size = size
        self.test_source_cls = test_source_cls
        self.test_source_kwargs = test_source_kwargs
        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs
        self.executor_cls = executor_cls
        self.executor_kwargs = executor_kwargs
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info
        self.rerun = rerun
        self.capture_stdio = capture_stdio

        self.pool = set()
        # Event that is polled by threads so that they can gracefully exit in the face
        # of sigint
        self.stop_flag = threading.Event()
        self.logger = structuredlog.StructuredLogger(suite_name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def run(self, test_type, tests):
        """Start all managers in the group"""
        self.logger.debug("Using %i processes" % self.size)
        type_tests = tests[test_type]
        if not type_tests:
            self.logger.info("No %s tests to run" % test_type)
            return

        test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)

        for _ in range(self.size):
            manager = TestRunnerManager(self.suite_name,
                                        test_queue,
                                        self.test_source_cls,
                                        self.browser_cls,
                                        self.browser_kwargs,
                                        self.executor_cls,
                                        self.executor_kwargs,
                                        self.stop_flag,
                                        self.rerun,
                                        self.pause_after_test,
                                        self.pause_on_unexpected,
                                        self.restart_on_unexpected,
                                        self.debug_info,
                                        self.capture_stdio)
            manager.start()
            self.pool.add(manager)
        self.wait()

    def wait(self):
        """Wait for all the managers in the group to finish"""
        for manager in self.pool:
            manager.join()

    def stop(self):
        """Set the stop flag so that all managers in the group stop as soon
        as possible"""
        self.stop_flag.set()
        self.logger.debug("Stop flag set in ManagerGroup")

    def test_count(self):
        return sum(manager.test_count for manager in self.pool)

    def unexpected_count(self):
        return sum(manager.unexpected_count for manager in self.pool)
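
# A rough usage sketch; SomeBrowser, SomeExecutor and SomeTestSource stand in
# for the browser, executor and test source classes the harness supplies, and
# `tests` maps each test type to its list of tests:
#
#     with ManagerGroup("web-platform-tests", 4,
#                       SomeTestSource, test_source_kwargs,
#                       SomeBrowser, browser_kwargs,
#                       SomeExecutor, executor_kwargs) as group:
#         group.run("testharness", tests)
#         unexpected = group.unexpected_count()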