# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals

import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
import os
from six.moves.queue import Queue
import requests
import sys
import tarfile
from requests.packages.urllib3.util.retry import Retry
from threading import Event, Thread
import time

# Number of concurrent upload threads started by do_work().
# Arbitrary, should probably measure this.
# NOTE(review): the assignment itself was missing from this copy of the
# file; 10 is a restored value -- confirm against upstream.
NUM_WORKER_THREADS = 10

log = logging.getLogger("upload-generated-sources")
# Progress is reported with log.info(); without an explicit level this logger
# inherits the root default (WARNING) and those messages would be dropped.
log.setLevel(logging.INFO)


@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.

    Use as ``with timed() as elapsed:``; each call to ``elapsed()`` returns
    the number of seconds (a float) since the ``with`` block was entered.
    """
    start = time.time()

    def elapsed():
        return time.time() - start

    yield elapsed


def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.

    `data` must be bytes. The returned buffer is rewound to offset 0 so it
    can be passed directly to a consumer that reads from the current
    position.
    """
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    # Closing the GzipFile writes the gzip trailer but leaves the underlying
    # buffer open and positioned at its end; rewind it for the reader.
    b.seek(0)
    return b


def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.

    Runs until `event` is set (by this or another worker). `session_args`
    are passed as keyword arguments to `boto3.session.Session`.
    """
    try:
        # Imported lazily so the module can be loaded without boto3.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(
                'Uploading "{}" ({} bytes)'.format(pathname, len(compressed.getvalue()))
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            # The producer polls queue.unfinished_tasks, so every completed
            # item must be acknowledged.
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        # Tell the other workers and the producer to stop.
        event.set()


def do_work(artifact, region, bucket):
    """
    Fetch the gzipped tar at URL `artifact` and upload every file in it to
    the S3 `bucket` in `region`, using a pool of `upload_worker` threads.

    Returns once all queued uploads have finished, or as soon as a worker
    reports an exception. Raises `requests.HTTPError` if an HTTP fetch
    returns an error status.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors on both the secrets and artifact fetches.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        # NOTE(review): the URL and the secret's key names below were missing
        # from this copy of the file and have been restored -- confirm
        # against upstream before landing.
        secrets_url = (
            "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/"
            "level-{}/gecko-generated-sources-upload".format(level)
        )
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
        )
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()

    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for _ in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Daemon threads: workers block in queue.get() and must not keep the
        # process alive once the main thread is done.
        t.daemon = True
        t.start()

    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            if not entry.isfile():
                # extractfile() returns None for directories and other
                # non-file members; there is nothing to upload for them.
                continue
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))

    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            break


def main(argv):
    """
    Command-line entry point.

    `argv` is the argument list without the program name; it must contain
    one positional argument, the generated-sources artifact to upload.
    Returns 0 on completion.
    """
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info("Finished in {:.03f}s".format(elapsed()))
    return 0


if __name__ == "__main__":
    # main() takes the arguments without the program name and returns the
    # process exit status.
    sys.exit(main(sys.argv[1:]))