Source code

Revision control

Copy as Markdown

Other Tools

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import io
import os
import pipes
import posixpath
import re
import shlex
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from shutil import copytree
from threading import Thread
import six
from six.moves import range
from . import version_codes
# Module-level test root shared between ADBDevice instances. ADBDevice.__init__
# assigns its test_root argument here when share_test_root is true and this
# value is still unset (falsy).
_TEST_ROOT = None
class ADBProcess(object):
    """ADBProcess encapsulates the data related to executing the adb process.

    :param list args: complete argument list handed to subprocess.Popen.
    :param bool use_stdout_pipe: when True stdout is a pipe which can be
        consumed by iterating over the ADBProcess; otherwise stdout (and
        stderr) are written to a temporary file readable through
        :attr:`stdout`.
    :param timeout: optional number of seconds after which a background
        thread kills the process and sets :attr:`timedout`.
    """

    def __init__(self, args, use_stdout_pipe=False, timeout=None):
        #: command argument list.
        self.args = args
        Popen_args = {}

        #: Temporary file handle to be used for stdout.
        if use_stdout_pipe:
            self.stdout_file = subprocess.PIPE
            # Reading utf-8 from the stdout pipe
            if sys.version_info >= (3, 6):
                Popen_args["encoding"] = "utf-8"
            else:
                Popen_args["universal_newlines"] = True
        else:
            self.stdout_file = tempfile.NamedTemporaryFile(mode="w+b")
        Popen_args["stdout"] = self.stdout_file

        #: boolean indicating if the command timed out.
        self.timedout = None
        #: exitcode of the process.
        self.exitcode = None
        #: subprocess Process object used to execute the command.
        Popen_args["stderr"] = subprocess.STDOUT
        self.proc = subprocess.Popen(args, **Popen_args)

        # If a timeout is set, then create a thread responsible for killing the
        # process, as well as updating the exitcode and timedout status.
        def timeout_thread(adb_process, timeout):
            start_time = time.time()
            polling_interval = 0.001
            adb_process.exitcode = adb_process.proc.poll()
            while (time.time() - start_time) <= float(
                timeout
            ) and adb_process.exitcode is None:
                time.sleep(polling_interval)
                adb_process.exitcode = adb_process.proc.poll()
            if adb_process.exitcode is None:
                adb_process.proc.kill()
                adb_process.timedout = True
                adb_process.exitcode = adb_process.proc.poll()

        if timeout:
            Thread(target=timeout_thread, args=(self, timeout), daemon=True).start()

    @property
    def stdout(self):
        """Return the contents of stdout.

        Only valid when stdout was directed to the temporary file; an empty
        string is returned once that file has been closed.
        """
        assert not self.stdout_file == subprocess.PIPE
        if not self.stdout_file or self.stdout_file.closed:
            content = ""
        else:
            self.stdout_file.seek(0, os.SEEK_SET)
            content = six.ensure_str(self.stdout_file.read().rstrip())
        return content

    def __str__(self):
        # Remove -s <serialno> from the error message to allow bug suggestions
        # to be independent of the individual failing device.
        arg_string = " ".join(self.args)
        arg_string = re.sub(r" -s [\w-]+", "", arg_string)
        return "args: %s, exitcode: %s, stdout: %s" % (
            arg_string,
            self.exitcode,
            self.stdout,
        )

    def __iter__(self):
        assert self.stdout_file == subprocess.PIPE
        return self

    def __next__(self):
        """Return the next line from the stdout pipe.

        When the pipe is exhausted, wait for the process to finish (or to be
        flagged as timed out) before signalling the end of the iteration.
        """
        assert self.stdout_file == subprocess.PIPE

        try:
            return next(self.proc.stdout)
        except StopIteration:
            # Wait until the process ends. The exit code is polled here as
            # well because the timeout thread only exists when a timeout was
            # requested; without it nothing else would ever set exitcode.
            # Stop waiting as soon as the process has been flagged as timed
            # out, since it has been killed at that point.
            while self.exitcode is None and not self.timedout:
                self.exitcode = self.proc.poll()
                if self.exitcode is None:
                    time.sleep(0.001)
            raise StopIteration
# ADBError and ADBTimeoutError are treated differently in order that
# ADBTimeoutErrors can be handled distinctly from ADBErrors.
class ADBError(Exception):
    """Error raised for recoverable adb failures.

    Raised when a command executed on a device exits with a non-zero
    exitcode, or when an unexpected error condition has occurred.
    Generally an ADBError can be handled and the device can continue
    to be used.
    """
class ADBProcessError(ADBError):
    """ADBError variant that carries the ADBProcess it relates to.

    The exception message is the string form of the process and the
    process itself is kept in the *adb_process* member.
    """

    def __init__(self, adb_process):
        message = str(adb_process)
        super(ADBProcessError, self).__init__(message)
        self.adb_process = adb_process
class ADBListDevicesError(ADBError):
    """Error raised when problems are found while listing the devices,
    typically missing permissions.

    The device information gathered so far is stored in the *devices*
    member.
    """

    def __init__(self, msg, devices):
        super(ADBListDevicesError, self).__init__(msg)
        self.devices = devices
class ADBTimeoutError(Exception):
    """Error raised when a host command or shell command takes longer
    than the specified timeout to execute.

    The timeout value is set in the ADBCommand constructor and is 300
    seconds by default. This error is typically fatal since the host is
    having problems communicating with the device. You may be able to
    recover by rebooting, but this is not guaranteed.

    Recovery options are:

    * Killing and restarting the adb server via
      ::

          adb kill-server; adb start-server

    * Rebooting the device manually.
    * Rebooting the host.
    """
class ADBDeviceFactoryError(Exception):
    """Error raised when the ADBDeviceFactory is found to be in an
    inconsistent state.
    """
