Source code

Revision control

Other Tools

1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this
3
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
"""
5
The objective of optimization is to remove as many tasks from the graph as
6
possible, as efficiently as possible, thereby delivering useful results as
7
quickly as possible. For example, ideally if only a test script is modified in
8
a push, then the resulting graph contains only the corresponding test suite
9
task.
10
11
See ``taskcluster/docs/optimization.rst`` for more information.
12
"""
13
14
from __future__ import absolute_import, print_function, unicode_literals
15
16
import logging
17
import os
18
from collections import defaultdict
19
20
from .graph import Graph
21
from . import files_changed
22
from .taskgraph import TaskGraph
23
from .util.seta import is_low_value_task
24
from .util.perfile import perfile_number_of_chunks
25
from .util.taskcluster import find_task_id
26
from .util.parameterization import resolve_task_references
27
from mozbuild.util import memoize
28
from slugid import nice as slugid
29
from mozbuild.base import MozbuildObject
30
31
logger = logging.getLogger(__name__)
32
33
TOPSRCDIR = os.path.abspath(os.path.join(__file__, '../../../'))
34
35
36
def optimize_task_graph(target_task_graph, params, do_not_optimize,
                        existing_tasks=None, strategies=None):
    """
    Perform task optimization, returning a taskgraph and a map from label to
    assigned taskId, including replacement tasks.

    Optimization happens in two phases: removal (``remove_tasks``) followed
    by replacement (``replace_tasks``). The surviving tasks are then assembled
    into a new graph by ``get_subgraph``.

    :param target_task_graph: the TaskGraph to optimize
    :param params: decision parameters, handed to every strategy
    :param do_not_optimize: labels that must be neither removed nor replaced
    :param existing_tasks: optional mapping of label -> taskId of tasks that
        already exist and may be reused; falsy values mean "none"
    :param strategies: optional mapping of strategy name -> strategy object;
        falsy values mean "use _make_default_strategies()"
    :returns: ``(optimized_task_graph, label_to_taskid)``
    """
    label_to_taskid = {}
    existing_tasks = existing_tasks or {}
    # strategies are instantiated afresh for every optimization run
    strategies = strategies or _make_default_strategies()

    optimizations = _get_optimizations(target_task_graph, strategies)

    removed_tasks = remove_tasks(
        target_task_graph=target_task_graph,
        optimizations=optimizations,
        params=params,
        do_not_optimize=do_not_optimize,
    )

    replaced_tasks = replace_tasks(
        target_task_graph=target_task_graph,
        optimizations=optimizations,
        params=params,
        do_not_optimize=do_not_optimize,
        label_to_taskid=label_to_taskid,
        existing_tasks=existing_tasks,
        removed_tasks=removed_tasks,
    )

    # get_subgraph fills in taskIds for the remaining tasks in label_to_taskid
    optimized_graph = get_subgraph(
        target_task_graph, removed_tasks, replaced_tasks, label_to_taskid)
    return optimized_graph, label_to_taskid
70
71
72
def _make_default_strategies():
    """Build and return a new mapping of strategy name -> strategy instance.

    Tasks without an explicit ``optimization`` use the ``'never'`` entry,
    which is the base OptimizationStrategy (it never removes or replaces).
    """
    strategies = {}
    strategies['never'] = OptimizationStrategy()  # "never" is the default behavior
    strategies['index-search'] = IndexSearch()
    strategies['seta'] = SETA()
    strategies['skip-unless-changed'] = SkipUnlessChanged()
    strategies['skip-unless-schedules'] = SkipUnlessSchedules()
    strategies['skip-unless-schedules-or-seta'] = Either(SkipUnlessSchedules(), SETA())
    return strategies
81
82
83
def _get_optimizations(target_task_graph, strategies):
84
def optimizations(label):
85
task = target_task_graph.tasks[label]
86
if task.optimization:
87
opt_by, arg = task.optimization.items()[0]
88
return (opt_by, strategies[opt_by], arg)
89
else:
90
return ('never', strategies['never'], None)
91
return optimizations
92
93
94
def _log_optimization(verb, opt_counts):
    """
    Log a one-line summary of how many tasks each strategy optimized.

    :param verb: past-tense verb for the phase, e.g. ``'removed'``
    :param opt_counts: mapping of strategy name -> number of affected tasks
    """
    if not opt_counts:
        logger.info('No tasks %s during optimization', verb)
        return
    # items() works on both Python 2 and 3, unlike iteritems(); sorting by
    # strategy name keeps the message deterministic.
    summary = ', '.join(
        '{} tasks by {}'.format(count, strategy)
        for strategy, count in sorted(opt_counts.items()))
    logger.info('%s %s during optimization.', verb.title(), summary)
