# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Graph morphs are modifications to task-graphs that take place *after* the
optimization phase.
These graph morphs are largely invisible to developers running `./mach`
locally, so they should be limited to changes that do not modify the meaning of
the graph.
"""
# Note that the translation of `{'task-reference': '..'}` and
# `artifact-reference` are handled in the optimization phase (since
# optimization involves dealing with taskIds directly). Similarly,
# `{'relative-datestamp': '..'}` is handled at the last possible moment during
# task creation.
import copy
import logging
import os
import re

from slugid import nice as slugid
from taskgraph.graph import Graph
from taskgraph.morph import register_morph
from taskgraph.task import Task
from taskgraph.taskgraph import TaskGraph

from .util.workertypes import get_worker_type

here = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)

MAX_ROUTES = 64


def amend_taskgraph(taskgraph, label_to_taskid, to_add):
"""Add the given tasks to the taskgraph, returning a new taskgraph"""
new_tasks = taskgraph.tasks.copy()
new_edges = set(taskgraph.graph.edges)
for task in to_add:
new_tasks[task.task_id] = task
assert task.label not in label_to_taskid
label_to_taskid[task.label] = task.task_id
for depname, dep in task.dependencies.items():
new_edges.add((task.task_id, dep, depname))
taskgraph = TaskGraph(new_tasks, Graph(set(new_tasks), new_edges))
return taskgraph, label_to_taskid
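

# A minimal usage sketch (illustrative only; the names below are hypothetical):
# the morphs in this module build new Task objects, assign them task ids, and
# then splice them into the graph with amend_taskgraph, e.g.:
#
#     new_task = Task(kind="misc", label="example-extra-task", attributes={},
#                     task={}, dependencies={"parent": parent_task_id})
#     new_task.task_id = slugid()
#     taskgraph, label_to_taskid = amend_taskgraph(
#         taskgraph, label_to_taskid, [new_task]
#     )

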
def derive_misc_task(
target_task,
purpose,
image,
taskgraph,
label_to_taskid,
parameters,
graph_config,
dependencies,
):
"""Create the shell of a task that depends on `dependencies` and on the given docker
image."""
label = f"{purpose}-{target_task.label}"
# this is why all docker image tasks are included in the target task graph: we
# need to find them in label_to_taskid, even if nothing else required them
image_taskid = label_to_taskid["docker-image-" + image]
provisioner_id, worker_type = get_worker_type(
graph_config,
parameters,
"misc",
)
deps = copy.copy(dependencies)
deps["docker-image"] = image_taskid
task_def = {
"provisionerId": provisioner_id,
"workerType": worker_type,
"dependencies": [d for d in deps.values()],
"created": {"relative-datestamp": "0 seconds"},
"deadline": target_task.task["deadline"],
# no point existing past the parent task's deadline
"expires": target_task.task["deadline"],
"metadata": {
"name": label,
"description": f"{purpose} for {target_task.description}",
"owner": target_task.task["metadata"]["owner"],
"source": target_task.task["metadata"]["source"],
},
"scopes": [],
"payload": {
"image": {
"path": "public/image.tar.zst",
"taskId": image_taskid,
"type": "task-image",
},
"features": {"taskclusterProxy": True},
"maxRunTime": 600,
},
}
if image_taskid not in taskgraph.tasks:
        # The task above depends on the replaced docker-image task, not on one
        # in the current graph.
del deps["docker-image"]
task = Task(
kind="misc",
label=label,
attributes={},
task=task_def,
dependencies=deps,
)
task.task_id = slugid()
return task
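

# Calling-convention sketch for derive_misc_task (illustrative; `some_task` and
# the "parent" dependency name are hypothetical):
#
#     shell = derive_misc_task(
#         some_task,
#         "index-task",      # purpose, used to build the label
#         "index-task",      # docker image, resolved via label_to_taskid
#         taskgraph,
#         label_to_taskid,
#         parameters,
#         graph_config,
#         {"parent": some_task.task_id},
#     )
#     # shell.task is a bare definition; callers add scopes, command, env, etc.

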
# these regular expressions capture route prefixes for which we have a star
# scope, allowing them to be summarized. Each should correspond to a star scope
# in each Gecko `assume:repo:hg.mozilla.org/...` role.
SCOPE_SUMMARY_REGEXPS = [
re.compile(r"(index:insert-task:docker\.images\.v1\.[^.]*\.).*"),
re.compile(r"(index:insert-task:gecko\.v2\.trunk\.revision\.).*"),
re.compile(r"(index:insert-task:gecko\.v2\.[^.]*\.).*"),
re.compile(r"(index:insert-task:comm\.v2\.[^.]*\.).*"),
]
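

# For example (with a hypothetical index path), the scope
#     index:insert-task:gecko.v2.autoland.revision.abc123.firefox.linux64-opt
# matches the third pattern above and is summarized to the star scope
#     index:insert-task:gecko.v2.autoland.*
# which is what keeps the scope list of an index task small.

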
def make_index_task(
parent_task,
taskgraph,
label_to_taskid,
parameters,
graph_config,
index_paths,
index_rank,
purpose,
dependencies,
):
task = derive_misc_task(
parent_task,
purpose,
"index-task",
taskgraph,
label_to_taskid,
parameters,
graph_config,
dependencies,
)
# we need to "summarize" the scopes, otherwise a particularly
# namespace-heavy index task might have more scopes than can fit in a
# temporary credential.
scopes = set()
for path in index_paths:
scope = f"index:insert-task:{path}"
for summ_re in SCOPE_SUMMARY_REGEXPS:
match = summ_re.match(scope)
if match:
scope = match.group(1) + "*"
break
scopes.add(scope)
task.task["scopes"] = sorted(scopes)
task.task["payload"]["command"] = ["insert-indexes.js"] + index_paths
task.task["payload"]["env"] = {
"TARGET_TASKID": parent_task.task_id,
"INDEX_RANK": index_rank,
}
return task
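

# The resulting index task is a thin wrapper around the `index-task` image; its
# payload ends up looking roughly like this (taskIds and paths are hypothetical):
#
#     "payload": {
#         "image": {"type": "task-image", "taskId": "<docker-image taskId>", ...},
#         "command": ["insert-indexes.js", "gecko.v2.autoland.latest.example"],
#         "env": {"TARGET_TASKID": "<parent taskId>", "INDEX_RANK": 0},
#         ...
#     }

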
@register_morph
def add_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
"""
The TaskCluster queue only allows 64 routes on a task. In the event a task
exceeds this limit, this graph morph adds "index tasks" that depend on it
and do the index insertions directly, avoiding the limit on task.routes.
"""
logger.debug("Morphing: adding index tasks")
# Add indexes for tasks that exceed MAX_ROUTES.
added = []
for label, task in taskgraph.tasks.items():
if len(task.task.get("routes", [])) <= MAX_ROUTES:
continue
index_paths = [
r.split(".", 1)[1] for r in task.task["routes"] if r.startswith("index.")
]
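        # e.g. a (hypothetical) route "index.gecko.v2.autoland.latest.example"
        # becomes the index path "gecko.v2.autoland.latest.example".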
task.task["routes"] = [
r for r in task.task["routes"] if not r.startswith("index.")
]
added.append(
make_index_task(
task,
taskgraph,
label_to_taskid,
parameters,
graph_config,
index_paths=index_paths,
index_rank=task.task.get("extra", {}).get("index", {}).get("rank", 0),
purpose="index-task",
dependencies={"parent": task.task_id},
)
)
if added:
taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added)
logger.info(f"Added {len(added)} index tasks")
    return taskgraph, label_to_taskid


@register_morph
def add_eager_cache_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
"""
Some tasks (e.g. cached tasks) we want to exist in the index before they even
run/complete. Our current use is to allow us to depend on an unfinished cached
task in future pushes. This graph morph adds "eager-index tasks" that depend on
the decision task and do the index insertions directly, which does not need to
wait on the pointed at task to complete.
"""
logger.debug("Morphing: Adding eager cached index's")
added = []
for label, task in taskgraph.tasks.items():
if "eager_indexes" not in task.attributes:
continue
eager_indexes = task.attributes["eager_indexes"]
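        # `eager_indexes` is assumed to be a list of index paths set on the
        # task's attributes elsewhere (e.g. by the transforms that mark a task
        # as cached).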
added.append(
make_index_task(
task,
taskgraph,
label_to_taskid,
parameters,
graph_config,
index_paths=eager_indexes,
index_rank=0, # Be sure complete tasks get priority
purpose="eager-index",
dependencies={},
)
)
if added:
taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added)
logger.info(f"Added {len(added)} eager index tasks")
    return taskgraph, label_to_taskid


@register_morph
def add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
return _add_try_task_duplicates(
taskgraph, label_to_taskid, parameters, graph_config
)


# This shim function exists so that the unit tests can call it directly; it
# works around an issue with
# third_party/python/taskcluster_taskgraph/taskgraph/morph.py#40
def _add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
try_config = parameters.get("try_task_config", {})
tasks = try_config.get("tasks", [])
glob_tasks = {x.strip("-*") for x in tasks if x.endswith("-*")}
tasks = set(tasks) - glob_tasks
rebuild = try_config.get("rebuild")
if rebuild:
for task in taskgraph.tasks.values():
chunk_index = -1
if task.label.endswith("-cf"):
chunk_index = -2
label_parts = task.label.split("-")
label_no_chunk = "-".join(label_parts[:chunk_index])
if label_parts[chunk_index].isnumeric() and label_no_chunk in glob_tasks:
task.attributes["task_duplicates"] = rebuild
elif task.label in tasks:
task.attributes["task_duplicates"] = rebuild
return taskgraph, label_to_taskid
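

# A worked example of the rebuild matching above (hypothetical labels): with
# try_task_config tasks ["test-linux64/opt-mochitest-*"] and rebuild=3,
# glob_tasks contains "test-linux64/opt-mochitest", so a chunked label such as
# "test-linux64/opt-mochitest-2" (or "test-linux64/opt-mochitest-2-cf") strips
# its numeric chunk, matches the glob entry, and gets task_duplicates=3.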