Source code

Revision control

Other Tools

1
from __future__ import unicode_literals
2
3
import multiprocessing
4
import threading
5
import traceback
6
from six.moves.queue import Empty
7
from collections import namedtuple
8
from multiprocessing import Process, current_process, Queue
9
10
from mozlog import structuredlog, capture
11
12
# Special value used as a sentinal in various commands
13
Stop = object()
14
15
16
class MessageLogger(object):
    """Logger facade that forwards every log call through a message function.

    Each log action is delivered as ``message_func("log", action, kwargs)``
    where ``kwargs`` is a dict holding the data for that action.
    """

    def __init__(self, message_func):
        """:param message_func: Callable invoked as
                                ``message_func(command, *args)``."""
        self.send_message = message_func

    def _log_data(self, action, **kwargs):
        """Emit one "log" message for ``action`` carrying ``kwargs``."""
        self.send_message("log", action, kwargs)

    def process_output(self, process, data, command):
        """Forward captured process output as a "process_output" action."""
        fields = dict(process=process, data=data, command=command)
        self._log_data("process_output", **fields)
25
26
27
def _log_func(level_name):
    """Build a method that logs a message at the level ``level_name``.

    The returned function is meant to be attached to a class providing
    ``_log_data``; its ``__name__`` is the lower-cased level name.
    """
    doc_template = """Log a message with level %s

:param message: The string message to log
"""

    def log(self, message):
        # The action name is the lower-cased level, resolved at call time.
        self._log_data(level_name.lower(), message=message)

    log.__doc__ = doc_template % level_name
    log.__name__ = str(level_name).lower()
    return log
36
37
# Create a logging method on MessageLogger for each structured log level,
# named after the lower-cased level.
for level_name in structuredlog.log_levels:
    setattr(MessageLogger, level_name.lower(), _log_func(level_name))
40
41
42
class TestRunner(object):
    """Command loop that runs tests by delegating to an executor.

    Commands arrive on ``command_queue`` as ``(command, args)`` tuples and
    replies are posted to ``result_queue`` in the same shape. The object is
    a context manager; leaving the ``with`` block calls :meth:`teardown`.
    """

    def __init__(self, logger, command_queue, result_queue, executor):
        """Store the collaborators used by the command loop.

        :param logger: Structured logger
        :param command_queue: Queue from which ``(command, args)`` tuples
                              are read
        :param result_queue: Queue on which ``(command, args)`` replies are
                             sent back to the parent TestRunnerManager
        :param executor: TestExecutor object that will actually run a test.
        """
        self.logger = logger
        self.executor = executor
        self.command_queue = command_queue
        self.result_queue = result_queue
        self.name = current_process().name

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.teardown()

    def setup(self):
        """Set up the executor and report the outcome to the parent."""
        self.logger.debug("Executor setup")
        try:
            self.executor.setup(self)
        except Exception:
            # The caller is responsible for logging the exception if required
            outcome = "init_failed"
        else:
            outcome = "init_succeeded"
        self.send_message(outcome)
        self.logger.debug("Executor setup done")

    def teardown(self):
        """Tear down the executor, notify the parent and drop references."""
        self.executor.teardown()
        self.send_message("runner_teardown")
        self.result_queue = self.command_queue = None
        self.browser = None

    def run(self):
        """Main loop accepting commands over the queue and triggering
        the associated methods.

        The loop ends when a handler returns ``Stop``. An exception raised
        while handling a command is reported as an "error" message and the
        loop keeps going.
        """
        self.setup()
        handlers = {
            "run_test": self.run_test,
            "reset": self.reset,
            "stop": self.stop,
            "wait": self.wait,
        }
        while True:
            command, args = self.command_queue.get()
            try:
                outcome = handlers[command](*args)
            except Exception:
                self.send_message("error",
                                  "Error running command %s with arguments %r:\n%s" %
                                  (command, args, traceback.format_exc()))
                continue
            if outcome is Stop:
                break

    def stop(self):
        """Handler for "stop": ask the command loop to exit."""
        return Stop

    def reset(self):
        """Handler for "reset": reset the executor."""
        self.executor.reset()

    def run_test(self, test):
        """Handler for "run_test": run ``test`` and return the executor's
        result, logging the traceback before re-raising any failure."""
        try:
            return self.executor.run_test(test)
        except Exception:
            self.logger.critical(traceback.format_exc())
            raise

    def wait(self):
        """Handler for "wait": block in the executor, then notify the parent."""
        self.executor.wait()
        self.send_message("wait_finished")

    def send_message(self, command, *args):
        """Post ``(command, args)`` on the result queue."""
        self.result_queue.put((command, args))
