Source code

Revision control

Other Tools

1
from __future__ import unicode_literals
2
3
import multiprocessing
4
import threading
5
import traceback
6
from six.moves.queue import Empty
7
from collections import namedtuple
8
from multiprocessing import Process, current_process, Queue
9
10
from mozlog import structuredlog, capture
11
12
# Sentinel object used to signal "stop running" through the various
# command queues; it is compared with ``is``, so identity is all that
# matters.
Stop = object()
14
15
16
def release_mozlog_lock():
    """Best-effort release of mozlog's global StructuredLogger lock.

    A forked child process can inherit this lock in a held state, which
    would deadlock any logging in the child.  Releasing an unheld lock
    raises ``threading.ThreadError``; that, and mozlog being absent
    entirely, are both silently ignored.
    """
    try:
        from mozlog.structuredlog import StructuredLogger
    except ImportError:
        return
    try:
        StructuredLogger._lock.release()
    except threading.ThreadError:
        pass
25
26
27
class MessageLogger(object):
    """Logger facade that forwards every log record over a message channel.

    Rather than writing anywhere itself, each log call is packaged as a
    ``("log", action, fields)`` message and handed to *message_func*;
    the receiving end replays it on a real structured logger.
    """

    def __init__(self, message_func):
        # Callable with signature (command, *args) used to ship messages.
        self.send_message = message_func

    def _log_data(self, action, **kwargs):
        # Bundle a structured-log action and its fields into one message.
        self.send_message("log", action, kwargs)

    def process_output(self, process, data, command):
        """Forward a line of output from *process* running *command*."""
        self._log_data("process_output",
                       process=process,
                       data=data,
                       command=command)
36
37
38
def _log_func(level_name):
39
def log(self, message):
40
self._log_data(level_name.lower(), message=message)
41
log.__doc__ = """Log a message with level %s
42
43
:param message: The string message to log
44
""" % level_name
45
log.__name__ = str(level_name).lower()
46
return log
47
48
# Create all the methods on StructuredLog for debug levels.
# This makes MessageLogger mirror StructuredLogger's logging API
# (critical, error, warning, info, debug, ...).
for level_name in structuredlog.log_levels:
    setattr(MessageLogger,
            level_name.lower(),
            _log_func(level_name))
51
52
53
class TestRunner(object):
    """Event loop that runs tests inside the runner process.

    The work of actually executing a single test is delegated to the
    executor that is passed in; this class reads commands from the
    command queue and reports progress/results on the result queue.

    :param logger: Structured logger
    :param command_queue: subprocess.Queue used to send commands to the
                          process
    :param result_queue: subprocess.Queue used to send results to the
                         parent TestRunnerManager process
    :param executor: TestExecutor object that will actually run a test.
    """
    def __init__(self, logger, command_queue, result_queue, executor):
        self.logger = logger
        self.command_queue = command_queue
        self.result_queue = result_queue
        self.executor = executor
        # Name of the process we run in, useful for log output.
        self.name = current_process().name

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.teardown()

    def setup(self):
        """Set up the executor, reporting success or failure to the manager."""
        self.logger.debug("Executor setup")
        try:
            self.executor.setup(self)
        except Exception:
            # The caller is responsible for logging the exception if required
            self.send_message("init_failed")
        else:
            self.send_message("init_succeeded")
        self.logger.debug("Executor setup done")

    def teardown(self):
        """Tear down the executor and detach from the queues."""
        self.executor.teardown()
        self.send_message("runner_teardown")
        self.result_queue = None
        self.command_queue = None
        self.browser = None

    def run(self):
        """Main loop accepting commands over the pipe and triggering
        the associated methods"""
        self.setup()
        handlers = {"run_test": self.run_test,
                    "reset": self.reset,
                    "stop": self.stop,
                    "wait": self.wait}
        while True:
            cmd, args = self.command_queue.get()
            try:
                rv = handlers[cmd](*args)
            except Exception:
                # Report the failure to the manager and keep servicing
                # commands; an unknown/broken command must not kill us.
                self.send_message(
                    "error",
                    "Error running command %s with arguments %r:\n%s" %
                    (cmd, args, traceback.format_exc()))
                continue
            if rv is Stop:
                break

    def stop(self):
        # Returning the sentinel makes run() exit its loop.
        return Stop

    def reset(self):
        self.executor.reset()

    def run_test(self, test):
        try:
            return self.executor.run_test(test)
        except Exception:
            # Make the traceback visible even if this process dies.
            self.logger.critical(traceback.format_exc())
            raise

    def wait(self):
        self.executor.wait()
        self.send_message("wait_finished")

    def send_message(self, command, *args):
        self.result_queue.put((command, args))