104
105
106
def remove_tasks(target_task_graph, params, optimizations, do_not_optimize):
    """
    Implement the "Removing Tasks" phase, returning a set of task labels of all removed tasks.

    A task is removed only when all of the following hold:
    - it is not in ``do_not_optimize``;
    - every task that depends on it has already been removed;
    - its strategy's ``should_remove_task`` returns a truthy value.

    The walk uses ``visit_preorder``, presumably so that dependents are seen
    before their dependencies and removals can cascade (confirm against Graph).

    :param optimizations: callable mapping a label to
        ``(strategy_name, strategy, arg)``
    """
    counts = defaultdict(int)
    removed = set()
    dependents_of = target_task_graph.graph.reverse_links_dict()

    for label in target_task_graph.graph.visit_preorder():
        if label in do_not_optimize:
            continue

        # a task may only go once everything depending on it is gone
        if not all(dependent in removed for dependent in dependents_of[label]):
            continue

        task = target_task_graph.tasks[label]
        opt_by, strategy, arg = optimizations(label)
        if strategy.should_remove_task(task, params, arg):
            removed.add(label)
            counts[opt_by] += 1

    _log_optimization('removed', counts)
    return removed
133
134
135
def replace_tasks(target_task_graph, params, optimizations, do_not_optimize,
                  label_to_taskid, removed_tasks, existing_tasks):
    """
    Implement the "Replacing Tasks" phase, returning a set of task labels of
    all replaced tasks. The replacement taskIds are added to label_to_taskid as
    a side-effect.

    ``removed_tasks`` is mutated too: when a strategy's ``should_replace_task``
    returns exactly ``True``, the task is added to it instead of being replaced.
    A task is only considered once all of its dependencies are replaced or
    removed. A matching entry in ``existing_tasks`` takes precedence over the
    task's strategy.
    """
    counts = defaultdict(int)
    replaced = set()
    dependencies_of = target_task_graph.graph.links_dict()

    for label in target_task_graph.graph.visit_postorder():
        if label in do_not_optimize:
            continue

        # every dependency must already have been optimized away
        if not all(dep in replaced or dep in removed_tasks
                   for dep in dependencies_of[label]):
            continue

        # an already-existing task is the simplest possible replacement
        existing_task_id = existing_tasks.get(label)
        if existing_task_id:
            label_to_taskid[label] = existing_task_id
            replaced.add(label)
            counts['existing_tasks'] += 1
            continue

        task = target_task_graph.tasks[label]
        opt_by, strategy, arg = optimizations(label)
        outcome = strategy.should_replace_task(task, params, arg)
        if not outcome:
            continue

        if outcome is True:
            # True means remove this task; get_subgraph will catch any
            # problems with removed tasks being depended on
            removed_tasks.add(label)
        else:
            label_to_taskid[label] = outcome
            replaced.add(label)
        counts[opt_by] += 1

    _log_optimization('replaced', counts)
    return replaced