126
127
128
def start_runner(runner_command_queue, runner_result_queue,
                 executor_cls, executor_kwargs,
                 executor_browser_cls, executor_browser_kwargs,
                 capture_stdio, stop_flag):
    """Entry point of the runner process: build and run a TestRunner.

    :param runner_command_queue: Queue the TestRunner reads commands from
    :param runner_result_queue: Queue used for replies and log messages
    :param executor_cls: Executor class, called as
                         ``executor_cls(browser, **executor_kwargs)``
    :param executor_kwargs: Keyword arguments for ``executor_cls``
    :param executor_browser_cls: Class called with
                                 ``**executor_browser_kwargs`` to build the
                                 browser object handed to the executor
    :param executor_browser_kwargs: Keyword arguments for
                                    ``executor_browser_cls``
    :param capture_stdio: Passed through to ``capture.CaptureIO``
    :param stop_flag: Event set when the runner hits an error or a
                      KeyboardInterrupt
    """

    def send_message(command, *args):
        runner_result_queue.put((command, args))

    logger = MessageLogger(send_message)

    def handle_error(e):
        # Log the active traceback and signal that this runner has failed.
        logger.critical(traceback.format_exc())
        stop_flag.set()

    with capture.CaptureIO(logger, capture_stdio):
        try:
            executor_browser = executor_browser_cls(**executor_browser_kwargs)
            test_executor = executor_cls(executor_browser, **executor_kwargs)
            test_runner = TestRunner(logger, runner_command_queue,
                                     runner_result_queue, test_executor)
            with test_runner as runner:
                # Failures of the command loop are handled here, before the
                # runner is torn down on leaving the ``with`` block.
                try:
                    runner.run()
                except KeyboardInterrupt:
                    stop_flag.set()
                except Exception as exc:
                    handle_error(exc)
        except Exception as exc:
            # Covers construction of the browser/executor and teardown.
            handle_error(exc)
156
157
158
# Number of manager numbers handed out so far in this process.
manager_count = 0


def next_manager_number():
    """Increment the module-level counter and return its new value."""
    global manager_count
    manager_count += 1
    return manager_count