137
138
139
def start_runner(runner_command_queue, runner_result_queue,
                 executor_cls, executor_kwargs,
                 executor_browser_cls, executor_browser_kwargs,
                 capture_stdio, stop_flag):
    """Launch a TestRunner in a new process"""

    def _post(command, *args):
        # Ship a (command, args) tuple back to the manager.
        runner_result_queue.put((command, args))

    def _fail(e):
        # The exception object itself is unused; the traceback is read
        # from the current interpreter state.
        logger.critical(traceback.format_exc())
        stop_flag.set()

    # Ensure that when we start this in a new process we have the global lock
    # in the logging module unlocked
    release_mozlog_lock()

    logger = MessageLogger(_post)

    with capture.CaptureIO(logger, capture_stdio):
        try:
            browser = executor_browser_cls(**executor_browser_kwargs)
            executor = executor_cls(browser, **executor_kwargs)
            with TestRunner(logger, runner_command_queue, runner_result_queue, executor) as runner:
                try:
                    runner.run()
                except KeyboardInterrupt:
                    stop_flag.set()
                except Exception as e:
                    _fail(e)
        except Exception as e:
            _fail(e)
171
172
173
# Monotonically increasing counter used to give each TestRunnerManager
# (and its runner process) a distinct name.
manager_count = 0


def next_manager_number():
    """Return the next manager number (1-based) and advance the counter."""
    global manager_count
    manager_count += 1
    return manager_count
180
181
182
class BrowserManager(object):
    """Own the browser under test on behalf of a TestRunnerManager.

    Tracks the per-test browser settings, an initialisation timeout
    timer, and the pid of the browser process so crashes can be checked.
    """
    def __init__(self, logger, browser, command_queue, no_timeout=False):
        self.logger = logger
        self.browser = browser
        # When True (e.g. under a debugger) no init timeout timer is used.
        self.no_timeout = no_timeout
        self.browser_settings = None
        self.last_test = None

        self.started = False
        # BUG FIX: previously browser_pid was only assigned on a successful
        # init(), so check_crash() raised AttributeError if the browser
        # failed to start before the pid was recorded (the manager's
        # init_failed path calls check_crash in exactly that situation).
        self.browser_pid = None

        self.init_timer = None
        self.command_queue = command_queue

    def update_settings(self, test):
        """Record the settings required by *test*.

        :return: True if the browser must be restarted because the
                 settings changed, or because a new test is expected to
                 CRASH (so it should start from a clean process).
        """
        browser_settings = self.browser.settings(test)
        restart_required = ((self.browser_settings is not None and
                             self.browser_settings != browser_settings) or
                            (self.last_test != test and test.expected() == "CRASH"))
        self.browser_settings = browser_settings
        self.last_test = test
        return restart_required

    def init(self, group_metadata):
        """Launch the browser that is being tested,
        and the TestRunner process that will run the tests.

        :return: True if the browser started successfully, False otherwise.
        """
        # It seems that this lock is helpful to prevent some race that otherwise
        # sometimes stops the spawned processes initialising correctly, and
        # leaves this thread hung
        if self.init_timer is not None:
            self.init_timer.cancel()

        self.logger.debug("Init called, starting browser and runner")

        if not self.no_timeout:
            self.init_timer = threading.Timer(self.browser.init_timeout,
                                              self.init_timeout)
        try:
            if self.init_timer is not None:
                self.init_timer.start()
            self.logger.debug("Starting browser with settings %r" % self.browser_settings)
            self.browser.start(group_metadata=group_metadata, **self.browser_settings)
            self.browser_pid = self.browser.pid()
        except Exception:
            self.logger.warning("Failure during init %s" % traceback.format_exc())
            if self.init_timer is not None:
                self.init_timer.cancel()
            self.logger.error(traceback.format_exc())
            succeeded = False
        else:
            succeeded = True
            self.started = True

        return succeeded

    def send_message(self, command, *args):
        self.command_queue.put((command, args))

    def init_timeout(self):
        # This is called from a separate thread, so we send a message to the
        # main loop so we get back onto the manager thread
        self.logger.debug("init_failed called from timer")
        self.send_message("init_failed")

    def after_init(self):
        """Callback when we have started the browser, started the remote
        control connection, and we are ready to start testing."""
        if self.init_timer is not None:
            self.init_timer.cancel()

    def stop(self, force=False):
        self.browser.stop(force=force)
        self.started = False

    def cleanup(self):
        # Make sure no timer thread outlives the manager.
        if self.init_timer is not None:
            self.init_timer.cancel()

    def check_crash(self, test_id):
        # browser_pid is None if the browser never started successfully.
        return self.browser.check_crash(process=self.browser_pid, test=test_id)

    def is_alive(self):
        return self.browser.is_alive()