180
181
182
def get_subgraph(target_task_graph, removed_tasks, replaced_tasks, label_to_taskid):
    """
    Return the subgraph of target_task_graph consisting only of
    non-optimized tasks and edges between them.

    To avoid losing track of taskIds for tasks optimized away, this method
    simultaneously substitutes real taskIds for task labels in the graph, and
    populates each task definition's `dependencies` key with the appropriate
    taskIds. Task references are resolved in the process.

    ``label_to_taskid`` is mutated: each task that is neither removed nor
    already mapped is assigned a freshly generated slugid.

    :raises Exception: if a task remaining in the graph depends on a removed
        task.
    """

    # check for any dependency edges from included to removed tasks
    bad_edges = [(left, right, name)
                 for left, right, name in target_task_graph.graph.edges
                 if left not in removed_tasks and right in removed_tasks]
    if bad_edges:
        probs = ', '.join(
            '{} depends on {} as {} but it has been removed'.format(left, right, name)
            for left, right, name in bad_edges)
        raise Exception("Optimization error: " + probs)

    # fill in label_to_taskid for anything not removed or replaced
    # (replace_tasks must already have mapped every replaced task)
    assert replaced_tasks <= set(label_to_taskid)
    for label in sorted(target_task_graph.graph.nodes - removed_tasks - set(label_to_taskid)):
        label_to_taskid[label] = slugid()

    # resolve labels to taskIds and populate task['dependencies']
    tasks_by_taskid = {}
    named_links_dict = target_task_graph.graph.named_links_dict()
    omit = removed_tasks | replaced_tasks
    # items()/values() (rather than iteritems()/itervalues()) work on both
    # Python 2 and 3.
    for label, task in target_task_graph.tasks.items():
        if label in omit:
            continue
        task.task_id = label_to_taskid[label]
        # comprehension variables are named distinctly from the loop's
        # ``label`` to avoid confusing shadowing
        named_task_dependencies = {
            name: label_to_taskid[dep_label]
            for name, dep_label in named_links_dict.get(label, {}).items()}

        # Add remaining soft dependencies
        if task.soft_dependencies:
            named_task_dependencies.update({
                soft_label: label_to_taskid[soft_label]
                for soft_label in task.soft_dependencies
                if soft_label in label_to_taskid and soft_label not in omit
            })

        task.task = resolve_task_references(task.label, task.task, named_task_dependencies)
        deps = task.task.setdefault('dependencies', [])
        deps.extend(sorted(named_task_dependencies.values()))
        tasks_by_taskid[task.task_id] = task

    # resolve edges to taskIds
    edges_by_taskid = (
        (label_to_taskid.get(left), label_to_taskid.get(right), name)
        for (left, right, name) in target_task_graph.graph.edges
    )
    # ..and drop edges that are no longer entirely in the task graph
    # (note that this omits edges to replaced tasks, but they are still in task.dependencies)
    edges_by_taskid = set(
        (left, right, name)
        for (left, right, name) in edges_by_taskid
        if left in tasks_by_taskid and right in tasks_by_taskid
    )

    return TaskGraph(
        tasks_by_taskid,
        Graph(set(tasks_by_taskid), edges_by_taskid))
247
248
249
class OptimizationStrategy(object):
    """Base strategy (registered as "never"): keeps every task as-is.

    Subclasses override one or both hooks below. ``arg`` is the value paired
    with the strategy's name in the task's ``optimization`` mapping (``None``
    for tasks without one).
    """

    def should_remove_task(self, task, params, arg):
        """Decide whether this task should be optimized by removal.

        Return True to remove it; this base implementation never does."""
        return False

    def should_replace_task(self, task, params, arg):
        """Decide whether this task should be optimized by replacement.

        Return a taskId to substitute for it, True to replace it with nothing,
        or False to keep it; this base implementation always keeps it."""
        return False
260
261
262
class Either(OptimizationStrategy):
    """Given one or more optimization strategies, remove a task if any of them
    says to, and replace with a task if any finds a replacement (preferring the
    earliest). By default, each substrategy gets the same arg, but split_args
    can return a list of args for each strategy, if desired."""

    def __init__(self, *substrategies, **kwargs):
        self.substrategies = substrategies
        self.split_args = kwargs.pop('split_args', None)
        if not self.split_args:
            self.split_args = lambda arg: [arg] * len(substrategies)
        if kwargs:
            # name the offending keywords so misconfiguration is easy to spot
            raise TypeError("unexpected keyword args: {}".format(
                ', '.join(sorted(kwargs))))

    def _for_substrategies(self, arg, fn):
        """Call ``fn(substrategy, sub_arg)`` for each substrategy in order and
        return the first truthy result, or False if none is truthy.

        NOTE: zip() stops at the shorter input, so if split_args returns fewer
        args than there are substrategies, the trailing ones are skipped.
        """
        sub_args = self.split_args(arg)
        for sub, sub_arg in zip(self.substrategies, sub_args):
            rv = fn(sub, sub_arg)
            if rv:
                return rv
        return False

    def should_remove_task(self, task, params, arg):
        return self._for_substrategies(
            arg,
            lambda sub, sub_arg: sub.should_remove_task(task, params, sub_arg))

    def should_replace_task(self, task, params, arg):
        return self._for_substrategies(
            arg,
            lambda sub, sub_arg: sub.should_replace_task(task, params, sub_arg))
