Source code

Revision control

Copy as Markdown

Other Tools

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const ArrayBufferInputStream = Components.Constructor(
"@mozilla.org/io/arraybuffer-input-stream;1",
"nsIArrayBufferInputStream"
);
const BinaryInputStream = Components.Constructor(
"@mozilla.org/binaryinputstream;1",
"nsIBinaryInputStream",
"setInputStream"
);
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
const lazy = {};
XPCOMUtils.defineLazyServiceGetter(
lazy,
"gActivityDistributor",
"@mozilla.org/network/http-activity-distributor;1",
"nsIHttpActivityDistributor"
);
ChromeUtils.defineESModuleGetters(lazy, {
setTimeout: "resource://gre/modules/Timer.sys.mjs",
});
class NetworkThrottleListener {
#activities;
#offset;
#originalListener;
#pendingData;
#pendingException;
#queue;
#responseStarted;
/**
* Construct a new nsIStreamListener that buffers data and provides a
* method to notify another listener when data is available. This is
* used to throttle network data on a per-channel basis.
*
* After construction, @see setOriginalListener must be called on the
* new object.
*
* @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
* which status changes should be reported
*/
constructor(queue) {
this.#activities = {};
this.#offset = 0;
this.#pendingData = [];
this.#pendingException = null;
this.#queue = queue;
this.#responseStarted = false;
}
/**
* Set the original listener for this object. The original listener
* will receive requests from this object when the queue allows data
* through.
*
* @param {nsIStreamListener} originalListener the original listener
* for the channel, to which all requests will be sent
*/
setOriginalListener(originalListener) {
this.#originalListener = originalListener;
}
/**
* @see nsIStreamListener.onStartRequest.
*/
onStartRequest(request) {
this.#originalListener.onStartRequest(request);
this.#queue.start(this);
}
/**
* @see nsIStreamListener.onStopRequest.
*/
onStopRequest(request, statusCode) {
this.#pendingData.push({ request, statusCode });
this.#queue.dataAvailable(this);
}
/**
* @see nsIStreamListener.onDataAvailable.
*/
onDataAvailable(request, inputStream, offset, count) {
if (this.#pendingException) {
throw this.#pendingException;
}
const bin = new BinaryInputStream(inputStream);
const bytes = new ArrayBuffer(count);
bin.readArrayBuffer(count, bytes);
const stream = new ArrayBufferInputStream();
stream.setData(bytes, 0, count);
this.#pendingData.push({ request, stream, count });
this.#queue.dataAvailable(this);
}
/**
* Allow some buffered data from this object to be forwarded to this
* object's originalListener.
*
* @param {Number} bytesPermitted The maximum number of bytes
* permitted to be sent.
* @return {Object} an object of the form {length, done}, where
* |length| is the number of bytes actually forwarded, and
* |done| is a boolean indicating whether this particular
* request has been completed. (A NetworkThrottleListener
* may be queued multiple times, so this does not mean that
* all available data has been sent.)
*/
sendSomeData(bytesPermitted) {
if (this.#pendingData.length === 0) {
// Shouldn't happen.
return { length: 0, done: true };
}
const { request, stream, count, statusCode } = this.#pendingData[0];
if (statusCode !== undefined) {
this.#pendingData.shift();
this.#originalListener.onStopRequest(request, statusCode);
return { length: 0, done: true };
}
if (bytesPermitted > count) {
bytesPermitted = count;
}
try {
this.#originalListener.onDataAvailable(
request,
stream,
this.#offset,
bytesPermitted
);
} catch (e) {
this.#pendingException = e;
}
let done = false;
if (bytesPermitted === count) {
this.#pendingData.shift();
done = true;
} else {
this.#pendingData[0].count -= bytesPermitted;
}
this.#offset += bytesPermitted;
// Maybe our state has changed enough to emit an event.
this.#maybeEmitEvents();
return { length: bytesPermitted, done };
}
/**
* Return the number of pending data requests available for this
* listener.
*/
pendingCount() {
return this.#pendingData.length;
}
/**
* This is called when an http activity event is delivered. This
* object delays the event until the appropriate moment.
*/
addActivityCallback(
callback,
httpActivity,
channel,
activityType,
activitySubtype,
timestamp,
extraSizeData,
extraStringData
) {
const datum = {
callback,
httpActivity,
channel,
activityType,
activitySubtype,
extraSizeData,
extraStringData,
};
this.#activities[activitySubtype] = datum;
if (
activitySubtype ===
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
) {
this.totalSize = extraSizeData;
}
this.#maybeEmitEvents();
}
/**
* This is called for a download throttler when the latency timeout
* has ended.
*/
responseStart() {
this.#responseStarted = true;
this.#maybeEmitEvents();
}
/**
* Check our internal state and emit any http activity events as
* needed. Note that we wait until both our internal state has
* changed and we've received the real http activity event from
* platform. This approach ensures we can both pass on the correct
* data from the original event, and update the reported time to be
* consistent with the delay we're introducing.
*/
#maybeEmitEvents() {
if (this.#responseStarted) {
this.#maybeEmit(
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START
);
this.#maybeEmit(
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER
);
}
if (this.totalSize !== undefined && this.#offset >= this.totalSize) {
this.#maybeEmit(
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
);
this.#maybeEmit(
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE
);
}
}
/**
* Emit an event for |code|, if the appropriate entry in
* |activities| is defined.
*/
#maybeEmit(code) {
if (this.#activities[code] !== undefined) {
const {
callback,
httpActivity,
channel,
activityType,
activitySubtype,
extraSizeData,
extraStringData,
} = this.#activities[code];
const now = Date.now() * 1000;
callback(
httpActivity,
channel,
activityType,
activitySubtype,
now,
extraSizeData,
extraStringData
);
this.#activities[code] = undefined;
}
}
QueryInterface = ChromeUtils.generateQI([
"nsIStreamListener",
"nsIInterfaceRequestor",
]);
}
class NetworkThrottleQueue {
#downloadQueue;
#latencyMax;
#latencyMean;
#maxBPS;
#meanBPS;
#pendingRequests;
#previousReads;
#pumping;
/**
* Construct a new queue that can be used to throttle the network for
* a group of related network requests.
*
* meanBPS {Number} Mean bytes per second.
* maxBPS {Number} Maximum bytes per second.
* latencyMean {Number} Mean latency in milliseconds.
* latencyMax {Number} Maximum latency in milliseconds.
*/
constructor(meanBPS, maxBPS, latencyMean, latencyMax) {
this.#meanBPS = meanBPS;
this.#maxBPS = maxBPS;
this.#latencyMean = latencyMean;
this.#latencyMax = latencyMax;
this.#pendingRequests = new Set();
this.#downloadQueue = [];
this.#previousReads = [];
this.#pumping = false;
}
/**
* A helper function that lets the indicating listener start sending
* data. This is called after the initial round trip time for the
* listener has elapsed.
*/
#allowDataFrom(throttleListener) {
throttleListener.responseStart();
this.#pendingRequests.delete(throttleListener);
const count = throttleListener.pendingCount();
for (let i = 0; i < count; ++i) {
this.#downloadQueue.push(throttleListener);
}
this.#pump();
}
/**
* An internal function that permits individual listeners to send
* data.
*/
#pump() {
// A redirect will cause two NetworkThrottleListeners to be on a
// listener chain. In this case, we might recursively call into
// this method. Avoid infinite recursion here.
if (this.#pumping) {
return;
}
this.#pumping = true;
const now = Date.now();
const oneSecondAgo = now - 1000;
while (
this.#previousReads.length &&
this.#previousReads[0].when < oneSecondAgo
) {
this.#previousReads.shift();
}
const totalBytes = this.#previousReads.reduce((sum, elt) => {
return sum + elt.numBytes;
}, 0);
let thisSliceBytes = this.#random(this.#meanBPS, this.#maxBPS);
if (totalBytes < thisSliceBytes) {
thisSliceBytes -= totalBytes;
let readThisTime = 0;
while (thisSliceBytes > 0 && this.#downloadQueue.length) {
const { length, done } =
this.#downloadQueue[0].sendSomeData(thisSliceBytes);
thisSliceBytes -= length;
readThisTime += length;
if (done) {
this.#downloadQueue.shift();
}
}
this.#previousReads.push({ when: now, numBytes: readThisTime });
}
// If there is more data to download, then schedule ourselves for
// one second after the oldest previous read.
if (this.#downloadQueue.length) {
const when = this.#previousReads[0].when + 1000;
lazy.setTimeout(this.#pump.bind(this), when - now);
}
this.#pumping = false;
}
/**
* A helper function that, given a mean and a maximum, returns a
* random integer between (mean - (max - mean)) and max.
*/
#random(mean, max) {
return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
}
/**
* Notice a new listener object. This is called by the
* NetworkThrottleListener when the request has started. Initially
* a new listener object is put into a "pending" state, until the
* round-trip time has elapsed. This is used to simulate latency.
*
* @param {NetworkThrottleListener} throttleListener the new listener
*/
start(throttleListener) {
this.#pendingRequests.add(throttleListener);
const delay = this.#random(this.#latencyMean, this.#latencyMax);
if (delay > 0) {
lazy.setTimeout(() => this.#allowDataFrom(throttleListener), delay);
} else {
this.#allowDataFrom(throttleListener);
}
}
/**
* Note that new data is available for a given listener. Each time
* data is available, the listener will be re-queued.
*
* @param {NetworkThrottleListener} throttleListener the listener
* which has data available.
*/
dataAvailable(throttleListener) {
if (!this.#pendingRequests.has(throttleListener)) {
this.#downloadQueue.push(throttleListener);
this.#pump();
}
}
}
/**
* Construct a new object that can be used to throttle the network for
* a group of related network requests.
*
* @param {Object} An object with the following attributes:
* latencyMean {Number} Mean latency in milliseconds.
* latencyMax {Number} Maximum latency in milliseconds.
* downloadBPSMean {Number} Mean bytes per second for downloads.
* downloadBPSMax {Number} Maximum bytes per second for downloads.
* uploadBPSMean {Number} Mean bytes per second for uploads.
* uploadBPSMax {Number} Maximum bytes per second for uploads.
*
* Download throttling will not be done if downloadBPSMean and
* downloadBPSMax are <= 0. Upload throttling will not be done if
* uploadBPSMean and uploadBPSMax are <= 0.
*/
export class NetworkThrottleManager {
#downloadQueue;
constructor({
latencyMean,
latencyMax,
downloadBPSMean,
downloadBPSMax,
uploadBPSMean,
uploadBPSMax,
}) {
if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
this.#downloadQueue = null;
} else {
this.#downloadQueue = new NetworkThrottleQueue(
downloadBPSMean,
downloadBPSMax,
latencyMean,
latencyMax
);
}
if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
this.uploadQueue = null;
} else {
this.uploadQueue = Cc[
"@mozilla.org/network/throttlequeue;1"
].createInstance(Ci.nsIInputChannelThrottleQueue);
this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
}
}
/**
* Create a new NetworkThrottleListener for a given channel and
* install it using |setNewListener|.
*
* @param {nsITraceableChannel} channel the channel to manage
* @return {NetworkThrottleListener} the new listener, or null if
* download throttling is not being done.
*/
manage(channel) {
if (this.#downloadQueue) {
const listener = new NetworkThrottleListener(this.#downloadQueue);
const originalListener = channel.setNewListener(listener);
listener.setOriginalListener(originalListener);
return listener;
}
return null;
}
/**
* Throttle uploads taking place on the given channel.
*
* @param {nsITraceableChannel} channel the channel to manage
*/
manageUpload(channel) {
if (this.uploadQueue) {
channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
channel.throttleQueue = this.uploadQueue;
}
}
}