264
265
266
class _RunnerManagerState(object):
    """Namespace of state records for the TestRunnerManager state machine.

    Each attribute is a namedtuple class; instances of those classes are
    the states.  ``before_init``/``error``/``stop`` carry no data, while
    the others track the current test, the remaining test group and the
    group metadata (plus a failure counter while initializing).
    """
    before_init = namedtuple("before_init", "")
    initializing = namedtuple("initializing_browser",
                              "test test_group group_metadata failure_count")
    running = namedtuple("running", "test test_group group_metadata")
    restarting = namedtuple("restarting", "test test_group group_metadata")
    error = namedtuple("error", "")
    stop = namedtuple("stop", "")


# Module-level singleton through which the state classes are accessed.
RunnerManagerState = _RunnerManagerState()
277
278
279
class TestRunnerManager(threading.Thread):
    def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
                 pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None,
                 capture_stdio=True):
        """Thread that owns a single TestRunner process and any processes required
        by the TestRunner (e.g. the Firefox binary).

        TestRunnerManagers are responsible for launching the browser process and the
        runner process, and for logging the test progress. The actual test running
        is done by the TestRunner. In particular they:

        * Start the binary of the program under test
        * Start the TestRunner
        * Tell the TestRunner to start a test, if any
        * Log that the test started
        * Log the test results
        * Take any remedial action required e.g. restart crashed or hung
          processes
        """
        self.suite_name = suite_name

        self.test_source = test_source_cls(test_queue)

        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs

        self.executor_cls = executor_cls
        self.executor_kwargs = executor_kwargs

        # Flags used to shut down this thread if we get a sigint
        self.parent_stop_flag = stop_flag
        self.child_stop_flag = multiprocessing.Event()

        self.rerun = rerun
        self.run_count = 0
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info

        self.manager_number = next_manager_number()

        # command_queue: messages from the runner process to this thread.
        # remote_queue: commands from this thread to the runner process.
        self.command_queue = Queue()
        self.remote_queue = Queue()

        self.test_runner_proc = None

        threading.Thread.__init__(self, name="TestRunnerManager-%i" % self.manager_number)
        # This is started in the actual new thread
        self.logger = None

        self.test_count = 0
        self.unexpected_count = 0

        # This may not really be what we want
        self.daemon = True

        # Timer used to detect a hung runner process.
        self.timer = None

        self.max_restarts = 5

        self.browser = None

        self.capture_stdio = capture_stdio

    def run(self):
        """Main loop for the TestRunnerManager.

        TestRunnerManagers generally receive commands from their
        TestRunner updating them on the status of a test. They
        may also have a stop flag set by the main thread indicating
        that the manager should shut down the next time the event loop
        spins."""
        self.logger = structuredlog.StructuredLogger(self.suite_name)
        with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
            self.browser = BrowserManager(self.logger,
                                          browser,
                                          self.command_queue,
                                          no_timeout=self.debug_info is not None)
            # Map each state class to the action that drives it forward.
            dispatch = {
                RunnerManagerState.before_init: self.start_init,
                RunnerManagerState.initializing: self.init,
                RunnerManagerState.running: self.run_test,
                RunnerManagerState.restarting: self.restart_runner
            }

            self.state = RunnerManagerState.before_init()
            end_states = (RunnerManagerState.stop,
                          RunnerManagerState.error)

            try:
                while not isinstance(self.state, end_states):
                    # Keep dispatching actions until one returns None,
                    # meaning we must wait for an event from the runner.
                    f = dispatch.get(self.state.__class__)
                    while f:
                        self.logger.debug("Dispatch %s" % f.__name__)
                        if self.should_stop():
                            return
                        new_state = f()
                        if new_state is None:
                            break
                        self.state = new_state
                        self.logger.debug("new state: %s" % self.state.__class__.__name__)
                        if isinstance(self.state, end_states):
                            return
                        f = dispatch.get(self.state.__class__)

                    new_state = None
                    while new_state is None:
                        new_state = self.wait_event()
                        if self.should_stop():
                            return
                    self.state = new_state
                    self.logger.debug("new state: %s" % self.state.__class__.__name__)
            except Exception:
                # BUG FIX: this previously called traceback.format_exc(e);
                # format_exc's optional argument is a stack-depth *limit*,
                # not an exception, and passing one fails on Python 3.
                self.logger.error(traceback.format_exc())
                raise
            finally:
                self.logger.debug("TestRunnerManager main loop terminating, starting cleanup")
                clean = isinstance(self.state, RunnerManagerState.stop)
                self.stop_runner(force=not clean)
                self.teardown()
                self.logger.debug("TestRunnerManager main loop terminated")

    def wait_event(self):
        """Wait (up to 1s) for a message from the runner and handle it.

        :return: the next state, or None to keep waiting.
        """
        # Per-state command handlers; the None entry holds commands that
        # are valid in any state.
        dispatch = {
            RunnerManagerState.before_init: {},
            RunnerManagerState.initializing:
            {
                "init_succeeded": self.init_succeeded,
                "init_failed": self.init_failed,
            },
            RunnerManagerState.running:
            {
                "test_ended": self.test_ended,
                "wait_finished": self.wait_finished,
            },
            RunnerManagerState.restarting: {},
            RunnerManagerState.error: {},
            RunnerManagerState.stop: {},
            None: {
                "runner_teardown": self.runner_teardown,
                "log": self.log,
                "error": self.error
            }
        }
        try:
            command, data = self.command_queue.get(True, 1)
            self.logger.debug("Got command: %r" % command)
        except IOError:
            self.logger.error("Got IOError from poll")
            # BUG FIX: this previously called RunnerManagerState.restarting(0),
            # which raises TypeError since restarting requires
            # (test, test_group, group_metadata).  Use whatever fields the
            # current state carries (None for data-less states).
            return RunnerManagerState.restarting(
                getattr(self.state, "test", None),
                getattr(self.state, "test_group", None),
                getattr(self.state, "group_metadata", None))
        except Empty:
            if (self.debug_info and self.debug_info.interactive and
                self.browser.started and not self.browser.is_alive()):
                self.logger.debug("Debugger exited")
                return RunnerManagerState.stop()

            if (isinstance(self.state, RunnerManagerState.running) and
                not self.test_runner_proc.is_alive()):
                if not self.command_queue.empty():
                    # We got a new message so process that
                    return

                # If we got to here the runner presumably shut down
                # unexpectedly
                self.logger.info("Test runner process shut down")

                if self.state.test is not None:
                    # This could happen if the test runner crashed for some other
                    # reason
                    # Need to consider the unlikely case where one test causes the
                    # runner process to repeatedly die
                    self.logger.critical("Last test did not complete")
                    return RunnerManagerState.error()
                self.logger.warning("More tests found, but runner process died, restarting")
                # BUG FIX: previously RunnerManagerState.restarting(0), which
                # raises TypeError (see above); state is `running` here so it
                # carries the fields restarting needs.
                return RunnerManagerState.restarting(self.state.test,
                                                     self.state.test_group,
                                                     self.state.group_metadata)
        else:
            f = (dispatch.get(self.state.__class__, {}).get(command) or
                 dispatch.get(None, {}).get(command))
            if not f:
                self.logger.warning("Got command %s in state %s" %
                                    (command, self.state.__class__.__name__))
                return
            return f(*data)

    def should_stop(self):
        return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set()

    def start_init(self):
        """Fetch the first test and move to initializing (or stop if none)."""
        test, test_group, group_metadata = self.get_next_test()
        if test is None:
            return RunnerManagerState.stop()
        else:
            return RunnerManagerState.initializing(test, test_group, group_metadata, 0)

    def init(self):
        """Start the browser and runner, retrying up to max_restarts times."""
        assert isinstance(self.state, RunnerManagerState.initializing)
        if self.state.failure_count > self.max_restarts:
            self.logger.critical("Max restarts exceeded")
            return RunnerManagerState.error()

        self.browser.update_settings(self.state.test)

        result = self.browser.init(self.state.group_metadata)
        if result is Stop:
            return RunnerManagerState.error()
        elif not result:
            return RunnerManagerState.initializing(self.state.test,
                                                   self.state.test_group,
                                                   self.state.group_metadata,
                                                   self.state.failure_count + 1)
        else:
            self.executor_kwargs["group_metadata"] = self.state.group_metadata
            self.start_test_runner()

    def start_test_runner(self):
        # Note that we need to be careful to start the browser before the
        # test runner to ensure that any state set when the browser is started
        # can be passed in to the test runner.
        assert isinstance(self.state, RunnerManagerState.initializing)
        assert self.command_queue is not None
        assert self.remote_queue is not None
        self.logger.info("Starting runner")
        executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()

        args = (self.remote_queue,
                self.command_queue,
                self.executor_cls,
                self.executor_kwargs,
                executor_browser_cls,
                executor_browser_kwargs,
                self.capture_stdio,
                self.child_stop_flag)
        self.test_runner_proc = Process(target=start_runner,
                                        args=args,
                                        name="TestRunner-%i" % self.manager_number)
        self.test_runner_proc.start()
        self.logger.debug("Test runner started")
        # Now we wait for either an init_succeeded event or an init_failed event

    def init_succeeded(self):
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.after_init()
        return RunnerManagerState.running(self.state.test,
                                          self.state.test_group,
                                          self.state.group_metadata)

    def init_failed(self):
        assert isinstance(self.state, RunnerManagerState.initializing)
        self.browser.check_crash(None)
        self.browser.after_init()
        self.stop_runner(force=True)
        return RunnerManagerState.initializing(self.state.test,
                                               self.state.test_group,
                                               self.state.group_metadata,
                                               self.state.failure_count + 1)

    def get_next_test(self, test_group=None):
        """Fetch the next test, pulling a new group from the source as needed.

        :return: (test, test_group, group_metadata), all None when there
                 are no more tests.
        """
        test = None
        while test is None:
            while test_group is None or len(test_group) == 0:
                test_group, group_metadata = self.test_source.group()
                if test_group is None:
                    self.logger.info("No more tests")
                    return None, None, None
            test = test_group.popleft()
        self.run_count = 0
        return test, test_group, group_metadata

    def run_test(self):
        """Ask the runner to execute the current test.

        May instead return a restarting state when the browser settings
        require a restart first.
        """
        assert isinstance(self.state, RunnerManagerState.running)
        assert self.state.test is not None

        if self.browser.update_settings(self.state.test):
            self.logger.info("Restarting browser for new test environment")
            return RunnerManagerState.restarting(self.state.test,
                                                 self.state.test_group,
                                                 self.state.group_metadata)

        self.logger.test_start(self.state.test.id)
        if self.rerun > 1:
            self.logger.info("Run %d/%d" % (self.run_count, self.rerun))
            self.send_message("reset")
        self.run_count += 1
        if self.debug_info is None:
            # Factor of 3 on the extra timeout here is based on allowing the executor
            # at least test.timeout + 2 * extra_timeout to complete,
            # which in turn is based on having several layers of timeout inside the executor
            wait_timeout = (self.state.test.timeout * self.executor_kwargs['timeout_multiplier'] +
                            3 * self.executor_cls.extra_timeout)
            self.timer = threading.Timer(wait_timeout, self._timeout)

        self.send_message("run_test", self.state.test)
        if self.timer:
            self.timer.start()

    def _timeout(self):
        # This is executed in a different thread (threading.Timer).
        self.logger.info("Got timeout in harness")
        test = self.state.test
        self.inject_message(
            "test_ended",
            test,
            (test.result_cls("EXTERNAL-TIMEOUT",
                             "TestRunner hit external timeout "
                             "(this may indicate a hang)"), []),
        )

    def test_ended(self, test, results):
        """Handle the end of a test.

        Output the result of each subtest, and the result of the overall
        harness to the logs.
        """
        if ((not isinstance(self.state, RunnerManagerState.running)) or
            (test != self.state.test)):
            # Due to inherent race conditions in EXTERNAL-TIMEOUT, we might
            # receive multiple test_ended for a test (e.g. from both Executor
            # and TestRunner), in which case we ignore the duplicate message.
            self.logger.error("Received unexpected test_ended for %s" % test)
            return
        if self.timer is not None:
            self.timer.cancel()
        # Write the result of each subtest
        file_result, test_results = results
        subtest_unexpected = False
        expect_any_subtest_status = test.expect_any_subtest_status()
        if expect_any_subtest_status:
            self.logger.debug("Ignoring subtest statuses for test %s" % test.id)
        for result in test_results:
            if test.disabled(result.name):
                continue
            if expect_any_subtest_status:
                expected = result.status
            else:
                expected = test.expected(result.name)
            known_intermittent = test.known_intermittent(result.name)
            is_unexpected = expected != result.status and result.status not in known_intermittent

            if is_unexpected:
                self.unexpected_count += 1
                self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)
                subtest_unexpected = True
            self.logger.test_status(test.id,
                                    result.name,
                                    result.status,
                                    message=result.message,
                                    expected=expected,
                                    known_intermittent=known_intermittent,
                                    stack=result.stack)

        # We have a couple of status codes that are used internally, but not exposed to the
        # user. These are used to indicate that some possibly-broken state was reached
        # and we should restart the runner before the next test.
        # INTERNAL-ERROR indicates a Python exception was caught in the harness
        # EXTERNAL-TIMEOUT indicates we had to forcibly kill the browser from the harness
        # because the test didn't return a result after reaching the test-internal timeout
        status_subns = {"INTERNAL-ERROR": "ERROR",
                        "EXTERNAL-TIMEOUT": "TIMEOUT"}
        expected = test.expected()
        known_intermittent = test.known_intermittent()
        status = status_subns.get(file_result.status, file_result.status)

        if self.browser.check_crash(test.id) and status != "CRASH":
            self.logger.info("Found a crash dump; should change status from %s to CRASH but this causes instability" % (status,))

        self.test_count += 1
        is_unexpected = expected != status and status not in known_intermittent
        if is_unexpected:
            self.unexpected_count += 1
            self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count)

        if "assertion_count" in file_result.extra:
            assertion_count = file_result.extra.pop("assertion_count")
            if assertion_count > 0:
                self.logger.assertion_count(test.id,
                                            int(assertion_count),
                                            test.min_assertion_count,
                                            test.max_assertion_count)

        file_result.extra["test_timeout"] = test.timeout * self.executor_kwargs['timeout_multiplier']

        self.logger.test_end(test.id,
                             status,
                             message=file_result.message,
                             expected=expected,
                             known_intermittent=known_intermittent,
                             extra=file_result.extra,
                             stack=file_result.stack)

        restart_before_next = (test.restart_after or
                               file_result.status in ("CRASH", "EXTERNAL-TIMEOUT", "INTERNAL-ERROR") or
                               ((subtest_unexpected or is_unexpected) and
                                self.restart_on_unexpected))

        # NOTE(review): operator precedence makes this parse as
        # ((status != CRASH and pause_after_test) or
        #  (pause_on_unexpected and unexpected)) — confirm that is the
        # intended grouping before restructuring.
        if (not file_result.status == "CRASH" and
            self.pause_after_test or
            (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))):
            self.logger.info("Pausing until the browser exits")
            self.send_message("wait")
        else:
            return self.after_test_end(test, restart_before_next)

    def wait_finished(self):
        assert isinstance(self.state, RunnerManagerState.running)
        self.logger.debug("Wait finished")

        # The browser should be stopped already, but this ensures we do any
        # post-stop processing
        return self.after_test_end(self.state.test, True)

    def after_test_end(self, test, restart):
        """Advance to the next test (or rerun), restarting if required."""
        assert isinstance(self.state, RunnerManagerState.running)
        if self.run_count == self.rerun:
            test, test_group, group_metadata = self.get_next_test()
            if test is None:
                return RunnerManagerState.stop()
            if test_group != self.state.test_group:
                # We are starting a new group of tests, so force a restart
                restart = True
        else:
            test_group = self.state.test_group
            group_metadata = self.state.group_metadata
        if restart:
            return RunnerManagerState.restarting(test, test_group, group_metadata)
        else:
            return RunnerManagerState.running(test, test_group, group_metadata)

    def restart_runner(self):
        """Stop and restart the TestRunner"""
        assert isinstance(self.state, RunnerManagerState.restarting)
        self.stop_runner()
        return RunnerManagerState.initializing(self.state.test, self.state.test_group, self.state.group_metadata, 0)

    def log(self, action, kwargs):
        # Replay a log record forwarded from the runner process.
        getattr(self.logger, action)(**kwargs)

    def error(self, message):
        # NOTE(review): restart_runner asserts the state is `restarting`,
        # which need not hold here — confirm this path is exercised.
        self.logger.error(message)
        self.restart_runner()

    def stop_runner(self, force=False):
        """Stop the TestRunner and the browser binary."""
        if self.test_runner_proc is None:
            return

        if self.test_runner_proc.is_alive():
            self.send_message("stop")
        try:
            self.browser.stop(force=force)
            self.ensure_runner_stopped()
        finally:
            self.cleanup()

    def teardown(self):
        self.logger.debug("TestRunnerManager teardown")
        self.test_runner_proc = None
        self.command_queue.close()
        self.remote_queue.close()
        self.command_queue = None
        self.remote_queue = None

    def ensure_runner_stopped(self):
        """Wait for the runner process to exit, terminating it if needed."""
        self.logger.debug("ensure_runner_stopped")
        if self.test_runner_proc is None:
            return

        self.browser.stop(force=True)
        self.logger.debug("waiting for runner process to end")
        self.test_runner_proc.join(10)
        self.logger.debug("After join")
        if self.test_runner_proc.is_alive():
            # This might leak a file handle from the queue
            self.logger.warning("Forcibly terminating runner process")
            self.test_runner_proc.terminate()

            # Multiprocessing queues are backed by operating system pipes. If
            # the pipe in the child process had buffered data at the time of
            # forced termination, the queue is no longer in a usable state
            # (subsequent attempts to retrieve items may block indefinitely).
            # Discard the potentially-corrupted queue and create a new one.
            self.command_queue.close()
            self.command_queue = Queue()
            self.remote_queue.close()
            self.remote_queue = Queue()
        else:
            self.logger.debug("Runner process exited with code %i" % self.test_runner_proc.exitcode)

    def runner_teardown(self):
        self.ensure_runner_stopped()
        return RunnerManagerState.stop()

    def send_message(self, command, *args):
        """Send a message to the remote queue (to Executor)."""
        self.remote_queue.put((command, args))

    def inject_message(self, command, *args):
        """Inject a message to the command queue (from Executor)."""
        self.command_queue.put((command, args))

    def cleanup(self):
        """Drain both queues, replaying stray log messages."""
        self.logger.debug("TestRunnerManager cleanup")
        if self.browser:
            self.browser.cleanup()
        while True:
            try:
                cmd, data = self.command_queue.get_nowait()
            except Empty:
                break
            else:
                if cmd == "log":
                    self.log(*data)
                elif cmd == "runner_teardown":
                    # It's OK for the "runner_teardown" message to be left in
                    # the queue during cleanup, as we will already have tried
                    # to stop the TestRunner in `stop_runner`.
                    pass
                else:
                    self.logger.warning("Command left in command_queue during cleanup: %r, %r" % (cmd, data))
        while True:
            try:
                cmd, data = self.remote_queue.get_nowait()
                self.logger.warning("Command left in remote_queue during cleanup: %r, %r" % (cmd, data))
            except Empty:
                break