165
166
167
class BrowserManager(object):
    """Wrap a browser object with start/stop bookkeeping and an init timeout.

    Tracks the settings the browser was last started with, whether it is
    considered started, and a timer that reports "init_failed" on the
    command queue if initialisation takes longer than
    ``browser.init_timeout``.
    """

    def __init__(self, logger, browser, command_queue, no_timeout=False):
        """
        :param logger: Structured logger
        :param browser: Browser object being managed
        :param command_queue: Queue on which ``(command, args)`` messages
                              are posted (used by the init timer)
        :param no_timeout: When true, no init timer is created
        """
        self.logger = logger
        self.browser = browser
        self.no_timeout = no_timeout
        self.browser_settings = None
        self.last_test = None
        # Filled in by a successful init(). Defined here so check_crash()
        # does not raise AttributeError when it runs before the browser has
        # ever started successfully.
        self.browser_pid = None

        self.started = False

        self.init_timer = None
        self.command_queue = command_queue

    def update_settings(self, test):
        """Record the browser settings for ``test``.

        :returns: True when the browser must be restarted before running
                  ``test``: either the settings differ from the ones
                  previously recorded, or ``test`` is a different test from
                  the last one and is expected to CRASH.
        """
        browser_settings = self.browser.settings(test)
        restart_required = ((self.browser_settings is not None and
                             self.browser_settings != browser_settings) or
                            (self.last_test != test and test.expected() == "CRASH"))
        self.browser_settings = browser_settings
        self.last_test = test
        return restart_required

    def init(self, group_metadata):
        """Launch the browser that is being tested.

        :param group_metadata: Passed through to ``browser.start``
        :returns: True if the browser started, False if starting it raised.
        """
        # Cancel any timer left over from a previous init attempt before
        # arming a new one.
        if self.init_timer is not None:
            self.init_timer.cancel()

        self.logger.debug("Init called, starting browser and runner")

        if not self.no_timeout:
            self.init_timer = threading.Timer(self.browser.init_timeout,
                                              self.init_timeout)
        try:
            if self.init_timer is not None:
                self.init_timer.start()
            self.logger.debug("Starting browser with settings %r" % self.browser_settings)
            self.browser.start(group_metadata=group_metadata, **self.browser_settings)
            self.browser_pid = self.browser.pid()
        except Exception:
            self.logger.warning("Failure during init %s" % traceback.format_exc())
            if self.init_timer is not None:
                self.init_timer.cancel()
            self.logger.error(traceback.format_exc())
            succeeded = False
        else:
            succeeded = True
            self.started = True

        return succeeded

    def send_message(self, command, *args):
        """Post ``(command, args)`` on the command queue."""
        self.command_queue.put((command, args))

    def init_timeout(self):
        """Timer callback: report that initialisation timed out."""
        # This is called from a separate thread, so we send a message to the
        # main loop so we get back onto the manager thread
        self.logger.debug("init_failed called from timer")
        self.send_message("init_failed")

    def after_init(self):
        """Callback when we have started the browser, started the remote
        control connection, and we are ready to start testing."""
        if self.init_timer is not None:
            self.init_timer.cancel()

    def stop(self, force=False):
        """Stop the browser and mark it as not started."""
        self.browser.stop(force=force)
        self.started = False

    def cleanup(self):
        """Cancel the init timer, if one exists."""
        if self.init_timer is not None:
            self.init_timer.cancel()

    def check_crash(self, test_id):
        """Ask the browser whether it crashed while running ``test_id``.

        The pid recorded by the last successful init() is passed as
        ``process``; it is None if the browser never started.
        """
        return self.browser.check_crash(process=self.browser_pid, test=test_id)

    def is_alive(self):
        """Return the browser's own liveness check."""
        return self.browser.is_alive()
249
250
251
class _RunnerManagerState(object):
    """Namespace of the namedtuple types used as TestRunnerManager states.

    Each attribute is a state type; an instance of it is the current state
    and carries the data for that state in its fields.
    """
    _test_fields = "test test_group group_metadata"

    before_init = namedtuple("before_init", ())
    initializing = namedtuple("initializing_browser",
                              _test_fields + " failure_count")
    running = namedtuple("running", _test_fields)
    restarting = namedtuple("restarting", _test_fields)
    error = namedtuple("error", ())
    stop = namedtuple("stop", ())


