# (removed: non-source navigation text captured from the code-hosting web UI)
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import datetime
import logging
import requests
from taskcluster.generated import _client_importer
from taskcluster.generated.aio import _client_importer as _async_client_importer
from taskcluster.utils import stringDate
import urllib.parse
logger = logging.getLogger(__name__)
class TaskclusterConfig(object):
    """
    Local configuration used to access Taskcluster service and objects
    """

    def __init__(self, url=None):
        # Populated lazily by auth() and load_secrets()
        self.options = None
        self.secrets = None
        # Deployment root url: explicit argument wins over the environment
        self.default_url = url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
        assert self.default_url is not None, "You must specify a Taskcluster deployment url"

    def auth(self, client_id=None, access_token=None, max_retries=12):
        """
        Build Taskcluster credentials options
        Supports, by order of preference:
        * directly provided credentials
        * credentials from environment variables
        * taskclusterProxy
        * no authentication
        """
        self.options = {"maxRetries": max_retries}

        if client_id is None and access_token is None:
            # Credentials preference: Use env. variables
            client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
            access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
            logger.info("Using taskcluster credentials from environment")
        else:
            logger.info("Using taskcluster credentials from cli")

        if client_id is not None and access_token is not None:
            # Use provided credentials
            self.options["credentials"] = {
                "clientId": client_id,
                "accessToken": access_token,
            }
            self.options["rootUrl"] = self.default_url
        elif "TASK_ID" in os.environ:
            # Use Taskcluster Proxy when running in a task
            logger.info("Taskcluster Proxy enabled")
            self.options["rootUrl"] = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster")
        else:
            logger.info("No Taskcluster authentication.")
            self.options["rootUrl"] = self.default_url

    def get_service(self, service_name, use_async=False):
        """
        Build a Taskcluster service instance using current authentication

        :param service_name: lowercase service name, e.g. "queue" or "secrets"
            (looked up via str.capitalize, so camel-cased services like
            "workerManager" would not resolve — only single-word names work)
        :param use_async: when True, build the asyncio client variant
        """
        if self.options is None:
            # Authenticate on first use with default settings
            self.auth()

        client_importer = _async_client_importer if use_async else _client_importer
        service = getattr(client_importer, service_name.capitalize(), None)
        assert service is not None, "Invalid Taskcluster service {}".format(
            service_name
        )
        return service(self.options)

    def load_secrets(
        self, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
    ):
        """Shortcut to use load_secrets helper with current authentication

        None defaults avoid the shared-mutable-default-argument pitfall;
        they are normalized to fresh containers before delegating.
        """
        self.secrets = load_secrets(
            self.get_service("secrets"),
            secret_name,
            prefixes if prefixes is not None else [],
            required if required is not None else [],
            existing if existing is not None else {},
            local_secrets,
        )
        return self.secrets

    def upload_artifact(self, artifact_path, content, content_type, ttl):
        """Shortcut to use upload_artifact helper with current authentication

        Returns the absolute artifact url on the current deployment.
        """
        path = upload_artifact(
            self.get_service("queue"),
            artifact_path,
            content,
            content_type,
            ttl,
        )
        return urllib.parse.urljoin(self.default_url, path)
def load_secrets(
    secrets_service, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
):
    """
    Fetch a specific set of secrets by name and verify that the required
    secrets exist.

    Also supports providing local secrets to avoid using remote Taskcluster service
    for local development (or contributor onboarding)

    A user can specify prefixes to limit the part of secrets used (useful when a secret
    is shared amongst several services)

    :param secrets_service: Taskcluster Secrets service instance; unused when
        local_secrets is a dict
    :param secret_name: full secret path on the Secrets service
    :param prefixes: optional list of top-level keys to select from the secret
    :param required: optional list of keys that must be present in the result
    :param existing: optional dict of pre-existing values merged in first
        (remote/local values override them on key conflicts)
    :param local_secrets: optional dict used instead of the remote service
    :raises Exception: when a required key is missing from the loaded secrets
    :return: dict of merged secret values
    """
    # None defaults avoid the shared-mutable-default-argument pitfall
    prefixes = prefixes if prefixes is not None else []
    required = required if required is not None else []

    secrets = {}
    if existing:
        secrets.update(existing)

    if isinstance(local_secrets, dict):
        # Use local secrets file to avoid using Taskcluster secrets
        logger.info("Using provided local secrets")
        all_secrets = local_secrets
    else:
        # Use Taskcluster secret service
        assert secret_name is not None, "Missing Taskcluster secret secret_name"
        all_secrets = secrets_service.get(secret_name).get("secret", dict())
        logger.info("Loaded Taskcluster secret {}".format(secret_name))

    if prefixes:
        # Use secrets behind supported prefixes
        for prefix in prefixes:
            secrets.update(all_secrets.get(prefix, dict()))
    else:
        # Use all secrets available
        secrets.update(all_secrets)

    # Check required secrets
    for required_secret in required:
        if required_secret not in secrets:
            raise Exception("Missing value {} in secrets.".format(required_secret))

    return secrets
def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
    """
    DEPRECATED. Do not use.

    Declare an s3 artifact on the Taskcluster queue, push its content to the
    signed storage url, and return the relative queue API path to it.
    Only works inside a Taskcluster task with the proxy enabled.
    """
    task_id = os.environ.get("TASK_ID")
    run_id = os.environ.get("RUN_ID")
    proxy = os.environ.get("TASKCLUSTER_PROXY_URL")
    assert task_id and run_id and proxy, "Can only run in Taskcluster tasks with proxy"
    assert isinstance(content, str)
    assert isinstance(ttl, datetime.timedelta)

    # Declare the S3 artifact on the queue to get a signed upload url
    payload = {
        "storageType": "s3",
        "expires": stringDate(datetime.datetime.utcnow() + ttl),
        "contentType": content_type,
    }
    artifact = queue_service.createArtifact(task_id, run_id, artifact_path, payload)
    assert artifact["storageType"] == "s3", "Not an s3 storage"
    assert "putUrl" in artifact, "Missing putUrl"
    assert "contentType" in artifact, "Missing contentType"

    # Push the artifact content onto the storage service
    upload = requests.put(
        url=artifact["putUrl"],
        headers={"Content-Type": artifact["contentType"]},
        data=content,
    )
    upload.raise_for_status()

    # Build the absolute url
    return "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}".format(
        task_id=task_id,
        run_id=run_id,
        path=artifact_path,
    )