Source code

Revision control

Copy as Markdown

Other Tools

Test Info:

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import sys
import time
import six
from marionette_driver import errors, wait
from marionette_driver.wait import Wait
from marionette_harness import MarionetteTestCase
class TickingClock(object):
def __init__(self, incr=1):
self.ticks = 0
self.increment = incr
def sleep(self, dur=None):
dur = dur if dur is not None else self.increment
self.ticks += dur
@property
def now(self):
return self.ticks
class SequenceClock(object):
def __init__(self, times):
self.times = times
self.i = 0
@property
def now(self):
if len(self.times) > self.i:
self.i += 1
return self.times[self.i - 1]
def sleep(self, dur):
pass
class MockMarionette(object):
def __init__(self):
self.waited = 0
def exception(self, e=None, wait=1):
self.wait()
if self.waited == wait:
if e is None:
e = Exception
raise e
def true(self, wait=1):
self.wait()
if self.waited == wait:
return True
return None
def false(self, wait=1):
self.wait()
return False
def none(self, wait=1):
self.wait()
return None
def value(self, value, wait=1):
self.wait()
if self.waited == wait:
return value
return None
def wait(self):
self.waited += 1
def at_third_attempt(clock, end):
return clock.now == 2
def now(clock, end):
return True
class SystemClockTest(MarionetteTestCase):
def setUp(self):
super(SystemClockTest, self).setUp()
self.clock = wait.SystemClock()
def test_construction_initializes_time(self):
self.assertEqual(self.clock._time, time)
def test_sleep(self):
start = time.time()
self.clock.sleep(0.1)
end = time.time() - start
self.assertGreater(end, 0)
def test_time_now(self):
self.assertIsNotNone(self.clock.now)
class FormalWaitTest(MarionetteTestCase):
def setUp(self):
super(FormalWaitTest, self).setUp()
self.m = MockMarionette()
self.m.timeout = 123
def test_construction_with_custom_timeout(self):
wt = Wait(self.m, timeout=42)
self.assertEqual(wt.timeout, 42)
def test_construction_with_custom_interval(self):
wt = Wait(self.m, interval=42)
self.assertEqual(wt.interval, 42)
def test_construction_with_custom_clock(self):
c = TickingClock(1)
wt = Wait(self.m, clock=c)
self.assertEqual(wt.clock, c)
def test_construction_with_custom_exception(self):
wt = Wait(self.m, ignored_exceptions=Exception)
self.assertIn(Exception, wt.exceptions)
self.assertEqual(len(wt.exceptions), 1)
def test_construction_with_custom_exception_list(self):
exc = [Exception, ValueError]
wt = Wait(self.m, ignored_exceptions=exc)
for e in exc:
self.assertIn(e, wt.exceptions)
self.assertEqual(len(wt.exceptions), len(exc))
def test_construction_with_custom_exception_tuple(self):
exc = (Exception, ValueError)
wt = Wait(self.m, ignored_exceptions=exc)
for e in exc:
self.assertIn(e, wt.exceptions)
self.assertEqual(len(wt.exceptions), len(exc))
def test_duplicate_exceptions(self):
wt = Wait(self.m, ignored_exceptions=[Exception, Exception])
self.assertIn(Exception, wt.exceptions)
self.assertEqual(len(wt.exceptions), 1)
def test_default_timeout(self):
self.assertEqual(wait.DEFAULT_TIMEOUT, 5)
def test_default_interval(self):
self.assertEqual(wait.DEFAULT_INTERVAL, 0.1)
def test_end_property(self):
wt = Wait(self.m)
self.assertIsNotNone(wt.end)
def test_marionette_property(self):
wt = Wait(self.m)
self.assertEqual(wt.marionette, self.m)
def test_clock_property(self):
wt = Wait(self.m)
self.assertIsInstance(wt.clock, wait.SystemClock)
def test_timeout_uses_default_if_marionette_timeout_is_none(self):
self.m.timeout = None
wt = Wait(self.m)
self.assertEqual(wt.timeout, wait.DEFAULT_TIMEOUT)
class PredicatesTest(MarionetteTestCase):
def test_until(self):
c = wait.SystemClock()
self.assertFalse(wait.until_pred(c, six.MAXSIZE))
self.assertTrue(wait.until_pred(c, 0))
class WaitUntilTest(MarionetteTestCase):
def setUp(self):
super(WaitUntilTest, self).setUp()
self.m = MockMarionette()
self.clock = TickingClock()
self.wt = Wait(self.m, timeout=10, interval=1, clock=self.clock)
def test_true(self):
r = self.wt.until(lambda x: x.true())
self.assertTrue(r)
self.assertEqual(self.clock.ticks, 0)
def test_true_within_timeout(self):
r = self.wt.until(lambda x: x.true(wait=5))
self.assertTrue(r)
self.assertEqual(self.clock.ticks, 4)
def test_timeout(self):
with self.assertRaises(errors.TimeoutException):
r = self.wt.until(lambda x: x.true(wait=15))
self.assertEqual(self.clock.ticks, 10)
def test_exception_raises_immediately(self):
with self.assertRaises(TypeError):
self.wt.until(lambda x: x.exception(e=TypeError))
self.assertEqual(self.clock.ticks, 0)
def test_ignored_exception(self):
self.wt.exceptions = (TypeError,)
with self.assertRaises(errors.TimeoutException):
self.wt.until(lambda x: x.exception(e=TypeError))
def test_ignored_exception_wrapped_in_timeoutexception(self):
self.wt.exceptions = (TypeError,)
exc = None
try:
self.wt.until(lambda x: x.exception(e=TypeError))
except Exception as e:
exc = e
s = str(exc)
self.assertIsNotNone(exc)
self.assertIsInstance(exc, errors.TimeoutException)
self.assertIn(", caused by {0!r}".format(TypeError), s)
self.assertIn("self.wt.until(lambda x: x.exception(e=TypeError))", s)
def test_ignored_exception_after_timeout_is_not_raised(self):
with self.assertRaises(errors.TimeoutException):
r = self.wt.until(lambda x: x.exception(wait=15))
self.assertEqual(self.clock.ticks, 10)
def test_keyboard_interrupt(self):
with self.assertRaises(KeyboardInterrupt):
self.wt.until(lambda x: x.exception(e=KeyboardInterrupt))
def test_system_exit(self):
with self.assertRaises(SystemExit):
self.wt.until(lambda x: x.exception(SystemExit))
def test_true_condition_returns_immediately(self):
r = self.wt.until(lambda x: x.true())
self.assertIsInstance(r, bool)
self.assertTrue(r)
self.assertEqual(self.clock.ticks, 0)
def test_value(self):
r = self.wt.until(lambda x: "foo")
self.assertEqual(r, "foo")
self.assertEqual(self.clock.ticks, 0)
def test_custom_predicate(self):
r = self.wt.until(lambda x: x.true(wait=2), is_true=at_third_attempt)
self.assertTrue(r)
self.assertEqual(self.clock.ticks, 1)
def test_custom_predicate_times_out(self):
with self.assertRaises(errors.TimeoutException):
self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)
self.assertEqual(self.clock.ticks, 2)
def test_timeout_elapsed_duration(self):
with self.assertRaisesRegex(
errors.TimeoutException, "Timed out after 2.0 seconds"
):
self.wt.until(lambda x: x.true(wait=4), is_true=at_third_attempt)
def test_timeout_elapsed_rounding(self):
wt = Wait(self.m, clock=SequenceClock([1, 0.01, 1]), timeout=0)
with self.assertRaisesRegex(
errors.TimeoutException, "Timed out after 1.0 seconds"
):
wt.until(lambda x: x.true(), is_true=now)
def test_timeout_elapsed_interval_by_delayed_condition_return(self):
def callback(mn):
self.clock.sleep(11)
return mn.false()
with self.assertRaisesRegex(
errors.TimeoutException, "Timed out after 11.0 seconds"
):
self.wt.until(callback)
# With a delayed conditional return > timeout, only 1 iteration is
# possible
self.assertEqual(self.m.waited, 1)
def test_timeout_with_delayed_condition_return(self):
def callback(mn):
self.clock.sleep(0.5)
return mn.false()
with self.assertRaisesRegex(
errors.TimeoutException, "Timed out after 10.0 seconds"
):
self.wt.until(callback)
# With a delayed conditional return < interval, 10 iterations should be
# possible
self.assertEqual(self.m.waited, 10)
def test_timeout_interval_shorter_than_delayed_condition_return(self):
def callback(mn):
self.clock.sleep(2)
return mn.false()
with self.assertRaisesRegex(
errors.TimeoutException, "Timed out after 10.0 seconds"
):
self.wt.until(callback)
# With a delayed return of the conditional which takes twice that long than the interval,
# half of the iterations should be possible
self.assertEqual(self.m.waited, 5)
def test_message(self):
self.wt.exceptions = (TypeError,)
exc = None
try:
self.wt.until(lambda x: x.exception(e=TypeError), message="hooba")
except errors.TimeoutException as e:
exc = e
result = str(exc)
self.assertIn("seconds with message: hooba, caused by", result)
def test_no_message(self):
self.wt.exceptions = (TypeError,)
exc = None
try:
self.wt.until(lambda x: x.exception(e=TypeError), message="")
except errors.TimeoutException as e:
exc = e
result = str(exc)
self.assertIn("seconds, caused by", result)
def test_message_has_none_as_its_value(self):
self.wt.exceptions = (TypeError,)
exc = None
try:
self.wt.until(False, None, None)
except errors.TimeoutException as e:
exc = e
result = str(exc)
self.assertNotIn("with message:", result)
self.assertNotIn("secondsNone", result)