RunnerManagerState = _RunnerManagerState()
262
263
264
class TestRunnerManager(threading.Thread):
    def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
                 pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None,
                 capture_stdio=True):
        """Thread that owns a single TestRunner process and any processes required
        by the TestRunner (e.g. the Firefox binary).

        TestRunnerManagers are responsible for launching the browser process and the
        runner process, and for logging the test progress. The actual test running
        is done by the TestRunner. In particular they:

        * Start the binary of the program under test
        * Start the TestRunner
        * Tell the TestRunner to start a test, if any
        * Log that the test started
        * Log the test results
        * Take any remedial action required e.g. restart crashed or hung
          processes
        """
        self.suite_name = suite_name

        self.test_source = test_source_cls(test_queue)

        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs

        self.executor_cls = executor_cls
        # NOTE(review): stored by reference and mutated in init() (the
        # "group_metadata" key); confirm callers do not rely on sharing one
        # dict between several managers.
        self.executor_kwargs = executor_kwargs

        # Flags used to shut down this thread if we get a sigint
        self.parent_stop_flag = stop_flag
        self.child_stop_flag = multiprocessing.Event()

        self.rerun = rerun
        self.run_count = 0
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info

        self.manager_number = next_manager_number()

        # command_queue carries messages to this manager; remote_queue
        # carries commands to the TestRunner process.
        self.command_queue = Queue()
        self.remote_queue = Queue()

        self.test_runner_proc = None

        threading.Thread.__init__(self, name="TestRunnerManager-%i" % self.manager_number)
        # This is started in the actual new thread
        self.logger = None

        self.test_count = 0
        self.unexpected_count = 0

        # This may not really be what we want
        self.daemon = True

        self.max_restarts = 5

        self.browser = None

        self.capture_stdio = capture_stdio

    def run(self):
        """Main loop for the TestRunnerManager.

        TestRunnerManagers generally receive commands from their
        TestRunner updating them on the status of a test. They
        may also have a stop flag set by the main thread indicating
        that the manager should shut down the next time the event loop
        spins."""
        self.logger = structuredlog.StructuredLogger(self.suite_name)
        with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
            self.browser = BrowserManager(self.logger,
                                          browser,
                                          self.command_queue,
                                          no_timeout=self.debug_info is not None)
            # Action to run on entering each state; states without an entry
            # only react to incoming messages (see wait_event).
            dispatch = {
                RunnerManagerState.before_init: self.start_init,
                RunnerManagerState.initializing: self.init,
                RunnerManagerState.running: self.run_test,
                RunnerManagerState.restarting: self.restart_runner
            }

            self.state = RunnerManagerState.before_init()
            end_states = (RunnerManagerState.stop,
                          RunnerManagerState.error)

            try:
                while not isinstance(self.state, end_states):
                    f = dispatch.get(self.state.__class__)
                    # Keep running state actions while they produce a new
                    # state; None means "wait for a message".
                    while f:
                        self.logger.debug("Dispatch %s" % f.__name__)
                        if self.should_stop():
                            return
                        new_state = f()
                        if new_state is None:
                            break
                        self.state = new_state
                        self.logger.debug("new state: %s" % self.state.__class__.__name__)
                        if isinstance(self.state, end_states):
                            return
                        f = dispatch.get(self.state.__class__)

                    new_state = None
                    while new_state is None:
                        new_state = self.wait_event()
                        if self.should_stop():
                            return
                    self.state = new_state
                    self.logger.debug("new state: %s" % self.state.__class__.__name__)
            except Exception:
                # format_exc() takes no exception argument; its first
                # parameter is a frame limit.
                self.logger.error(traceback.format_exc())
                raise
            finally:
                self.logger.debug("TestRunnerManager main loop terminating, starting cleanup")
                clean = isinstance(self.state, RunnerManagerState.stop)
                self.stop_runner(force=not clean)
                self.teardown()
        self.logger.debug("TestRunnerManager main loop terminated")

    def _restart_state(self):
        """Return the state that restarts the runner for the current test.

        Falls back to the error state when the current state has no test
        fields to carry over (e.g. ``before_init``).
        """
        state = self.state
        if not hasattr(state, "test"):
            return RunnerManagerState.error()
        return RunnerManagerState.restarting(state.test,
                                             state.test_group,
                                             state.group_metadata)

    def wait_event(self):
        """Wait up to one second for a message and handle it.

        :returns: The new state produced by the handler, or None when no
                  state change is required.
        """
        # Handlers valid in each state; the None entry holds handlers that
        # are valid in every state.
        dispatch = {
            RunnerManagerState.before_init: {},
            RunnerManagerState.initializing:
            {
                "init_succeeded": self.init_succeeded,
                "init_failed": self.init_failed,
            },
            RunnerManagerState.running:
            {
                "test_ended": self.test_ended,
                "wait_finished": self.wait_finished,
            },
            RunnerManagerState.restarting: {},
            RunnerManagerState.error: {},
            RunnerManagerState.stop: {},
            None: {
                "runner_teardown": self.runner_teardown,
                "log": self.log,
                "error": self.error
            }
        }
        try:
            command, data = self.command_queue.get(True, 1)
            self.logger.debug("Got command: %r" % command)
        except IOError:
            self.logger.error("Got IOError from poll")
            return self._restart_state()
        except Empty:
            if (self.debug_info and self.debug_info.interactive and
                self.browser.started and not self.browser.is_alive()):
                self.logger.debug("Debugger exited")
                return RunnerManagerState.stop()

            if (isinstance(self.state, RunnerManagerState.running) and
                not self.test_runner_proc.is_alive()):
                if not self.command_queue.empty():
                    # We got a new message so process that
                    return

                # If we got to here the runner presumably shut down
                # unexpectedly
                self.logger.info("Test runner process shut down")

                if self.state.test is not None:
                    # This could happen if the test runner crashed for some other
                    # reason
                    # Need to consider the unlikely case where one test causes the
                    # runner process to repeatedly die
                    self.logger.critical("Last test did not complete")
                    return RunnerManagerState.error()
                self.logger.warning("More tests found, but runner process died, restarting")
                return self._restart_state()
        else:
            f = (dispatch.get(self.state.__class__, {}).get(command) or
                 dispatch.get(None, {}).get(command))
            if not f:
                self.logger.warning("Got command %s in state %s" %
                                    (command, self.state.__class__.__name__))
                return
            return f(*data)

    def should_stop(self):
        """Return True if either the child or the parent stop flag is set."""
        return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()

    def start_init(self):
        """Fetch the first test and move to initializing, or stop if none."""
        test, test_group, group_metadata = self.get_next_test()
        if test is None:
            return RunnerManagerState.stop()
        else:
            return RunnerManagerState.initializing(test, test_group, group_metadata, 0)

    def init(self):
        """Start the browser and, on success, the TestRunner process.

        :returns: An error state once ``max_restarts`` is exceeded, a new
                  initializing state with an incremented failure count if
                  the browser failed to start, otherwise None (wait for the
                  runner to report init_succeeded or init_failed).
        """
        assert isinstance(self.state, RunnerManagerState.initializing)
        if self.state.failure_count > self.max_restarts:
            self.logger.error("Max restarts exceeded")
            return RunnerManagerState.error()

        self.browser.update_settings(self.state.test)

        result = self.browser.init(self.state.group_metadata)
        if result is Stop:
            return RunnerManagerState.error()
        elif not result:
            return RunnerManagerState.initializing(self.state.test,
                                                   self.state.test_group,
                                                   self.state.group_metadata,
                                                   self.state.failure_count + 1)
        else:
            self.executor_kwargs["group_metadata"] = self.state.group_metadata
            self.start_test_runner()

    def start_test_runner(self):
        """Spawn the TestRunner process running ``start_runner``."""
        # Note that we need to be careful to start the browser before the
        # test runner to ensure that any state set when the browser is started
        # can be passed in to the test runner.
        assert isinstance(self.state, RunnerManagerState.initializing)
        assert self.command_queue is not None
        assert self.remote_queue is not None
        self.logger.info("Starting runner")
        executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()

        # The runner reads commands from remote_queue and replies on
        # command_queue.
        args = (self.remote_queue,
                self.command_queue,
                self.executor_cls,
                self.executor_kwargs,
                executor_browser_cls,
                executor_browser_kwargs,
                self.capture_stdio,
                self.child_stop_flag)
        self.test_runner_proc = Process(target=start_runner,
                                        args=args,
                                        name="TestRunner-%i" % self.manager_number)
        self.test_runner_proc.start()
        self.logger.debug("Test runner started")
        # Now we wait for either an init_succeeded event or an init_failed event

    def init_succeeded(self):
        """Handler for "init_succeeded": move to the running state."""
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.after_init()
        return RunnerManagerState.running(self.state.test,
                                          self.state.test_group,
                                          self.state.group_metadata)

    def init_failed(self):
        """Handler for "init_failed": stop the runner and retry init with
        an incremented failure count."""
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.check_crash(None)
        self.browser.after_init()
        self.stop_runner(force=True)
        return RunnerManagerState.initializing(self.state.test,
                                               self.state.test_group,
                                               self.state.group_metadata,
                                               self.state.failure_count + 1)

    def get_next_test(self, test_group=None):
        """Pop the next test, pulling new groups from the test source when
        ``test_group`` is missing or empty.

        :param test_group: Optional group to take the next test from
        :returns: ``(test, test_group, group_metadata)``, or
                  ``(None, None, None)`` when the source has no more groups.
                  ``group_metadata`` is None when the test came from the
                  ``test_group`` that was passed in.
        """
        test = None
        # Initialised so the return below is valid when the caller's group
        # still had tests and no new group was fetched.
        group_metadata = None
        while test is None:
            while test_group is None or len(test_group) == 0:
                test_group, group_metadata = self.test_source.group()
                if test_group is None:
                    self.logger.info("No more tests")
                    return None, None, None
            test = test_group.popleft()
        self.run_count = 0
        return test, test_group, group_metadata

    def run_test(self):
        """Ask the runner to run the current test.

        :returns: A restarting state if the browser settings require a
                  restart first, otherwise None.
        """
        assert isinstance(self.state, RunnerManagerState.running)
        assert self.state.test is not None

        if self.browser.update_settings(self.state.test):
            self.logger.info("Restarting browser for new test environment")
            return RunnerManagerState.restarting(self.state.test,
                                                 self.state.test_group,
                                                 self.state.group_metadata)

        self.logger.test_start(self.state.test.id)
        if self.rerun > 1:
            # run_count is incremented below, so the run about to start is
            # number run_count + 1.
            self.logger.info("Run %d/%d" % (self.run_count + 1, self.rerun))
            self.send_message("reset")
        self.run_count += 1
        self.send_message("run_test", self.state.test)

    def test_ended(self, test, results):
        """Handle the end of a test.

        Output the result of each subtest, and the result of the overall
        harness to the logs.

        :param test: The test that finished; must be the current test
        :param results: ``(file_result, test_results)`` pair
        :returns: The next state, or None when pausing until the browser
                  exits.
        """
        assert isinstance(self.state, RunnerManagerState.running)
        assert test == self.state.test
        # Write the result of each subtest
        file_result, test_results = results
        subtest_unexpected = False
        for result in test_results:
            if test.disabled(result.name):
                continue
            expected = test.expected(result.name)
            is_unexpected = expected != result.status

            if is_unexpected:
                self.unexpected_count += 1
                self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)
                subtest_unexpected = True
            self.logger.test_status(test.id,
                                    result.name,
                                    result.status,
                                    message=result.message,
                                    expected=expected,
                                    stack=result.stack)

        # We have a couple of status codes that are used internally, but not exposed to the
        # user. These are used to indicate that some possibly-broken state was reached
        # and we should restart the runner before the next test.
        # INTERNAL-ERROR indicates a Python exception was caught in the harness
        # EXTERNAL-TIMEOUT indicates we had to forcibly kill the browser from the harness
        # because the test didn't return a result after reaching the test-internal timeout
        status_subns = {"INTERNAL-ERROR": "ERROR",
                        "EXTERNAL-TIMEOUT": "TIMEOUT"}
        expected = test.expected()
        status = status_subns.get(file_result.status, file_result.status)

        if self.browser.check_crash(test.id) and status != "CRASH":
            self.logger.info("Found a crash dump; should change status from %s to CRASH but this causes instability" % (status,))

        self.test_count += 1
        is_unexpected = expected != status
        if is_unexpected:
            self.unexpected_count += 1
            self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)

        if "assertion_count" in file_result.extra:
            assertion_count = file_result.extra.pop("assertion_count")
            # Guard against None, which cannot be compared with an int on
            # Python 3.
            if assertion_count is not None and assertion_count > 0:
                self.logger.assertion_count(test.id,
                                            int(assertion_count),
                                            test.min_assertion_count,
                                            test.max_assertion_count)

        file_result.extra["test_timeout"] = test.timeout * self.executor_kwargs['timeout_multiplier']

        self.logger.test_end(test.id,
                             status,
                             message=file_result.message,
                             expected=expected,
                             extra=file_result.extra,
                             stack=file_result.stack)

        restart_before_next = (test.restart_after or
                               file_result.status in ("CRASH", "EXTERNAL-TIMEOUT", "INTERNAL-ERROR") or
                               ((subtest_unexpected or is_unexpected) and
                                self.restart_on_unexpected))

        # Parentheses make the existing and/or grouping explicit.
        # NOTE(review): the CRASH check only guards pause_after_test, not
        # pause_on_unexpected; confirm that is intended.
        if ((not file_result.status == "CRASH" and self.pause_after_test) or
            (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))):
            self.logger.info("Pausing until the browser exits")
            self.send_message("wait")
        else:
            return self.after_test_end(test, restart_before_next)

    def wait_finished(self):
        """Handler for "wait_finished": continue after a pause."""
        assert isinstance(self.state, RunnerManagerState.running)
        self.logger.debug("Wait finished")

        # The browser should be stopped already, but this ensures we do any
        # post-stop processing
        return self.after_test_end(self.state.test, True)

    def after_test_end(self, test, restart):
        """Choose the state that follows a finished test.

        :param test: The test that just ran
        :param restart: Whether a restart is required before the next test
        :returns: stop when there are no more tests, otherwise a restarting
                  or running state for the next test (or the same test when
                  more reruns remain).
        """
        assert isinstance(self.state, RunnerManagerState.running)
        if self.run_count == self.rerun:
            test, test_group, group_metadata = self.get_next_test()
            if test is None:
                return RunnerManagerState.stop()
            if test_group != self.state.test_group:
                # We are starting a new group of tests, so force a restart
                restart = True
        else:
            test_group = self.state.test_group
            group_metadata = self.state.group_metadata
        if restart:
            return RunnerManagerState.restarting(test, test_group, group_metadata)
        else:
            return RunnerManagerState.running(test, test_group, group_metadata)

    def restart_runner(self):
        """Stop and restart the TestRunner"""
        assert isinstance(self.state, RunnerManagerState.restarting)
        self.stop_runner()
        return RunnerManagerState.initializing(self.state.test, self.state.test_group, self.state.group_metadata, 0)

    def log(self, action, kwargs):
        """Handler for "log": replay a log call from the runner process."""
        getattr(self.logger, action)(**kwargs)

    def error(self, message):
        """Handler for "error": log ``message`` and request a runner restart.

        The restart is requested by returning a restarting state, so that
        the main loop dispatches restart_runner() with the state it asserts
        on; calling restart_runner() directly from here would trip that
        assertion.
        """
        self.logger.error(message)
        return self._restart_state()

    def stop_runner(self, force=False):
        """Stop the TestRunner and the browser binary."""
        if self.test_runner_proc is None:
            return

        if self.test_runner_proc.is_alive():
            self.send_message("stop")
        try:
            self.browser.stop(force=force)
            self.ensure_runner_stopped()
        finally:
            self.cleanup()

    def teardown(self):
        """Drop the runner process reference and close both queues."""
        self.logger.debug("TestRunnerManager teardown")
        self.test_runner_proc = None
        self.command_queue.close()
        self.remote_queue.close()
        self.command_queue = None
        self.remote_queue = None

    def ensure_runner_stopped(self):
        """Wait up to 10 seconds for the runner process, then terminate it."""
        self.logger.debug("ensure_runner_stopped")
        if self.test_runner_proc is None:
            return

        self.logger.debug("waiting for runner process to end")
        self.test_runner_proc.join(10)
        self.logger.debug("After join")
        if self.test_runner_proc.is_alive():
            # This might leak a file handle from the queue
            self.logger.warning("Forcibly terminating runner process")
            self.test_runner_proc.terminate()

            # Multiprocessing queues are backed by operating system pipes. If
            # the pipe in the child process had buffered data at the time of
            # forced termination, the queue is no longer in a usable state
            # (subsequent attempts to retrieve items may block indefinitely).
            # Discard the potentially-corrupted queue and create a new one.
            self.command_queue.close()
            self.command_queue = Queue()
            self.remote_queue.close()
            self.remote_queue = Queue()
        else:
            self.logger.debug("Runner process exited with code %i" % self.test_runner_proc.exitcode)

    def runner_teardown(self):
        """Handler for "runner_teardown": wait for the runner and stop."""
        self.ensure_runner_stopped()
        return RunnerManagerState.stop()

    def send_message(self, command, *args):
        """Post ``(command, args)`` to the TestRunner via remote_queue."""
        self.remote_queue.put((command, args))

    def cleanup(self):
        """Cancel browser timers and drain both queues.

        Pending "log" messages are still written; anything else left over
        is reported with a warning, except "runner_teardown".
        """
        self.logger.debug("TestRunnerManager cleanup")
        if self.browser:
            self.browser.cleanup()
        while True:
            try:
                cmd, data = self.command_queue.get_nowait()
            except Empty:
                break
            else:
                if cmd == "log":
                    self.log(*data)
                elif cmd == "runner_teardown":
                    # It's OK for the "runner_teardown" message to be left in
                    # the queue during cleanup, as we will already have tried
                    # to stop the TestRunner in `stop_runner`.
                    pass
                else:
                    self.logger.warning("Command left in command_queue during cleanup: %r, %r" % (cmd, data))
        while True:
            try:
                cmd, data = self.remote_queue.get_nowait()
                self.logger.warning("Command left in remote_queue during cleanup: %r, %r" % (cmd, data))
            except Empty:
                break