805
806
807
def make_test_queue(tests, test_source_cls, **test_source_kwargs):
    """Create and return the queue of tests to hand out to managers.

    The queue is built by *test_source_cls* from *tests*, then we block
    (up to 10s) until data is visible on the underlying pipe.
    """
    test_queue = test_source_cls.make_queue(tests, **test_source_kwargs)

    # Guard against a race where the queue's feeder thread has not yet
    # written the tests to the underlying pipe: polling the reader here
    # ensures the empty() check below is meaningful.
    test_queue._reader.poll(10)
    assert not test_queue.empty()
    return test_queue
816
817
818
class ManagerGroup(object):
    """Main thread object that owns all the TestRunnerManager threads."""
    def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
                 browser_cls, browser_kwargs,
                 executor_cls, executor_kwargs,
                 rerun=1,
                 pause_after_test=False,
                 pause_on_unexpected=False,
                 restart_on_unexpected=True,
                 debug_info=None,
                 capture_stdio=True):
        self.suite_name = suite_name
        # Number of manager threads (and hence parallel browsers) to run.
        self.size = size
        self.test_source_cls = test_source_cls
        self.test_source_kwargs = test_source_kwargs
        self.browser_cls = browser_cls
        self.browser_kwargs = browser_kwargs
        self.executor_cls = executor_cls
        self.executor_kwargs = executor_kwargs
        self.rerun = rerun
        self.pause_after_test = pause_after_test
        self.pause_on_unexpected = pause_on_unexpected
        self.restart_on_unexpected = restart_on_unexpected
        self.debug_info = debug_info
        self.capture_stdio = capture_stdio

        # The set of running TestRunnerManager threads.
        self.pool = set()
        # Event that is polled by threads so that they can gracefully exit in the face
        # of sigint
        self.stop_flag = threading.Event()
        self.logger = structuredlog.StructuredLogger(suite_name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def run(self, test_type, tests):
        """Start all managers in the group"""
        self.logger.debug("Using %i processes" % self.size)
        type_tests = tests[test_type]
        if not type_tests:
            self.logger.info("No %s tests to run" % test_type)
            return

        test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)

        for _ in range(self.size):
            runner_manager = TestRunnerManager(self.suite_name,
                                               test_queue,
                                               self.test_source_cls,
                                               self.browser_cls,
                                               self.browser_kwargs,
                                               self.executor_cls,
                                               self.executor_kwargs,
                                               self.stop_flag,
                                               self.rerun,
                                               self.pause_after_test,
                                               self.pause_on_unexpected,
                                               self.restart_on_unexpected,
                                               self.debug_info,
                                               self.capture_stdio)
            runner_manager.start()
            self.pool.add(runner_manager)
        self.wait()

    def wait(self):
        """Wait for all the managers in the group to finish"""
        for manager in self.pool:
            manager.join()

    def stop(self):
        """Set the stop flag so that all managers in the group stop as soon
        as possible"""
        self.stop_flag.set()
        self.logger.debug("Stop flag set in ManagerGroup")

    def test_count(self):
        # Total number of tests run across all managers.
        return sum(manager.test_count for manager in self.pool)

    def unexpected_count(self):
        # Total number of unexpected results across all managers.
        return sum(manager.unexpected_count for manager in self.pool)