class ADBCommand(object):
    """ADBCommand provides a basic interface to adb commands
    which is used to provide the 'command' methods for the
    classes ADBHost and ADBDevice.

    ADBCommand should only be used as the base class for other
    classes and should not be instantiated directly. To enforce this
    restriction calling ADBCommand's constructor will raise a
    NotImplementedError exception.

    :param str adb: path to adb executable. Defaults to 'adb'.
    :param str adb_host: host of the adb server.
    :param int adb_port: port of the adb server.
    :param str logger_name: logging logger name. Defaults to 'adb'.
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError.  This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :param bool use_root: Use root if available on device
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`

    ::

       from mozdevice import ADBCommand

       try:
           adbcommand = ADBCommand()
       except NotImplementedError:
           print("ADBCommand can not be instantiated.")
    """

    def __init__(
        self,
        adb="adb",
        adb_host=None,
        adb_port=None,
        logger_name="adb",
        timeout=300,
        verbose=False,
        use_root=True,
    ):
        if self.__class__ == ADBCommand:
            raise NotImplementedError

        self._logger = self._get_logger(logger_name, verbose)
        self._verbose = verbose
        self._use_root = use_root
        self._adb_path = adb
        self._adb_host = adb_host
        self._adb_port = adb_port
        self._timeout = timeout
        self._polling_interval = 0.001
        self._adb_version = ""

        self._logger.debug("%s: %s" % (self.__class__.__name__, self.__dict__))

        # catch early a missing or non executable adb command
        # and get the adb version while we are at it.
        try:
            output = subprocess.Popen(
                [adb, "version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            ).communicate()
            re_version = re.compile(r"Android Debug Bridge version (.*)")
            if isinstance(output[0], six.binary_type):
                self._adb_version = re_version.match(
                    output[0].decode("utf-8", "replace")
                ).group(1)
            else:
                self._adb_version = re_version.match(output[0]).group(1)
        except Exception as exc:
            raise ADBError("%s: %s is not executable." % (exc, adb))

        # The minimum version check lives outside the try block above so that
        # a too-old adb is reported as such instead of being re-wrapped as
        # "... is not executable.". Versions are compared numerically: a plain
        # string comparison would order "1.0.100" before "1.0.36".
        if self._version_tuple(self._adb_version) < self._version_tuple("1.0.36"):
            raise ADBError(
                "adb version %s less than minimum 1.0.36" % self._adb_version
            )

    @staticmethod
    def _version_tuple(version):
        """Return the numeric components of *version* as a tuple of ints."""
        return tuple(int(part) for part in re.findall(r"\d+", version))

    def _get_logger(self, logger_name, verbose):
        """Return a logger named *logger_name*.

        mozlog is used when it can be imported; otherwise the standard
        logging module is used with the level set to DEBUG when *verbose*
        is true and INFO when it is not.
        """
        logger = None
        level = "DEBUG" if verbose else "INFO"
        try:
            import mozlog

            logger = mozlog.get_default_logger(logger_name)
            if not logger:
                if sys.__stdout__.isatty():
                    defaults = {"mach": sys.stdout}
                else:
                    defaults = {"tbpl": sys.stdout}
                logger = mozlog.commandline.setup_logging(
                    logger_name, {}, defaults, formatter_defaults={"level": level}
                )
        except ImportError:
            pass

        if logger is None:
            import logging

            logger = logging.getLogger(logger_name)
            logger.setLevel(level)
        return logger

    # Host Command methods

    def command(self, cmds, device_serial=None, timeout=None):
        """Executes an adb command on the host.

        :param list cmds: The command and its arguments to be
            executed.
        :param str device_serial: The device's
            serial number if the adb command is to be executed against
            a specific device. If it is not specified, ANDROID_SERIAL
            from the environment will be used if it is set.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError. This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBCommand constructor is used.
        :return: :class:`ADBProcess`

        command() provides a low level interface for executing
        commands on the host via adb.

        command() executes on the host in such a fashion that stdout
        of the adb process is a file handle on the host and
        the exit code is available as the exit code of the adb
        process.

        The caller provides a list containing commands, as well as a
        timeout period in seconds.

        A subprocess is spawned to execute adb with stdout and stderr
        directed to a temporary file. If the process takes longer than
        the specified timeout, the process is terminated.

        It is the caller's responsibility to clean up by closing
        the stdout temporary file.
        """
        args = [self._adb_path]
        device_serial = device_serial or os.environ.get("ANDROID_SERIAL")
        if self._adb_host:
            args.extend(["-H", self._adb_host])
        if self._adb_port:
            args.extend(["-P", str(self._adb_port)])
        if device_serial:
            args.extend(["-s", device_serial, "wait-for-device"])
        args.extend(cmds)

        adb_process = ADBProcess(args)

        if timeout is None:
            timeout = self._timeout

        # Poll the process until it exits or the timeout elapses.
        start_time = time.time()
        adb_process.exitcode = adb_process.proc.poll()
        while (time.time() - start_time) <= float(
            timeout
        ) and adb_process.exitcode is None:
            time.sleep(self._polling_interval)
            adb_process.exitcode = adb_process.proc.poll()
        if adb_process.exitcode is None:
            adb_process.proc.kill()
            adb_process.timedout = True
            adb_process.exitcode = adb_process.proc.poll()

        adb_process.stdout_file.seek(0, os.SEEK_SET)

        return adb_process

    def command_output(self, cmds, device_serial=None, timeout=None):
        """Executes an adb command on the host returning stdout.

        :param list cmds: The command and its arguments to be
            executed.
        :param str device_serial: The device's
            serial number if the adb command is to be executed against
            a specific device. If it is not specified, ANDROID_SERIAL
            from the environment will be used if it is set.
        :param int timeout: The maximum time in seconds
            for any spawned adb process to complete before throwing
            an ADBTimeoutError.
            This timeout is per adb call. The total time spent
            may exceed this value. If it is not specified, the value
            set in the ADBCommand constructor is used.
        :return: str - content of stdout.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        adb_process = None
        try:
            # Need to force the use of the ADBCommand class's command
            # since ADBDevice will redefine command and call its
            # own version otherwise.
            adb_process = ADBCommand.command(
                self, cmds, device_serial=device_serial, timeout=timeout
            )
            if adb_process.timedout:
                raise ADBTimeoutError("%s" % adb_process)
            if adb_process.exitcode:
                raise ADBProcessError(adb_process)
            output = adb_process.stdout
            if self._verbose:
                self._logger.debug(
                    "command_output: %s, "
                    "timeout: %s, "
                    "timedout: %s, "
                    "exitcode: %s, output: %s"
                    % (
                        " ".join(adb_process.args),
                        timeout,
                        adb_process.timedout,
                        adb_process.exitcode,
                        output,
                    )
                )

            return output
        finally:
            # NOTE(review): stdout_file is created by
            # tempfile.NamedTemporaryFile, whose wrapper object may not be an
            # io.IOBase instance - confirm this close actually runs.
            if adb_process and isinstance(adb_process.stdout_file, io.IOBase):
                adb_process.stdout_file.close()
class ADBHost(ADBCommand):
    """ADBHost provides a basic interface to adb host commands
    which do not target a specific device.

    :param str adb: path to adb executable. Defaults to 'adb'.
    :param str adb_host: host of the adb server.
    :param int adb_port: port of the adb server.
    :param logger_name: logging logger name. Defaults to 'adb'.
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError.  This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`

    ::

       from mozdevice import ADBHost

       adbhost = ADBHost()
       adbhost.start_server()
    """

    def __init__(
        self,
        adb="adb",
        adb_host=None,
        adb_port=None,
        logger_name="adb",
        timeout=300,
        verbose=False,
    ):
        ADBCommand.__init__(
            self,
            adb=adb,
            adb_host=adb_host,
            adb_port=adb_port,
            logger_name=logger_name,
            timeout=timeout,
            verbose=verbose,
            use_root=True,
        )

    def command(self, cmds, timeout=None):
        """Executes an adb command on the host.

        :param list cmds: The command and its arguments to be
            executed.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError. This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :return: :class:`ADBProcess`

        command() provides a low level interface for executing
        commands on the host via adb.

        command() executes on the host in such a fashion that stdout
        of the adb process is a file handle on the host and
        the exit code is available as the exit code of the adb
        process.

        The caller provides a list containing commands, as well as a
        timeout period in seconds.

        A subprocess is spawned to execute adb with stdout and stderr
        directed to a temporary file. If the process takes longer than
        the specified timeout, the process is terminated.

        It is the caller's responsibility to clean up by closing
        the stdout temporary file.
        """
        return ADBCommand.command(self, cmds, timeout=timeout)

    def command_output(self, cmds, timeout=None):
        """Executes an adb command on the host returning stdout.

        :param list cmds: The command and its arguments to be
            executed.
        :param int timeout: The maximum time in seconds
            for any spawned adb process to complete before throwing
            an ADBTimeoutError.
            This timeout is per adb call. The total time spent
            may exceed this value. If it is not specified, the value
            set in the ADBHost constructor is used.
        :return: str - content of stdout.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        return ADBCommand.command_output(self, cmds, timeout=timeout)

    def start_server(self, timeout=None):
        """Starts the adb server.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError. This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`

        Attempting to use start_server with any adb_host value other than None
        will fail with an ADBError exception.

        You will need to start the server on the remote host via the command:

        .. code-block:: shell

            adb -a fork-server server

        If you wish the remote adb server to restart automatically, you can
        enclose the command in a loop as in:

        .. code-block:: shell

            while true; do
              adb -a fork-server server
            done
        """
        self.command_output(["start-server"], timeout=timeout)

    def kill_server(self, timeout=None):
        """Kills the adb server.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError. This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        self.command_output(["kill-server"], timeout=timeout)

    def devices(self, timeout=None):
        """Executes adb devices -l and returns a list of objects describing attached devices.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError. This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :return: list of dictionaries, one per device, each holding at
            least the keys 'device_serial' and 'state'.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBListDevicesError`
                 :exc:`ADBError`

        The output of adb devices -l

        ::

            $ adb devices -l
            List of devices attached
            b313b945               device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw

        is parsed and placed into an object as in

        ::

            [{'device_serial': 'b313b945', 'state': 'device', 'product': 'd2vzw',
              'usb': '1-7', 'device': 'd2vzw', 'model': 'SCH_I535' }]
        """
        # b313b945               device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw
        # from Android system/core/adb/transport.c statename()
        re_device_info = re.compile(
            r"([^\s]+)\s+(offline|bootloader|device|host|recovery|sideload|"
            "no permissions|unauthorized|unknown)"
        )
        devices = []
        lines = self.command_output(["devices", "-l"], timeout=timeout).splitlines()
        for line in lines:
            # Skip the header line whether or not adb pads it with
            # trailing whitespace.
            if line.strip() == "List of devices attached":
                continue
            match = re_device_info.match(line)
            if not match:
                continue
            device = {"device_serial": match.group(1), "state": match.group(2)}
            remainder = line[match.end(2) :].strip()
            if remainder:
                try:
                    # Split on any run of whitespace and only on the first
                    # ":" of each field so that extra spacing or a value
                    # which itself contains ":" does not discard every
                    # attribute of the device.
                    device.update(
                        dict(field.split(":", 1) for field in remainder.split())
                    )
                except ValueError:
                    # At least one field had no ":" separator at all.
                    self._logger.warning(
                        "devices: Unable to parse remainder for device %s" % line
                    )
            devices.append(device)
        for device in devices:
            if device["state"] == "no permissions":
                raise ADBListDevicesError(
                    "No permissions to detect devices. You should restart the"
                    " adb server as root:\n"
                    "\n# adb kill-server\n# adb start-server\n"
                    "\nor maybe configure your udev rules.",
                    devices,
                )
        return devices
# Registry of the ADBDevice instances created by ADBDeviceFactory, keyed by
# each instance's _device_serial.
ADBDEVICES = {}
def ADBDeviceFactory(
    device=None,
    adb="adb",
    adb_host=None,
    adb_port=None,
    test_root=None,
    logger_name="adb",
    timeout=300,
    verbose=False,
    device_ready_retry_wait=20,
    device_ready_retry_attempts=3,
    use_root=True,
    share_test_root=True,
    run_as_package=None,
):
    """Return the :class:`ADBDevice` for a device, creating it on first use.

    The factory enforces the requirement that only one
    :class:`ADBDevice` be created for each attached device and takes
    the identical arguments as the :class:`ADBDevice` constructor. It
    is also used to ensure that the device's test_root is initialized
    to an empty directory before tests are run on the device.

    :return: :class:`ADBDevice`
    :raises: :exc:`ADBDeviceFactoryError`
             :exc:`ADBError`
             :exc:`ADBTimeoutError`
    """
    serial = device or os.environ.get("ANDROID_SERIAL")
    have_serial = serial is not None

    if have_serial and serial in ADBDEVICES:
        # An ADBDevice already exists for this serial number: re-use it.
        adbdevice = ADBDEVICES[serial]
    elif not have_serial and ADBDEVICES:
        # No serial number was given but an ADBDevice has been created
        # before, which means there must be only one connected device and
        # its existing ADBDevice can be re-used.
        known_serials = list(ADBDEVICES)
        assert (
            len(known_serials) == 1
        ), "Only one device may be connected if the device serial number is not specified."
        adbdevice = ADBDEVICES[known_serials[0]]
    elif (not have_serial and not ADBDEVICES) or (
        have_serial and serial not in ADBDEVICES
    ):
        # No ADBDevice has been created for the device yet.
        constructor_args = {
            "device": serial,
            "adb": adb,
            "adb_host": adb_host,
            "adb_port": adb_port,
            "test_root": test_root,
            "logger_name": logger_name,
            "timeout": timeout,
            "verbose": verbose,
            "device_ready_retry_wait": device_ready_retry_wait,
            "device_ready_retry_attempts": device_ready_retry_attempts,
            "use_root": use_root,
            "share_test_root": share_test_root,
            "run_as_package": run_as_package,
        }
        adbdevice = ADBDevice(**constructor_args)
        ADBDEVICES[adbdevice._device_serial] = adbdevice
    else:
        raise ADBDeviceFactoryError(
            "Inconsistent ADBDeviceFactory: device: %s, ADBDEVICES: %s"
            % (serial, ADBDEVICES)
        )

    # Clean the test root before testing begins.
    if test_root:
        test_root_contents = posixpath.join(adbdevice.test_root, "*")
        adbdevice.rm(test_root_contents, recursive=True, force=True, timeout=timeout)

    # Sync verbose and update the logger configuration in case it has
    # changed since the initial initialization
    if adbdevice._verbose != verbose:
        adbdevice._verbose = verbose
        adbdevice._logger = adbdevice._get_logger(adbdevice._logger.name, verbose)

    return adbdevice
class ADBDevice(ADBCommand):
    """ADBDevice provides methods which can be used to interact with the
    associated Android-based device.

    :param str device: When a string is passed in device, it
        is interpreted as the device serial number. This form is not
        compatible with devices containing a ":" in the serial; in
        this case ValueError will be raised. When a dictionary is
        passed it must have one or both of the keys "device_serial"
        and "usb". This is compatible with the dictionaries in the
        list returned by ADBHost.devices(). If the value of
        device_serial is a valid serial not containing a ":" it will
        be used to identify the device, otherwise the value of the usb
        key, prefixed with "usb:" is used. If None is passed and
        there is exactly one device attached to the host, that device
        is used. If None is passed and ANDROID_SERIAL is set in the
        environment, that device is used. If there is more than one
        device attached and device is None and ANDROID_SERIAL is not
        set in the environment, ValueError is raised. If no device is
        attached the constructor will block until a device is attached
        or the timeout is reached.
    :param str adb_host: host of the adb server to connect to.
    :param int adb_port: port of the adb server to connect to.
    :param str test_root: value containing the test root to be
        used on the device. This value will be shared among all
        instances of ADBDevice if share_test_root is True.
    :param str logger_name: logging logger name. Defaults to 'adb'
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError. This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :param int device_ready_retry_wait: number of seconds to wait
        between attempts to check if the device is ready after a
        reboot.
    :param integer device_ready_retry_attempts: number of attempts when
        checking if a device is ready.
    :param bool use_root: Use root if it is available on device
    :param bool share_test_root: True if instance should share the
        same test_root value with other ADBDevice instances. Defaults
        to True.
    :param str run_as_package: Name of package to be used in run-as in
        lieu of using su.
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`
             :exc:`ValueError`

    ::

       from mozdevice import ADBDevice

       adbdevice = ADBDevice()
       print(adbdevice.list_files("/mnt/sdcard"))
       if adbdevice.process_exist("org.mozilla.geckoview.test_runner"):
           print("org.mozilla.geckoview.test_runner is running")
    """

    SOCKET_DIRECTION_REVERSE = "reverse"
    SOCKET_DIRECTION_FORWARD = "forward"

    # Commands which can not be executed via su or run-as because they
    # are shell builtins. The list of possible builtin commands was
    # obtained from `man builtin` on Linux.
    BUILTINS = {
        "alias",
        "bg",
        "bind",
        "break",
        "builtin",
        "caller",
        "cd",
        "command",
        "compgen",
        "complete",
        "compopt",
        "continue",
        "declare",
        "dirs",
        "disown",
        "echo",
        "enable",
        "eval",
        "exec",
        "exit",
        "export",
        "false",
        "fc",
        "fg",
        "getopts",
        "hash",
        "help",
        "history",
        "jobs",
        "kill",
        "let",
        "local",
        "logout",
        "mapfile",
        "popd",
        "printf",
        "pushd",
        "pwd",
        "read",
        "readonly",
        "return",
        "set",
        "shift",
        "shopt",
        "source",
        "suspend",
        "test",
        "times",
        "trap",
        "true",
        "type",
        "typeset",
        "ulimit",
        "umask",
        "unalias",
        "unset",
        "wait",
    }
def __init__(
    self,
    device=None,
    adb="adb",
    adb_host=None,
    adb_port=None,
    test_root=None,
    logger_name="adb",
    timeout=300,
    verbose=False,
    device_ready_retry_wait=20,
    device_ready_retry_attempts=3,
    use_root=True,
    share_test_root=True,
    run_as_package=None,
):
    """Initialize the ADBDevice and probe the capabilities of the device.

    After the ADBCommand initialization the constructor resolves the
    device serial, waits for boot to complete and then determines, in
    order: the Android SDK version, which ls to use and its options,
    which BUILTINS also exist as programs, cp, chmod -R, chown -R,
    pidof, root/su availability (only if use_root), /data/local/tmp and
    the stack trace directory.

    The parameters are described in the class docstring.

    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`
    """
    global _TEST_ROOT

    ADBCommand.__init__(
        self,
        adb=adb,
        adb_host=adb_host,
        adb_port=adb_port,
        logger_name=logger_name,
        timeout=timeout,
        verbose=verbose,
        use_root=use_root,
    )
    self._logger.info("Using adb %s" % self._adb_version)
    self._device_serial = self._get_device_serial(device)
    self._initial_test_root = test_root
    self._share_test_root = share_test_root
    # Only the first test_root seen while sharing is enabled is recorded
    # in the module-level _TEST_ROOT.
    if share_test_root and not _TEST_ROOT:
        _TEST_ROOT = test_root
    self._test_root = None
    self._run_as_package = None
    # Cache packages debuggable state.
    self._debuggable_packages = {}
    self._device_ready_retry_wait = device_ready_retry_wait
    self._device_ready_retry_attempts = device_ready_retry_attempts
    self._have_root_shell = False
    self._have_su = False
    self._have_android_su = False
    self._selinux = None
    self._re_internal_storage = None

    self._wait_for_boot_completed(timeout=timeout)

    # Record the start time of the ADBDevice initialization so we can
    # determine if we should abort with an ADBTimeoutError if it is
    # taking too long.
    start_time = time.time()

    # Attempt to get the Android version as early as possible in order
    # to work around differences in determining adb command exit codes
    # in Android before and after Android 7.
    self.version = 0
    while self.version < 1 and (time.time() - start_time) <= float(timeout):
        try:
            version = self.get_prop("ro.build.version.sdk", timeout=timeout)
            self.version = int(version)
        except ValueError:
            self._logger.info("unexpected ro.build.version.sdk: '%s'" % version)
            time.sleep(2)
    if self.version < 1:
        # note slightly different meaning to the ADBTimeoutError here (and above):
        # failed to get valid (numeric) version string in all attempts in allowed time
        raise ADBTimeoutError(
            "ADBDevice: unable to determine ro.build.version.sdk."
        )

    self._mkdir_p = None
    # Force the use of /system/bin/ls or /system/xbin/ls in case
    # there is /sbin/ls which embeds ansi escape codes to colorize
    # the output.  Detect if we are using busybox ls. We want each
    # entry on a single line and we don't want . or ..
    ls_dir = "/system"

    # Using self.is_file is problematic on emulators either
    # using ls or test to check for their existence.
    # Executing the command to detect its existence works around
    # any issues with ls or test.
    boot_completed = False
    while not boot_completed and (time.time() - start_time) <= float(timeout):
        try:
            self.shell_output("/system/bin/ls /system/bin/ls", timeout=timeout)
            boot_completed = True
            self._ls = "/system/bin/ls"
        except ADBError as e1:
            self._logger.debug("detect /system/bin/ls {}".format(e1))
            try:
                self.shell_output(
                    "/system/xbin/ls /system/xbin/ls", timeout=timeout
                )
                boot_completed = True
                self._ls = "/system/xbin/ls"
            except ADBError as e2:
                self._logger.debug("detect /system/xbin/ls : {}".format(e2))
        if not boot_completed:
            time.sleep(2)
    if not boot_completed:
        raise ADBError("ADBDevice.__init__: ls could not be found")

    # A race condition can occur especially with emulators where
    # the device appears to be available but it has not completed
    # mounting the sdcard. We can work around this by checking if
    # the sdcard is missing when we attempt to ls it and retrying
    # if it is not yet available.
    # NOTE(review): the directory actually listed here is ls_dir
    # ("/system"), while the comment above and the timeout message below
    # refer to the sdcard - confirm which is intended.
    boot_completed = False
    while not boot_completed and (time.time() - start_time) <= float(timeout):
        try:
            self.shell_output("{} -1A {}".format(self._ls, ls_dir), timeout=timeout)
            boot_completed = True
            self._ls += " -1A"
        except ADBError as e:
            self._logger.debug("detect ls -1A: {}".format(e))
            # Any failure other than a missing directory is taken to mean
            # that -1A is unsupported, so fall back to -a.
            if "No such file or directory" not in str(e):
                boot_completed = True
                self._ls += " -a"
        if not boot_completed:
            time.sleep(2)
    if not boot_completed:
        raise ADBTimeoutError("ADBDevice: /sdcard not found.")

    self._logger.info("%s supported" % self._ls)

    # builtin commands which do not exist as separate programs can
    # not be executed using su or run-as.  Remove builtin commands
    # from self.BUILTINS which also exist as separate programs so
    # that we will be able to execute them using su or run-as if
    # necessary.
    # NOTE(review): BUILTINS is a class attribute and difference_update()
    # mutates it in place, so the removals apply to every ADBDevice
    # instance - confirm this sharing is intended.
    remove_builtins = set()
    for builtin in self.BUILTINS:
        try:
            self.ls("/system/*bin/%s" % builtin, timeout=timeout)
            self._logger.debug("Removing %s from BUILTINS" % builtin)
            remove_builtins.add(builtin)
        except ADBError:
            pass
    self.BUILTINS.difference_update(remove_builtins)

    # Do we have cp?
    boot_completed = False
    while not boot_completed and (time.time() - start_time) <= float(timeout):
        try:
            self.shell_output("cp --help", timeout=timeout)
            boot_completed = True
            self._have_cp = True
        except ADBError as e:
            # Decide from the error text: "not found" means there is no
            # cp, an option complaint means cp exists but rejects --help.
            # Any other error leaves the probe to be retried.
            if "not found" in str(e):
                self._have_cp = False
                boot_completed = True
            elif "known option" in str(e):
                self._have_cp = True
                boot_completed = True
            elif "invalid option" in str(e):
                self._have_cp = True
                boot_completed = True
        if not boot_completed:
            time.sleep(2)
    if not boot_completed:
        raise ADBTimeoutError("ADBDevice: cp not found.")
    self._logger.info("Native cp support: %s" % self._have_cp)

    # Do we have chmod -R?
    try:
        self._chmod_R = False
        re_recurse = re.compile(r"[-]R")
        chmod_output = self.shell_output("chmod --help", timeout=timeout)
        match = re_recurse.search(chmod_output)
        if match:
            self._chmod_R = True
    except ADBError as e:
        self._logger.debug("Check chmod -R: {}".format(e))
        # The usage text may only be available through the error message.
        match = re_recurse.search(str(e))
        if match:
            self._chmod_R = True
    self._logger.info("Native chmod -R support: {}".format(self._chmod_R))

    # Do we have chown -R?
    # NOTE(review): unlike the chmod check above, the error message is
    # not searched for -R when chown --help fails - confirm intended.
    try:
        self._chown_R = False
        chown_output = self.shell_output("chown --help", timeout=timeout)
        match = re_recurse.search(chown_output)
        if match:
            self._chown_R = True
    except ADBError as e:
        self._logger.debug("Check chown -R: {}".format(e))
    self._logger.info("Native chown -R support: {}".format(self._chown_R))

    try:
        cleared = self.shell_bool('logcat -P ""', timeout=timeout)
    except ADBError:
        cleared = False
    if not cleared:
        self._logger.info("Unable to turn off logcat chatty")

    # Do we have pidof?
    if self.version < version_codes.N:
        # unexpected pidof behavior observed on Android 6 in bug 1514363
        self._have_pidof = False
    else:
        boot_completed = False
        while not boot_completed and (time.time() - start_time) <= float(timeout):
            try:
                self.shell_output("pidof --help", timeout=timeout)
                boot_completed = True
                self._have_pidof = True
            except ADBError as e:
                if "not found" in str(e):
                    self._have_pidof = False
                    boot_completed = True
                elif "known option" in str(e):
                    self._have_pidof = True
                    boot_completed = True
            if not boot_completed:
                time.sleep(2)
        if not boot_completed:
            raise ADBTimeoutError("ADBDevice: pidof not found.")
    # Bug 1529960 observed pidof intermittently returning no results for a
    # running process on the 7.0 x86_64 emulator.

    characteristics = self.get_prop("ro.build.characteristics", timeout=timeout)

    abi = self.get_prop("ro.product.cpu.abi", timeout=timeout)
    self._have_flaky_pidof = (
        self.version == version_codes.N
        and abi == "x86_64"
        and "emulator" in characteristics
    )
    self._logger.info(
        "Native {} pidof support: {}".format(
            "flaky" if self._have_flaky_pidof else "normal", self._have_pidof
        )
    )

    if self._use_root:
        # Detect if root is available, but do not fail if it is not.
        # Catch exceptions due to the potential for segfaults
        # calling su when using an improperly rooted device.

        self._check_adb_root(timeout=timeout)

        if not self._have_root_shell:
            # To work around bug 1525401 where su -c id will return an
            # exitcode of 1 if selinux permissive is not already in effect,
            # we need su to turn off selinux prior to checking for su.
            # We can use shell() directly to prevent the non-zero exitcode
            # from raising an ADBError.
            # Note: We are assuming su -c is supported and do not attempt to
            # use su 0.
            # NOTE(review): this call passes no timeout, and the message
            # below formats adb_process.proc.stdout (the Popen stream
            # attribute) rather than adb_process.stdout - confirm.
            adb_process = self.shell("su -c setenforce 0")
            self._logger.info(
                "su -c setenforce 0 exitcode %s, stdout: %s"
                % (adb_process.proc.poll(), adb_process.proc.stdout)
            )

            uid = "uid=0"
            # Do we have a 'Superuser' sh like su?
            try:
                if self.shell_output("su -c id", timeout=timeout).find(uid) != -1:
                    self._have_su = True
                    self._logger.info("su -c supported")
            except ADBError as e:
                self._logger.debug("Check for su -c failed: {}".format(e))

            # Check if Android's su 0 command works.
            # If we already have detected su -c support, we can skip this check.
            try:
                if (
                    not self._have_su
                    and self.shell_output("su 0 id", timeout=timeout).find(uid)
                    != -1
                ):
                    self._have_android_su = True
                    self._logger.info("su 0 supported")
            except ADBError as e:
                self._logger.debug("Check for su 0 failed: {}".format(e))

    # Guarantee that /data/local/tmp exists and is accessible to all.
    # It is a fatal error if /data/local/tmp does not exist and can not be created.
    if not self.exists("/data/local/tmp", timeout=timeout):
        # parents=True is required on emulator, where exist() may be flaky
        self.mkdir("/data/local/tmp", parents=True, timeout=timeout)

    # Beginning in Android 8.1 /data/anr/traces.txt no longer contains
    # a single file traces.txt but instead will contain individual files
    # for each stack.
    # fbba7fe06312241c7eb8c592ec2ac630e4316d55
    stack_trace_dir = self.shell_output(
        "getprop dalvik.vm.stack-trace-dir", timeout=timeout
    )
    if not stack_trace_dir:
        # Fall back to the directory of the stack trace file property
        # and finally to /data/anr.
        stack_trace_file = self.shell_output(
            "getprop dalvik.vm.stack-trace-file", timeout=timeout
        )
        if stack_trace_file:
            stack_trace_dir = posixpath.dirname(stack_trace_file)
        else:
            stack_trace_dir = "/data/anr"
    self.stack_trace_dir = stack_trace_dir
    self.enforcing = "Permissive"
    self.run_as_package = run_as_package

    self._logger.debug("ADBDevice: %s" % self.__dict__)
@property
def is_rooted(self):
return self._have_root_shell or self._have_su or self._have_android_su
def _wait_for_boot_completed(self, timeout=None):
"""Internal method to wait for boot to complete.
Wait for sys.boot_completed=1 and raise ADBError if boot does
not complete within retry attempts.
:param int timeout: The default maximum time in
seconds for any spawned adb process to complete before
throwing an ADBTimeoutError. This timeout is per adb call. The
total time spent may exceed this value. If it is not
specified, the value defaults to 300.
:raises: :exc:`ADBError`
"""
for attempt in range(self._device_ready_retry_attempts):
sys_boot_completed = self.shell_output(
"getprop sys.boot_completed", timeout=timeout
)
if sys_boot_completed == "1":
break
time.sleep(self._device_ready_retry_wait)
if sys_boot_completed != "1":
raise ADBError("Failed to complete boot in time")
def _get_device_serial(self, device):
    """Resolve ``device`` to the serial string used to address it.

    :param device: A serial string, a dict with ``device_serial``
        and/or ``usb`` keys, or a falsy value. When falsy, the
        ANDROID_SERIAL environment variable is used; if that is unset,
        the single device reported by ADBHost.devices() is used.
    :return: str - the device serial, or ``usb:<usb>`` built from the
        dict's ``usb`` entry when its ``device_serial`` is missing or
        not a valid serial.
    :raises: :exc:`ValueError` if more than one device is attached and
        none was specified, if a serial string contains an unsupported
        ':', or if no serial can be derived from the dict.
        :exc:`ADBError` if no devices are attached.
    """
    candidate = device or os.environ.get("ANDROID_SERIAL")
    if candidate is None:
        host = ADBHost(
            adb=self._adb_path, adb_host=self._adb_host, adb_port=self._adb_port
        )
        attached = host.devices()
        if len(attached) > 1:
            raise ValueError(
                "ADBDevice called with multiple devices "
                "attached and no device specified"
            )
        if len(attached) == 0:
            raise ADBError("No connected devices found.")
        candidate = attached[0]

    # Allow : in device serial if it matches a tcpip device serial.
    tcpip_serial = re.compile(r"[^:]+:[0-9]+$")

    def serial_ok(text):
        # usb:<path> and host:port serials may contain ':'; anything
        # else must not.
        if text.startswith("usb:"):
            return True
        if tcpip_serial.match(text) is not None:
            return True
        return ":" not in text

    if isinstance(candidate, six.string_types):
        # Treat this as a device serial
        if serial_ok(candidate):
            return candidate
        raise ValueError(
            "Device serials containing ':' characters are "
            "invalid. Pass the output from "
            "ADBHost.devices() for the device instead"
        )

    serial = candidate.get("device_serial")
    if serial is not None and serial_ok(serial):
        return serial
    usb = candidate.get("usb")
    if usb is None:
        raise ValueError("Unable to get device serial")
    return "usb:%s" % usb
def _check_root_user(self, timeout=None):
uid = "uid=0"
# Is shell already running as root?
try:
if self.shell_output("id", timeout=timeout).find(uid) != -1:
self._logger.info("adbd running as root")
return True
except ADBError:
self._logger.debug("Check for root user failed")
return False
def _check_adb_root(self, timeout=None):
self._have_root_shell = self._check_root_user(timeout=timeout)
# Exclude these devices from checking for a root shell due to
# potential hangs.
exclude_set = set()
exclude_set.add("E5823") # Sony Xperia Z5 Compact (E5823)
# Do we need to run adb root to get a root shell?
if not self._have_root_shell:
if self.get_prop("ro.product.model") in exclude_set:
self._logger.warning(
"your device was excluded from attempting adb root."
)
else:
try:
self.command_output(["root"], timeout=timeout)
self._have_root_shell = self._check_root_user(timeout=timeout)
if self._have_root_shell:
self._logger.info("adbd restarted as root")
else:
self._logger.info("adbd not restarted as root")
except ADBError:
self._logger.debug("Check for root adbd failed")
def pidof(self, app_name, timeout=None):
"""
Return a list of pids for all extant processes running within the
specified application package.
:param str app_name: The name of the application package to examine
:param int timeout: The maximum time in
seconds for any spawned adb process to complete before
throwing an ADBTimeoutError. This timeout is per
adb call. The total time spent may exceed this
value. If it is not specified, the value set
in the ADBDevice constructor is used.
:return: List of integers containing the pid(s) of the various processes.
:raises: :exc:`ADBTimeoutError`
"""
if self._have_pidof:
try:
pid_output = self.shell_output("pidof %s" % app_name, timeout=timeout)
re_pids = re.compile(r"[0-9]+")
pids = re_pids.findall(pid_output)
if self._have_flaky_pidof and not pids:
time.sleep(0.1)
pid_output = self.shell_output(
"pidof %s" % app_name, timeout=timeout
)
pids = re_pids.findall(pid_output)
except ADBError:
pids = []
else:
procs = self.get_process_list(timeout=timeout)
# limit the comparion to the first 75 characters due to a
# limitation in processname length in android.
pids = [proc[0] for proc in procs if proc[1] == app_name[:75]]
return [int(pid) for pid in pids]
def _sync(self, timeout=None):
"""Sync the file system using shell_output in order that exceptions
are raised to the caller."""
self.shell_output("sync", timeout=timeout)
@staticmethod
def _should_quote(arg):
"""Utility function if command argument should be quoted."""
if not arg:
return False
if arg[0] == "'" and arg[-1] == "'" or arg[0] == '"' and arg[-1] == '"':
# Already quoted
return False
re_quotable_chars = re.compile(r"[ ()\"&'\];]")
return re_quotable_chars.search(arg)
@staticmethod
def _quote(arg):
"""Utility function to return quoted version of command argument."""
if hasattr(shlex, "quote"):
quote = shlex.quote
elif hasattr(pipes, "quote"):
quote = pipes.quote
else:
def quote(arg):
arg = arg or ""
re_unsafe = re.compile(r"[^\w@%+=:,./-]")
if re_unsafe.search(arg):
arg = "'" + arg.replace("'", "'\"'\"'") + "'"
return arg
return quote(arg)
@staticmethod
def _escape_command_line(cmds):
"""Utility function which takes a list of command arguments and returns
escaped and quoted version of the command as a string.
"""
assert isinstance(cmds, list)
# This is identical to shlex.join in Python 3.8. We can
# replace it should we ever get Python 3.8 as a minimum.
quoted_cmd = " ".join([ADBDevice._quote(arg) for arg in cmds])
return quoted_cmd
@staticmethod
def _get_exitcode(file_obj):
    """Get the exitcode from the last line of the file_obj for shell
    commands executed on Android prior to Android 7.

    The last non-empty line of ``file_obj`` is collected by reading
    backwards one character at a time. If that line begins with
    ``adb_returncode=<digits>``, the digits are returned as the exit
    code and the file is truncated from one character before the point
    where the backwards scan stopped. Otherwise every line of the file
    is searched for the marker and the file is left untruncated with
    its position at the end.

    :param file_obj: Seekable, truncatable file containing the command
        output. NOTE(review): seeking relative to the end with a
        negative offset presumably requires a binary-mode file -
        confirm against callers.
    :return: int exit code, or None if no ``adb_returncode=`` marker
        was found anywhere in the file.
    """
    re_returncode = re.compile(r"adb_returncode=([0-9]+)")
    file_obj.seek(0, os.SEEK_END)

    line = ""
    length = file_obj.tell()
    offset = 1
    # Walk backwards from the end of the file, prepending characters to
    # `line`. Trailing "\r"/"\n" characters are skipped while `line` is
    # still empty.
    while length - offset >= 0:
        file_obj.seek(-offset, os.SEEK_END)
        char = six.ensure_str(file_obj.read(1))
        if not char:
            break
        if char != "\r" and char != "\n":
            line = char + line
        elif line:
            # we have collected everything up to the beginning of the line
            break
        offset += 1
    # match() anchors at the start, so the marker must begin the line.
    match = re_returncode.match(line)
    if match:
        exitcode = int(match.group(1))
        # Set the position in the file to the position of the
        # adb_returncode and truncate it from the output.
        # The file position is just past the last character read above,
        # so stepping back one character also drops that character.
        file_obj.seek(-1, os.SEEK_CUR)
        file_obj.truncate()
    else:
        exitcode = None
        # We may have a situation where the adb_returncode= is not
        # at the end of the output. This happens at least in the
        # failure jit-tests on arm. To work around this
        # possibility, we can search the entire output for the
        # appropriate match.
        file_obj.seek(0, os.SEEK_SET)
        for line in file_obj:
            line = six.ensure_str(line)
            # search() rather than match(): the marker may appear
            # anywhere within the line here.
            match = re_returncode.search(line)
            if match:
                exitcode = int(match.group(1))
                break
        # Reset the position in the file to the end.
        file_obj.seek(0, os.SEEK_END)
    return exitcode
def is_path_internal_storage(self, path, timeout=None):
"""
Return True if the path matches an internal storage path
as defined by either '/sdcard', '/mnt/sdcard', or any of the
.*_STORAGE environment variables on the device otherwise False.
:param str path: The path to test.
:param int timeout: The maximum time in
seconds for any spawned adb process to complete before
throwing an ADBTimeoutError. This timeout is per adb call. The
total time spent may exceed this value. If it is not
specified, the value set in the ADBDevice constructor is used.
:return: boolean
:raises: :exc:`ADBTimeoutError`
:exc:`ADBError`
"""
if not self._re_internal_storage:
storage_dirs = set(["/mnt/sdcard", "/sdcard"])
re_STORAGE = re.compile("([^=]+STORAGE)=(.*)")
lines = self.shell_output("set", timeout=timeout).split()
for line in lines:
m = re_STORAGE.match(line.strip())
if m and m.group(2):
storage_dirs.add(m.group(2))
self._re_internal_storage = re.compile("/|".join(list(storage_dirs)) + "/")
return self._re_internal_storage.match(path) is not None
def is_package_debuggable(self, package):
if not package:
return False
if not self.is_app_installed(package):
self._logger.warning(
"Can not check if package %s is debuggable as it is not installed."
% package
)
return False
if package in self._debuggable_packages:
return self._debuggable_packages[package]
try:
self.shell_output("run-as %s ls /system" % package)
self._debuggable_packages[package] = True
except ADBError as e:
self._debuggable_packages[package] = False
self._logger.warning("Package %s is not debuggable: %s" % (package, str(e)))
return self._debuggable_packages[package]
@property
def package_dir(self):
if not self._run_as_package:
return None
# If we have a debuggable app and can use its directory to
# locate the test_root, this returns the location of the app's
# directory. If it is not located in the default location this
# will not be correct.
return "/data/data/%s" % self._run_as_package
@property
def run_as_package(self):
"""Returns the name of the package which will be used in run-as to change
the effective user executing a command."""
return self._run_as_package
@run_as_package.setter
def run_as_package(self, value):
    """Set the package used with run-as and move the test root to match.

    The assignment is ignored when any form of root access was detected
    or when ``value`` equals the current package.

    * A falsy ``value`` clears the package: the contents of the current
      test root are removed, the test root is reset to
      ``self._initial_test_root`` and its contents are removed too.
    * If ``value`` is not debuggable, a warning is logged, a test root
      is looked for among external storage candidates and the package
      is left unchanged.
    * Otherwise the package is stored and the test root becomes
      ``<package_dir>/test_root``. If that fails with an ADBError the
      package is cleared, the previous test root is restored and a new
      ADBError is raised.

    :param value: Package name, or a falsy value to stop using run-as.
    :raises: :exc:`ADBError` if the device model is in the include set
        and reports ``art_verifier_verify_debuggable`` as "1", or if
        the package's test root can not be initialized.
    """
    if self._have_root_shell or self._have_su or self._have_android_su:
        # When we have root available, use that instead of run-as.
        return
    if self._run_as_package == value:
        # Do nothing if the value doesn't change.
        return
    if not value:
        if self._test_root:
            # Make sure the old test_root is clean without using
            # the test_root property getter.
            self.rm(
                posixpath.join(self._test_root, "*"), recursive=True, force=True
            )
        self._logger.info(
            "Setting run_as_package to None. Resetting test root from %s to %s"
            % (self._test_root, self._initial_test_root)
        )
        self._run_as_package = None
        # We must set _run_as_package to None before assigning to
        # self.test_root in order to prevent attempts to use
        # run-as.
        self.test_root = self._initial_test_root
        if self._test_root:
            # Make sure the new test_root is clean.
            self.rm(
                posixpath.join(self._test_root, "*"), recursive=True, force=True
            )
        return
    if not self.is_package_debuggable(value):
        self._logger.warning(
            "Can not set run_as_package to %s since it is not debuggable." % value
        )
        # Since we are attempting to set run_as_package assume
        # that we are not rooted and do not include
        # /data/local/tmp as an option when checking for possible
        # test_root paths using external storage.
        paths = [
            "/storage/emulated/0/Android/data/%s/test_root" % value,
            "/sdcard/test_root",
            "/mnt/sdcard/test_root",
        ]
        # The return value is not used here; the call is made for its
        # effect of selecting a test root. _run_as_package is not
        # modified on this path.
        self._try_test_root_candidates(paths)
        return
    # Require these devices to have Verify bytecode turned off due to failures with run-as.
    include_set = set()
    include_set.add("SM-G973F")  # Samsung S10g SM-G973F
    if (
        self.get_prop("ro.product.model") in include_set
        and self.shell_output("settings get global art_verifier_verify_debuggable")
        == "1"
    ):
        self._logger.warning(
            """Your device has Verify bytecode of debuggable apps set which
causes problems attempting to use run-as to delegate command execution to debuggable
apps. You must turn this setting off in Developer options on your device.
"""
        )
        raise ADBError(
            "Verify bytecode of debuggable apps must be turned off to use run-as"
        )
    self._logger.info("Setting run_as_package to %s" % value)
    # _run_as_package is assigned before package_dir is read because
    # package_dir is derived from it.
    self._run_as_package = value
    old_test_root = self._test_root
    new_test_root = posixpath.join(self.package_dir, "test_root")
    if old_test_root != new_test_root:
        try:
            # Make sure the old test_root is clean.
            if old_test_root:
                self.rm(
                    posixpath.join(old_test_root, "*"), recursive=True, force=True
                )
            self.test_root = posixpath.join(self.package_dir, "test_root")
            # Make sure the new test_root is clean.
            self.rm(posixpath.join(self.test_root, "*"), recursive=True, force=True)
        except ADBError as e:
            # There was a problem using run-as to initialize
            # the new test_root in the app's directory.
            # Restore the old test root and raise an ADBError.
            self._run_as_package = None
            self.test_root = old_test_root
            self._logger.warning(
                "Exception %s setting test_root to %s. "
                "Resetting test_root to %s."
                % (str(e), new_test_root, old_test_root)
            )
            raise ADBError(
                "Unable to initialize test root while setting run_as_package %s"
                % value
            )
def enable_run_as_for_path(self, path):
return self._run_as_package is not None and path.startswith(self.package_dir)
@property
def test_root(self):
"""
The test_root property returns the directory on the device where
temporary test files are stored.
The first time test_root it is called it determines and caches a value
for the test root on the device. It determines the appropriate test
root by attempting to create a 'proof' directory on each of a list of
directories and returning the first successful directory as the
test_root value. The cached value for the test_root will be shared
by subsequent instances of ADBDevice if self._share_test_root is True.
The default list of directories checked by test_root are:
If the device is rooted:
- /data/local/tmp/test_root
If run_as_package is not available and the device is not rooted:
- /data/local/tmp/test_root
- /sdcard/test_root
- /storage/sdcard/test_root
- /mnt/sdcard/test_root
You may override the default list by providing a test_root argument to
the :class:`ADBDevice` constructor which will then be used when
attempting to create the 'proof' directory.
:raises: :exc:`ADBTimeoutError`
:exc:`ADBError`
"""
if self._test_root is not None:
self._logger.debug("Using cached test_root %s" % self._test_root)
return self._test_root
if self.run_as_package is not None:
raise ADBError(
"run_as_package is %s however test_root is None" % self.run_as_package
)
if self._share_test_root and _TEST_ROOT:
self._logger.debug(
"Attempting to use shared test_root %s" % self._test_root
)
paths = [_TEST_ROOT]
elif self._initial_test_root is not None:
self._logger.debug(
"Attempting to use initial test_root %s" % self._test_root
)
paths = [self._initial_test_root]
else:
# Android 10's scoped storage means we can no longer
# reliably host profiles and tests on the sdcard though it
# depends on the device. See
# Also see RunProgram in
# python/mozbuild/mozbuild/mach_commands.py where they
# choose /data/local/tmp as the default location for the
# profile because GeckoView only takes its configuration
# file from /data/local/tmp. Since we have not specified
# a run_as_package yet, assume we may be attempting to use
# a shell program which creates files owned by the shell
# user and which would work using /data/local/tmp/ even if
# the device is not rooted. Fall back to external storage
# if /data/local/tmp is not available.
paths = ["/data/local/tmp/test_root"]
if not self.is_rooted:
# Note that /sdcard may be accessible while
# /mnt/sdcard is not.
paths.extend(
[
"/sdcard/test_root",
"/storage/sdcard/test_root",
"/mnt/sdcard/test_root",
]
)
return self._try_test_root_candidates(paths)
@test_root.setter
def test_root(self, value):
# Cache the requested test root so that
# other invocations of ADBDevice will pick
# up the same value.
global _TEST_ROOT
if self._test_root == value:
return
self._logger.debug("Setting test_root from %s to %s" % (self._test_root, value))
old_test_root = self._test_root
self._test_root = value
if self._share_test_root:
_TEST_ROOT = value
if not value:
return
if not self._try_test_root(value):
self._test_root = old_test_root
raise ADBError("Unable to set test_root to %s" % value)
readme = posixpath.join(value, "README")
if not self.is_file(readme):
tmpf = tempfile.NamedTemporaryFile(mode="w", delete=False)
tmpf.write(
"This directory is used by mozdevice to contain all content "
"related to running tests on this device.\n"
)
tmpf.close()
try:
self.push(tmpf.name, readme)
finally:
if tmpf:
os.unlink(tmpf.name)
def _try_test_root_candidates(self, paths):
max_attempts = 3
for test_root in paths:
for attempt in range(1, max_attempts + 1):
self._logger.debug(
"Setting test root to %s attempt %d of %d"
% (test_root, attempt, max_attempts)
)
if self._try_test_root(test_root):
if not self._test_root:
# Cache the detected test_root so that we can
# restore the value without having re-run
# _try_test_root.
self._initial_test_root = test_root
self._test_root = test_root
self._logger.info("Setting test_root to %s" % self._test_root)
return self._test_root
self._logger.debug(
"_setup_test_root: "
"Attempt %d of %d failed to set test_root to %s"
% (attempt, max_attempts, test_root)
)
if attempt != max_attempts:
time.sleep(20)
raise ADBError(
"Unable to set up test root using paths: [%s]" % ", ".join(paths)
)
def _try_test_root(self, test_root):
try:
if not self.is_dir(test_root):
self.mkdir(test_root, parents=True)
proof_dir = posixpath.join(test_root, "proof")
if self.is_dir(proof_dir):
self.rm(proof_dir, recursive=True)
self.mkdir(proof_dir)
self.rm(proof_dir, recursive=True)
except ADBError as e:
self._logger.warning("%s is not writable: %s" % (test_root, str(e)))
return False
return True
# Host Command methods
def command(self, cmds, timeout=None):
    """Executes an adb command on the host against the device.

    :param list cmds: The command and its arguments to be
        executed.
    :param int timeout: The maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError. This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value set in the ADBDevice constructor is used.
    :return: :class:`ADBProcess`

    command() provides a low level interface for executing
    commands for a specific device on the host via adb.
    It delegates to ADBCommand.command, passing this device's serial.
    The stdout of the adb process is available as a file handle on the
    host and the exit code is available as the exit code of the adb
    process.

    For executing shell commands on the device, use
    ADBDevice.shell(). The caller provides a list containing
    commands, as well as a timeout period in seconds.

    A subprocess is spawned to execute adb for the device with
    stdout and stderr directed to a temporary file. If the process
    takes longer than the specified timeout, the process is
    terminated.

    It is the caller's responsibility to clean up by closing
    the stdout temporary file.
    """
    serial = self._device_serial
    return ADBCommand.command(self, cmds, device_serial=serial, timeout=timeout)
def command_output(self, cmds, timeout=None):
    """Executes an adb command on the host against the device returning
    stdout.

    Delegates to ADBCommand.command_output, passing this device's
    serial.

    :param list cmds: The command and its arguments to be executed.
    :param int timeout: The maximum time in seconds
        for any spawned adb process to complete before throwing
        an ADBTimeoutError.
        This timeout is per adb call. The total time spent
        may exceed this value. If it is not specified, the value
        set in the ADBDevice constructor is used.
    :return: str - content of stdout.
    :raises: :exc:`ADBTimeoutError`
             :exc:`ADBError`
    """
    serial = self._device_serial
    return ADBCommand.command_output(
        self, cmds, device_serial=serial, timeout=timeout
    )