743
744
745
def make_test_queue(tests, test_source_cls, **test_source_kwargs):
    """Build the queue of tests via ``test_source_cls.make_queue``.

    :param tests: Tests handed to ``make_queue``
    :param test_source_cls: Class providing ``make_queue``
    :param test_source_kwargs: Extra keyword arguments for ``make_queue``
    :returns: The queue returned by ``make_queue``, which must not be empty.
    """
    test_queue = test_source_cls.make_queue(tests, **test_source_kwargs)

    # There is a race condition that means sometimes we continue
    # before the tests have been written to the underlying pipe.
    # Polling the pipe for data here avoids that
    test_queue._reader.poll(10)
    assert not test_queue.empty()
    return test_queue
754
755
756
class ManagerGroup(object):
    """Main thread object that owns all the TestRunnerManager threads.

    Usable as a context manager; leaving the ``with`` block sets the stop
    flag shared with every manager.
    """

    def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
                 browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs,
                 rerun=1,
                 pause_after_test=False,
                 pause_on_unexpected=False,
                 restart_on_unexpected=True,
                 debug_info=None,
                 capture_stdio=True):
        """Store the configuration handed to each TestRunnerManager.

        :param suite_name: Name used for the structured logger
        :param size: Number of TestRunnerManager threads to start
        """
        self.suite_name = suite_name
        self.size = size

        self.test_source_cls = test_source_cls
        self.test_source_kwargs = test_source_kwargs

        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs

        self.executor_cls = executor_cls
        self.executor_kwargs = executor_kwargs

        self.rerun = rerun
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info
        self.capture_stdio = capture_stdio

        self.pool = set()
        # Event that is polled by threads so that they can gracefully exit in the face
        # of sigint
        self.stop_flag = threading.Event()
        self.logger = structuredlog.StructuredLogger(suite_name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def run(self, test_type, tests):
        """Start all managers in the group and wait for them to finish.

        :param test_type: Key selecting which tests to run
        :param tests: Mapping from test type to the tests of that type
        """
        self.logger.debug("Using %i processes" % self.size)
        type_tests = tests[test_type]
        if not type_tests:
            self.logger.info("No %s tests to run" % test_type)
            return

        test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)

        manager_args = (self.suite_name,
                        test_queue,
                        self.test_source_cls,
                        self.browser_cls,
                        self.browser_kwargs,
                        self.executor_cls,
                        self.executor_kwargs,
                        self.stop_flag,
                        self.rerun,
                        self.pause_after_test,
                        self.pause_on_unexpected,
                        self.restart_on_unexpected,
                        self.debug_info,
                        self.capture_stdio)
        for _ in range(self.size):
            manager = TestRunnerManager(*manager_args)
            manager.start()
            self.pool.add(manager)
        self.wait()

    def wait(self):
        """Wait for all the managers in the group to finish"""
        for manager in self.pool:
            manager.join()

    def stop(self):
        """Set the stop flag so that all managers in the group stop as soon
        as possible"""
        self.stop_flag.set()
        self.logger.debug("Stop flag set in ManagerGroup")

    def test_count(self):
        """Return the total number of tests run across the pool."""
        return sum(manager.test_count for manager in self.pool)

    def unexpected_count(self):
        """Return the total number of unexpected results across the pool."""
        return sum(manager.unexpected_count for manager in self.pool)