Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
import taskcluster_urls as liburls
from taskcluster import Hooks
from taskgraph.util import taskcluster as tc_util
from taskgraph.util.taskcluster import (
get_root_url,
get_task_definition,
get_taskcluster_client,
)
logger = logging.getLogger(__name__)
def get_index_url(index_path, multiple=False, block_proxy=True):
    """Build the index service (v1) URL for ``index_path``.

    Args:
        index_path (str): Index path appended after the ``task/`` or
            ``tasks/`` segment.
        multiple (bool): When true, the plural ``tasks/`` segment is used
            instead of ``task/``.
        block_proxy (bool): Passed through unchanged to ``get_root_url``.

    Returns:
        str: The formatted URL.
    """
    root_url = get_root_url(block_proxy=block_proxy)
    plural_suffix = "s" if multiple else ""
    # The "{}" placeholders in the path are filled in by the format call below.
    url_template = liburls.api(root_url, "index", "v1", "task{}/{}")
    return url_template.format(plural_suffix, index_path)
def insert_index(index_path, task_id, data=None):
    """Insert ``task_id`` into the index under ``index_path``.

    The entry's expiry is copied from the ``expires`` field of the task's
    definition, and its rank is always 0.

    Args:
        index_path (str): Index path to insert the task at.
        task_id (str): Id of the task to index.
        data (dict, optional): Extra data stored with the entry; any falsy
            value is replaced by an empty dict.

    Returns:
        The response of the index client's ``insertTask`` call.
    """
    task_expiry = get_task_definition(task_id)["expires"]
    index_client = get_taskcluster_client("index")
    payload = {
        "taskId": task_id,
        "rank": 0,
        "data": data or {},
        "expires": task_expiry,
    }
    return index_client.insertTask(index_path, payload)
def status_task(task_id):
    """Fetch the status of a task from the queue service.

    In testing mode nothing is fetched; a log line records what would have
    been requested.

    Args:
        task_id (str): A task id.

    Returns:
        dict or None: The ``status`` member of the queue's response (an
        empty dict when the response lacks that key). ``None`` in testing
        mode or when the queue returns a falsy response.
    """
    if tc_util.testing:
        logger.info(f"Would have gotten status for {task_id}.")
        return None

    queue_client = get_taskcluster_client("queue")
    reply = queue_client.status(task_id)
    if not reply:
        return None
    return reply.get("status", {})
def state_task(task_id):
    """Gets the state of a task given a task_id.

    In testing mode, just logs that it would have retrieved state. This is a
    subset of the data returned by :func:`status_task`.

    Args:
        task_id (str): A task id.

    Returns:
        str or None: The state of the task, one of
        ``pending, running, completed, failed, exception, unknown``.
        ``unknown`` is returned when no status or no state is available.
        ``None`` is returned in testing mode, where nothing is fetched.
    """
    if tc_util.testing:
        logger.info(f"Would have gotten state for {task_id}.")
        return None

    # status_task returns None when the queue hands back a falsy response;
    # treat that like a missing state instead of failing on ``None.get``.
    status = status_task(task_id) or {}
    return status.get("state") or "unknown"
def trigger_hook(hook_group_id, hook_id, hook_payload):
    """Trigger a hook and log a URL for the task it created.

    Args:
        hook_group_id (str): Group id of the hook to trigger.
        hook_id (str): Id of the hook within the group.
        hook_payload: Payload handed to the hook.

    Returns:
        None. The created task's id is only reported through the log.
    """
    hooks_client = Hooks({"rootUrl": get_root_url()})
    result = hooks_client.triggerHook(hook_group_id, hook_id, hook_payload)

    root_url = get_root_url()
    created_task_id = result["status"]["taskId"]
    logger.info("Task seen here: {}/tasks/{}".format(root_url, created_task_id))
def list_task_group_tasks(task_group_id):
    """Return a list of all tasks in a task group.

    The ``tasks`` entries of every response passed to the pagination handler
    are collected into a single list.

    Args:
        task_group_id (str): Id of the task group to list.

    Returns:
        list: The collected ``tasks`` entries, in the order they were received.
    """
    collected = []
    queue_client = get_taskcluster_client("queue")
    queue_client.listTaskGroup(
        task_group_id,
        paginationHandler=lambda page: collected.extend(page["tasks"]),
    )
    return collected
def list_task_group_incomplete_task_ids(task_group_id):
    """Yield the id of every task in the group that has not finished.

    A task counts as incomplete when its state is ``running``, ``pending``
    or ``unscheduled``.

    Args:
        task_group_id (str): Id of the task group to inspect.

    Yields:
        str: The ``taskId`` of each incomplete task.
    """
    incomplete_states = ("running", "pending", "unscheduled")
    # All status dicts are extracted up front, before the first id is yielded.
    statuses = [entry["status"] for entry in list_task_group_tasks(task_group_id)]
    yield from (
        status["taskId"] for status in statuses if status["state"] in incomplete_states
    )
def list_task_group_complete_tasks(task_group_id):
    """Map task names to task ids for the completed tasks of a group.

    Only tasks whose status state is ``completed`` are included. A missing
    name or task id is recorded as an empty string, and when several tasks
    share a name the last one listed wins.

    Args:
        task_group_id (str): Id of the task group to inspect.

    Returns:
        dict: ``{task name: task id}`` for each completed task.
    """
    completed = {}
    for entry in list_task_group_tasks(task_group_id):
        if entry.get("status", {}).get("state", "") != "completed":
            continue
        # Resolve the id before the name, matching the original evaluation order.
        finished_id = entry.get("status", {}).get("taskId", "")
        task_name = entry.get("task", {}).get("metadata", {}).get("name", "")
        completed[task_name] = finished_id
    return completed
def find_task(index_path):
    """Look up the task indexed at ``index_path``.

    Args:
        index_path (str): Index path to look up.

    Returns:
        The response of the index client's ``findTask`` call.
    """
    return get_taskcluster_client("index").findTask(index_path)