Source code
Revision control
Copy as Markdown
Other Tools
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
import { HPKEConfigManager } from "resource://gre/modules/HPKEConfigManager.sys.mjs";
let lazy = {};
ChromeUtils.defineLazyGetter(lazy, "logConsole", function () {
  return console.createInstance({
    prefix: "DAPTelemetrySender",
    maxLogLevelPref: "toolkit.telemetry.dap.logLevel",
  });
});
ChromeUtils.defineESModuleGetters(lazy, {
  AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
  NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
  setTimeout: "resource://gre/modules/Timer.sys.mjs",
  ObliviousHTTP: "resource://gre/modules/ObliviousHTTP.sys.mjs",
});
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "gTelemetryEnabled",
  "datareporting.healthreport.uploadEnabled",
  false
);
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "gDapEndpoint",
  "toolkit.telemetry.dap.leader.url"
);
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "gLeaderHpke",
  "toolkit.telemetry.dap.leader.hpke"
);
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "gHelperHpke",
  "toolkit.telemetry.dap.helper.hpke"
);
/**
 * The purpose of this singleton is to handle sending of DAP telemetry data.
 * The current DAP draft standard is available here:
 *
 * The specific purpose of this singleton is to make the necessary calls to fetch to do networking.
 */
export const DAPTelemetrySender = new (class {
  /**
   * @typedef { 'sum' | 'sumvec' | 'histogram' } VDAF
   */
  /**
   * Task configuration must match a configured task on the DAP server.
   *
   * @typedef {object} Task
   * @property {string} id - The task ID in urlsafe_base64 encoding.
   * @property {VDAF} vdaf - The VDAF used by the task.
   * @property {number} [bits] - The bit-width of integers in sum/sumvec measurements.
   * @property {number} [length] - The number of vector/histogram elements.
   * @property {number} time_precision - The rounding granularity in seconds
   *                                     that is applied to timestamps attached
   *                                     to the report.
   */
  async startup() {
    if (
      Services.startup.isInOrBeyondShutdownPhase(
        Ci.nsIAppStartup.SHUTDOWN_PHASE_APPSHUTDOWNCONFIRMED
      )
    ) {
      lazy.logConsole.warn(
        "DAPTelemetrySender startup not possible due to shutdown."
      );
      return;
    }
    // Note that this can block until the ExperimentAPI is available.
    // This is fine as we depend on it. In case of a race with shutdown
    // it will reject, making the below getVariable calls return null.
    await lazy.NimbusFeatures.dapTelemetry.ready();
    if (
      lazy.NimbusFeatures.dapTelemetry.getVariable("enabled") &&
      lazy.NimbusFeatures.dapTelemetry.getVariable("task1Enabled")
    ) {
      let tasks = [];
      lazy.logConsole.debug("Task 1 is enabled.");
      let task1_id =
        lazy.NimbusFeatures.dapTelemetry.getVariable("task1TaskId");
      if (task1_id !== undefined && task1_id != "") {
        let task = {
          // this is testing task 1
          id: task1_id,
          vdaf: "sumvec",
          bits: 8,
          length: 20,
          time_precision: 300,
        };
        tasks.push(task);
        lazy.setTimeout(
          () => this.timedSendTestReports(tasks),
          this.timeout_value()
        );
        lazy.NimbusFeatures.dapTelemetry.onUpdate(async () => {
          if (typeof this.counters !== "undefined") {
            await this.sendTestReports(tasks, { reason: "nimbus-update" });
          }
        });
      }
      this._asyncShutdownBlocker = async () => {
        lazy.logConsole.debug(`Sending on shutdown.`);
        // Shorter timeout to prevent crashing due to blocking shutdown
        await this.sendTestReports(tasks, {
          timeout: 2_000,
          reason: "shutdown",
        });
      };
      lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
        "DAPTelemetrySender: sending data",
        this._asyncShutdownBlocker
      );
    }
  }
  async sendTestReports(tasks, options = {}) {
    for (let task of tasks) {
      let measurement;
      if (task.vdaf == "sum") {
        measurement = 3;
      } else if (task.vdaf == "sumvec") {
        measurement = new Array(20).fill(0);
        let r = Math.floor(Math.random() * 10);
        measurement[r] += 1;
        measurement[19] += 1;
      } else if (task.vdaf == "histogram") {
        measurement = Math.floor(Math.random() * 15);
      } else {
        throw new Error(`Unknown VDAF ${task.vdaf}`);
      }
      await this.sendDAPMeasurement(task, measurement, options);
    }
  }
  async timedSendTestReports(tasks) {
    lazy.logConsole.debug("Sending on timer.");
    await this.sendTestReports(tasks);
    lazy.setTimeout(
      () => this.timedSendTestReports(tasks),
      this.timeout_value()
    );
  }
  timeout_value() {
    const MINUTE = 60 * 1000;
    return MINUTE * (9 + Math.random() * 2); // 9 - 11 minutes
  }
  /**
   * Internal testing function to verify the DAP aggregator keys match current
   * values advertised by servers.
   */
  async checkHpkeKeys() {
    async function check_key(url, expected) {
      let response = await fetch(url + "/hpke_config");
      let body = await response.arrayBuffer();
      let actual = ChromeUtils.base64URLEncode(body, { pad: false });
      if (actual != expected) {
        throw new Error(`HPKE for ${url} does not match`);
      }
    }
    await Promise.allSettled([
      await check_key(
        Services.prefs.getStringPref("toolkit.telemetry.dap.leader.url"),
        Services.prefs.getStringPref("toolkit.telemetry.dap.leader.hpke")
      ),
      await check_key(
        Services.prefs.getStringPref("toolkit.telemetry.dap.helper.url"),
        Services.prefs.getStringPref("toolkit.telemetry.dap.helper.hpke")
      ),
    ]);
  }
  /**
   * Creates a DAP report for a specific task from a measurement and sends it.
   *
   * @param {Task} task
   *   Definition of the task for which the measurement was taken.
   * @param {number|Array<number>} measurement
   *   The measured value for which a report is generated.
   * @param {object} options
   * @param {number} options.timeout
   *   The timeout for request in milliseconds. Defaults to 30s.
   * @param {string} options.reason
   *   A string to indicate the reason for triggering a submission. This is
   *   currently ignored and not recorded.
   * @param {string} options.ohttp_relay
   * @param {Uint8Array} options.ohttp_hpke
   *   If an OHTTP relay is specified, the reports are uploaded over OHTTP.
   */
  async sendDAPMeasurement(task, measurement, options = {}) {
    try {
      const controller = new AbortController();
      lazy.setTimeout(() => controller.abort(), options.timeout ?? 30_000);
      let keys = {
        leader_hpke: HPKEConfigManager.decodeKey(lazy.gLeaderHpke),
        helper_hpke: HPKEConfigManager.decodeKey(lazy.gHelperHpke),
      };
      let report = this.generateReport(task, measurement, keys);
      await this.sendReport(
        lazy.gDapEndpoint,
        task.id,
        report,
        controller.signal,
        options
      );
    } catch (e) {
      if (e.name === "AbortError") {
        lazy.logConsole.error("Aborted DAP report generation: ", e);
      } else {
        lazy.logConsole.error("DAP report generation failed: " + e);
      }
      throw e;
    }
  }
  /**
   * @typedef {object} AggregatorKeys
   * @property {Uint8Array} leader_hpke - The leader's DAP HPKE key.
   * @property {Uint8Array} helper_hpke - The helper's DAP HPKE key.
   */
  /**
   * Generates the encrypted DAP report.
   *
   * @param {Task} task
   *   Definition of the task for which the measurement was taken.
   * @param {number|Array<number>} measurement
   *   The measured value for which a report is generated.
   * @param {AggregatorKeys} keys
   *   The DAP encryption keys for each aggregator.
   *
   * @returns {ArrayBuffer} The generated binary report data.
   */
  generateReport(task, measurement, keys) {
    let task_id = new Uint8Array(
      ChromeUtils.base64URLDecode(task.id, { padding: "ignore" })
    );
    let reportOut = {};
    if (task.vdaf === "sum") {
      Services.DAPTelemetry.GetReportPrioSum(
        keys.leader_hpke,
        keys.helper_hpke,
        measurement,
        task_id,
        task.bits,
        task.time_precision,
        reportOut
      );
    } else if (task.vdaf === "sumvec") {
      if (measurement.length != task.length) {
        throw new Error(
          "Measurement vector length doesn't match task configuration"
        );
      }
      Services.DAPTelemetry.GetReportPrioSumVec(
        keys.leader_hpke,
        keys.helper_hpke,
        measurement,
        task_id,
        task.bits,
        task.time_precision,
        reportOut
      );
    } else if (task.vdaf === "histogram") {
      Services.DAPTelemetry.GetReportPrioHistogram(
        keys.leader_hpke,
        keys.helper_hpke,
        measurement,
        task_id,
        task.length,
        task.time_precision,
        reportOut
      );
    } else {
      throw new Error(
        `Unknown measurement type for task ${task.id}: ${task.vdaf} ${task.bits}`
      );
    }
    return new Uint8Array(reportOut.value).buffer;
  }
  /**
   * Sends a report to the leader.
   *
   * @param {string} leader_endpoint
   *   The URL for the leader.
   * @param {string} task_id
   *   Base64 encoded task_id as it appears in the upload path.
   * @param {ArrayBuffer} report
   *   Raw bytes of the TLS encoded report.
   * @param {AbortSignal} abortSignal
   *   Can be used to cancel network requests. Does not cancel computation.
   * @param {object} options
   * @param {string} options.ohttp_relay
   * @param {Uint8Array} options.ohttp_hpke
   *   If an OHTTP relay is specified, the reports are uploaded over OHTTP. In
   *   this case, the OHTTP and DAP keys must be provided and this code will not
   *   attempt to fetch them.
   *
   * @returns {Promise<undefined>} Once the attempt to send the report completes, whether or not it was successful.
   */
  async sendReport(leader_endpoint, task_id, report, abortSignal, options) {
    // If telemetry disabled, don't upload DAP reports either.
    if (!lazy.gTelemetryEnabled) {
      return;
    }
    const upload_path = leader_endpoint + "/tasks/" + task_id + "/reports";
    try {
      let requestOptions = {
        method: "PUT",
        headers: { "Content-Type": "application/dap-report" },
        body: report,
        signal: abortSignal,
      };
      let response;
      if (options.ohttp_relay) {
        response = await lazy.ObliviousHTTP.ohttpRequest(
          options.ohttp_relay,
          options.ohttp_hpke,
          upload_path,
          requestOptions
        );
      } else {
        response = await fetch(upload_path, requestOptions);
      }
      if (response.status != 200) {
        const content_type = response.headers.get("content-type");
        if (content_type && content_type === "application/json") {
          // A JSON error from the DAP server.
          let error = await response.json();
          throw new Error(
            `Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error.type} ${error.title}`
          );
        } else {
          // A different error, e.g. from a load-balancer.
          let error = await response.text();
          throw new Error(
            `Sending failed. HTTP response: ${response.status} ${response.statusText}. Error: ${error}`
          );
        }
      } else {
        lazy.logConsole.debug("DAP report sent");
      }
    } catch (err) {
      if (err.name === "AbortError") {
        lazy.logConsole.error("Aborted DAP report sending: ", err);
      } else {
        lazy.logConsole.error("Failed to send report: ", err);
      }
      throw err;
    }
  }
})();