291
292
293
class IndexSearch(OptimizationStrategy):
    """Replace a task with an existing task found under one of its index paths."""

    # A task with no dependencies remaining after optimization will be replaced
    # if artifacts exist for the corresponding index_paths.
    # Otherwise, we're in one of the following cases:
    # - the task has un-optimized dependencies
    # - the artifacts have expired
    # - some changes altered the index_paths and new artifacts need to be
    #   created.
    # In each of those cases, we need to run the task to create or refresh
    # artifacts.

    def should_replace_task(self, task, params, index_paths):
        """Look for a task with one of the given index paths.

        Paths are tried in order. Return the taskId of the first one found, or
        False if none is found.
        """
        # The proxy is used whenever TASK_ID is set (presumably meaning we run
        # inside a Taskcluster task). That does not vary per path, so it is
        # computed once.
        use_proxy = bool(os.environ.get('TASK_ID'))
        for index_path in index_paths:
            try:
                task_id = find_task_id(index_path, use_proxy=use_proxy)
            except KeyError:
                # 404 will end up here and go on to the next index path
                continue
            return task_id

        return False
318
319
320
class SETA(OptimizationStrategy):
    """Remove tasks that ``is_low_value_task`` flags for this push."""

    def should_remove_task(self, task, params, _):
        # Always optimize away low-value tasks; keep everything else.
        return bool(is_low_value_task(task.label,
                                      params.get('project'),
                                      params.get('pushlog_id'),
                                      params.get('pushdate')))
334
335
336
class SkipUnlessChanged(OptimizationStrategy):
    """Remove a task when ``files_changed.check`` finds no changed file
    matching its patterns."""

    def should_remove_task(self, task, params, file_patterns):
        # pushlog_id == -1 - this is the case when run from a cron.yml job
        if params.get('pushlog_id') == -1:
            return False

        if files_changed.check(params, file_patterns):
            return False

        # lazy %-args: the message is only formatted if DEBUG is enabled
        logger.debug('no files found matching a pattern in `skip-unless-changed` for %s',
                     task.label)
        return True
348
349
350
class SkipUnlessSchedules(OptimizationStrategy):
    """Remove a task unless the push schedules at least one of its components."""

    @memoize
    def scheduled_by_push(self, repository, revision):
        """Return the set of SCHEDULES components for the files changed by
        ``revision`` in ``repository``.

        Results are cached by the ``memoize`` decorator.
        """
        changed = files_changed.get_changed_files(repository, revision)

        mbo = MozbuildObject.from_environment()
        # the decision task has a sparse checkout, so, mozbuild_reader will use
        # a MercurialRevisionFinder with revision '.', which should be the same
        # as `revision`; in other circumstances, it will use a default reader
        reader = mbo.mozbuild_reader(config_mode='empty')

        scheduled = set()
        for _path, info in reader.files_info(changed).items():
            scheduled.update(info['SCHEDULES'].components)
        return scheduled

    def should_remove_task(self, task, params, conditions):
        # pushlog_id == -1 indicates a run from a cron.yml job: never skip
        if params.get('pushlog_id') == -1:
            return False

        scheduled = self.scheduled_by_push(params['head_repository'], params['head_rev'])
        # if *any* of the condition components are scheduled, do not optimize
        return not (set(conditions) & scheduled)
379
380
381
class TestVerify(OptimizationStrategy):
    """Remove a task unless ``perfile_number_of_chunks`` reports a truthy
    (non-zero) number of chunks for it."""

    def should_remove_task(self, task, params, _):
        # Test paths come from try_task_config['templates']['env']. Explicit
        # nulls at any level are treated as empty mappings instead of raising
        # AttributeError.
        try_config = params.get('try_task_config', {}) or {}
        templates = try_config.get('templates', {}) or {}
        env = templates.get('env', {}) or {}
        chunks = perfile_number_of_chunks(params.is_try(),
                                          env.get('MOZHARNESS_TEST_PATHS', ''),
                                          params.get('head_repository', ''),
                                          params.get('head_rev', ''),
                                          task)
        # keep the task only when there is something to verify
        return not chunks