/* Any copyright is dedicated to the Public Domain.
   http://creativecommons.org/publicdomain/zero/1.0/ */

let { PostQueue } = ChromeUtils.importESModule(
  // Assumption: the module path was missing here; PostQueue is exported from
  // the services-sync record module.
  "resource://services-sync/record.sys.mjs"
);
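
// These tests exercise PostQueue, which batches outgoing records into POST
// requests that respect the configured size and count limits.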

function makeRecord(nbytes) {
  return {
    toJSON: () => ({ payload: "x".repeat(nbytes) }),
  };
}

// Note: This is 14 bytes. Tests make assumptions about this (even if it's just
// in setting config.max_request_bytes to a specific value).
makeRecord.nonPayloadOverhead = JSON.stringify(makeRecord(0).toJSON()).length;
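// (That is, JSON.stringify({ payload: "" }) === '{"payload":""}', which is
// 14 characters.)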

// Gives how many encoded bytes a request with the given payload
// sizes will be (assuming the records were created by makeRecord), e.g.
// requestBytesFor([6]) => 22, requestBytesFor([6, 6]) => 43.
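// (Each record costs its payload bytes + nonPayloadOverhead + 1 byte for the
// "," or closing "]", plus 1 byte for the leading "[".)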
function requestBytesFor(recordPayloadByteCounts) {
  let requestBytes = 1;
  for (let size of recordPayloadByteCounts) {
    requestBytes += size + 1 + makeRecord.nonPayloadOverhead;
  }
  return requestBytes;
}

function makePostQueue(config, lastModTime, responseGenerator) {
  let stats = {
    posts: [],
    batches: [],
  };
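
  // A stand-in for the network POST function PostQueue calls: `data` is the
  // JSON-encoded array of records, `headers` the extra request headers,
  // `batch` the batch id ("true" to request one, null for none), and `commit`
  // whether this post commits the batch. It asserts the configured limits and
  // accumulates what happened into `stats`.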
  let poster = (data, headers, batch, commit) => {
    let payloadBytes = 0;
    let numRecords = 0;
    for (let record of JSON.parse(data)) {
      if (config.max_record_payload_bytes) {
        less(
          record.payload.length,
          config.max_record_payload_bytes,
          "PostQueue should respect max_record_payload_bytes"
        );
      }
      payloadBytes += record.payload.length;
      ++numRecords;
    }
    let thisPost = {
      nbytes: data.length,
      batch,
      commit,
      payloadBytes,
      numRecords,
    };
    if (headers.length) {
      thisPost.headers = headers;
    }
    // check that we respected the provided limits for the post
    if (config.max_post_records) {
      lessOrEqual(
        numRecords,
        config.max_post_records,
        "PostQueue should respect max_post_records"
      );
    }
    if (config.max_post_bytes) {
      less(
        payloadBytes,
        config.max_post_bytes,
        "PostQueue should respect max_post_bytes"
      );
    }
    if (config.max_request_bytes) {
      less(
        thisPost.nbytes,
        config.max_request_bytes,
        "PostQueue should respect max_request_bytes"
      );
    }
    stats.posts.push(thisPost);
    // Call this now so we can check if there's a batch id in it.
    // Kind of kludgey, but it lets us have the correct batch id even
    // before the next post is made.
    let nextResponse = responseGenerator.next().value;
    // Record info for the batch.
    let curBatch = stats.batches[stats.batches.length - 1];
    // If there's no batch, it committed, or we requested a new one,
    // then we need to start a new one.
    if (!curBatch || batch == "true" || curBatch.didCommit) {
      curBatch = {
        posts: 0,
        payloadBytes: 0,
        numRecords: 0,
        didCommit: false,
        batch,
        serverBatch: false,
      };
      if (nextResponse.obj && nextResponse.obj.batch) {
        curBatch.batch = nextResponse.obj.batch;
        curBatch.serverBatch = true;
      }
      stats.batches.push(curBatch);
    }
    // If we provided a batch id, it must be the same as the current batch
    if (batch && batch != "true") {
      equal(curBatch.batch, batch);
    }
    curBatch.posts += 1;
    curBatch.payloadBytes += payloadBytes;
    curBatch.numRecords += numRecords;
    curBatch.didCommit = commit;
    // if this is an actual server batch (or it's a one-shot batch), check that
    // we respected the provided total limits
    if (commit && (batch == "true" || curBatch.serverBatch)) {
      if (config.max_total_records) {
        lessOrEqual(
          curBatch.numRecords,
          config.max_total_records,
          "PostQueue should respect max_total_records"
        );
      }
      if (config.max_total_bytes) {
        less(
          curBatch.payloadBytes,
          config.max_total_bytes,
          "PostQueue should respect max_total_bytes"
        );
      }
    }
    return Promise.resolve(nextResponse);
  };
  let done = () => {};
  let pq = new PostQueue(poster, lastModTime, config, getTestLogger(), done);
  return { pq, stats };
}

add_task(async function test_simple() {
  let config = {
    max_request_bytes: 1000,
    max_record_payload_bytes: 1000,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  await pq.enqueue(makeRecord(10));
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10]),
      payloadBytes: 10,
      numRecords: 1,
      commit: true, // we don't know if we have batch semantics, so committed.
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 1,
      didCommit: true,
      batch: "true",
      serverBatch: false,
    },
  ]);
});

// Test we do the right thing when we need to make multiple posts when there
// are no batch semantics
add_task(async function test_max_request_bytes_no_batch() {
  let config = {
    max_request_bytes: 50,
    max_record_payload_bytes: 50,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 200,
        "x-last-modified": time + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  let payloadSize = 20 - makeRecord.nonPayloadOverhead;
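  // payloadSize is 6, so each encoded record is exactly 20 bytes.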
  await pq.enqueue(makeRecord(payloadSize)); // total size now 22 bytes - "[" + record + "]"
  await pq.enqueue(makeRecord(payloadSize)); // total size now 43 bytes - "[" + record + "," + record + "]"
  await pq.enqueue(makeRecord(payloadSize)); // this will exceed our byte limit, so will be in the 2nd POST.
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: 43, // 43 bytes for the first POST.
      payloadBytes: payloadSize * 2,
      numRecords: 2,
      commit: false,
      headers: [["x-if-unmodified-since", time]],
      batch: "true",
    },
    {
      nbytes: 22,
      payloadBytes: payloadSize,
      numRecords: 1,
      commit: false, // we know we aren't in a batch, so never commit.
      headers: [["x-if-unmodified-since", time + 100]],
      batch: null,
    },
  ]);
  equal(stats.batches.filter(x => x.didCommit).length, 0);
  equal(pq.lastModified, time + 200);
});

add_task(async function test_max_record_payload_bytes_no_batch() {
  let config = {
    max_request_bytes: 100,
    max_record_payload_bytes: 50,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 200,
      headers: {
        "x-weave-timestamp": time + 100,
        "x-last-modified": time + 100,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());

  // Should fail to enqueue when the payload really is too large to fit.
  let { enqueued } = await pq.enqueue(makeRecord(51));
  ok(!enqueued);
  // Shouldn't fail when only the *encoded* record (payload plus overhead) is
  // over the limit - the limit applies to the payload alone.
  ok(
    (await pq.enqueue(makeRecord(50 - makeRecord.nonPayloadOverhead))).enqueued
  ); // total size now 52 bytes - "[" + record + "]"
  ok(
    (await pq.enqueue(makeRecord(46 - makeRecord.nonPayloadOverhead))).enqueued
  ); // total size now 99 bytes - "[" + record0 + "," + record1 + "]"
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: 99,
      payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
      numRecords: 2,
      commit: true, // we don't know if we have batch semantics, so committed.
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 50 + 46 - makeRecord.nonPayloadOverhead * 2,
      numRecords: 2,
      didCommit: true,
      batch: "true",
      serverBatch: false,
    },
  ]);

  equal(pq.lastModified, time + 100);
});

// Batch tests.
// Test making a single post when batch semantics are in place.
add_task(async function test_single_batch() {
  let config = {
    max_post_bytes: 1000,
    max_post_records: 100,
    max_total_records: 200,
    max_record_payload_bytes: 1000,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10]),
      numRecords: 1,
      payloadBytes: 10,
      commit: true, // we don't know if we have batch semantics, so committed.
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 1,
      didCommit: true,
      batch: 1234,
      serverBatch: true,
    },
  ]);
});

// Test we do the right thing when we need to make multiple posts due to
// max_post_bytes when there are batch semantics in place.
add_task(async function test_max_post_bytes_batch() {
  let config = {
    max_post_bytes: 50,
    max_post_records: 4,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 50,
    max_request_bytes: 4000,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 200,
        "x-weave-timestamp": time + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(20))).enqueued); // 20
  ok((await pq.enqueue(makeRecord(20))).enqueued); // 40
  // 60 would overflow, so post
  ok((await pq.enqueue(makeRecord(20))).enqueued); // 20
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([20, 20]),
      payloadBytes: 40,
      numRecords: 2,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
    {
      nbytes: requestBytesFor([20]),
      payloadBytes: 20,
      numRecords: 1,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 60,
      numRecords: 3,
      didCommit: true,
      batch: 1234,
      serverBatch: true,
    },
  ]);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when we need to make multiple posts due to
// max_request_bytes when there are batch semantics in place.
add_task(async function test_max_request_bytes_batch() {
  let config = {
    max_post_bytes: 60,
    max_post_records: 40,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 500,
    max_request_bytes: 100,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 200,
        "x-weave-timestamp": time + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 10, request: 26 = (10 + 14 + 1) + 1
  ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 20, request: 51 = (10 + 14 + 1) * 2 + 1
  ok((await pq.enqueue(makeRecord(10))).enqueued); // post: 30, request: 76 = (10 + 14 + 1) * 3 + 1
  // One more would make the post 40 bytes (fine) but the request 101 bytes,
  // so we should post first.
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10, 10, 10]),
      payloadBytes: 30,
      numRecords: 3,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
    {
      nbytes: requestBytesFor([10]),
      payloadBytes: 10,
      numRecords: 1,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 40,
      numRecords: 4,
      didCommit: true,
      batch: 1234,
      serverBatch: true,
    },
  ]);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when the batch bytes limit is exceeded.
add_task(async function test_max_total_bytes_batch() {
  let config = {
    max_post_bytes: 50,
    max_post_records: 20,
    max_total_bytes: 70,
    max_total_records: 100,
    max_record_payload_bytes: 50,
    max_request_bytes: 500,
  };
  const time0 = 11111111;
  const time1 = 22222222;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time0, "x-weave-timestamp": time0 + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time1, "x-weave-timestamp": time1 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 5678 },
      headers: { "x-last-modified": time1, "x-weave-timestamp": time1 + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 5678 },
      headers: {
        "x-last-modified": time1 + 200,
        "x-weave-timestamp": time1 + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time0, responseGenerator());

  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 20
  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 40, batch: 40
  // this will exceed our POST byte limit, so will be in the 2nd POST - but still in the first batch.
  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
  // this will exceed our batch byte limit, so will be in a new batch.
  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 20
  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 40, batch: 40
  // This will exceed POST byte limit, so will be in the 4th post, part of the 2nd batch.
  ok((await pq.enqueue(makeRecord(20))).enqueued); // payloads = post: 20, batch: 60
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([20, 20]),
      payloadBytes: 40,
      numRecords: 2,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time0]],
    },
    {
      nbytes: requestBytesFor([20]),
      payloadBytes: 20,
      numRecords: 1,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time0]],
    },
    {
      nbytes: requestBytesFor([20, 20]),
      payloadBytes: 40,
      numRecords: 2,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time1]],
    },
    {
      nbytes: requestBytesFor([20]),
      payloadBytes: 20,
      numRecords: 1,
      commit: true,
      batch: 5678,
      headers: [["x-if-unmodified-since", time1]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 60,
      numRecords: 3,
      didCommit: true,
      batch: 1234,
      serverBatch: true,
    },
    {
      posts: 2,
      payloadBytes: 60,
      numRecords: 3,
      didCommit: true,
      batch: 5678,
      serverBatch: true,
    },
  ]);

  equal(pq.lastModified, time1 + 200);
});

// Test we split up the posts when we exceed the record limit when batch semantics
// are in place.
add_task(async function test_max_post_records_batch() {
  let config = {
    max_post_bytes: 1000,
    max_post_records: 2,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 1000,
    max_request_bytes: 1000,
  };
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 200,
        "x-weave-timestamp": time + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  // will exceed record limit of 2, so will be in 2nd post.
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([20, 20]),
      numRecords: 2,
      payloadBytes: 40,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
    {
      nbytes: requestBytesFor([20]),
      numRecords: 1,
      payloadBytes: 20,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 60,
      numRecords: 3,
      batch: 1234,
      serverBatch: true,
      didCommit: true,
    },
  ]);

  equal(pq.lastModified, time + 200);
});

// Test we do the right thing when the batch record limit is exceeded.
add_task(async function test_max_records_batch() {
  let config = {
    max_post_bytes: 1000,
    max_post_records: 3,
    max_total_bytes: 10000,
    max_total_records: 5,
    max_record_payload_bytes: 1000,
    max_request_bytes: 10000,
  };
  const time0 = 11111111;
  const time1 = 22222222;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time0, "x-weave-timestamp": time0 + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time1, "x-weave-timestamp": time1 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 5678 },
      headers: { "x-last-modified": time1, "x-weave-timestamp": time1 + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 5678 },
      headers: {
        "x-last-modified": time1 + 200,
        "x-weave-timestamp": time1 + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time0, responseGenerator());
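
  // Enqueue 9 records: with max_post_records 3 and max_total_records 5, we
  // expect posts of 3 + 2 (committing batch 1234), then 3 + 1 (committing
  // batch 5678).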
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  ok((await pq.enqueue(makeRecord(20))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      // 3 records
      nbytes: requestBytesFor([20, 20, 20]),
      payloadBytes: 60,
      numRecords: 3,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time0]],
    },
    {
      // 2 records -- end batch1
      nbytes: requestBytesFor([20, 20]),
      payloadBytes: 40,
      numRecords: 2,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time0]],
    },
    {
      // 3 records
      nbytes: requestBytesFor([20, 20, 20]),
      payloadBytes: 60,
      numRecords: 3,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time1]],
    },
    {
      // 1 record -- end batch2
      nbytes: requestBytesFor([20]),
      payloadBytes: 20,
      numRecords: 1,
      commit: true,
      batch: 5678,
      headers: [["x-if-unmodified-since", time1]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 100,
      numRecords: 5,
      batch: 1234,
      serverBatch: true,
      didCommit: true,
    },
    {
      posts: 2,
      payloadBytes: 80,
      numRecords: 4,
      batch: 5678,
      serverBatch: true,
      didCommit: true,
    },
  ]);

  equal(pq.lastModified, time1 + 200);
});

// Test we do the right thing when the limits are met but not exceeded.
add_task(async function test_packed_batch() {
  let config = {
    max_post_bytes: 41,
    max_post_records: 4,
    max_total_bytes: 81,
    max_total_records: 8,
    max_record_payload_bytes: 20 + makeRecord.nonPayloadOverhead + 1,
    max_request_bytes: requestBytesFor([10, 10, 10, 10]) + 1,
  };
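  // These limits are sized so that two posts of four 10-byte payloads exactly
  // fill the batch without tripping any limit.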
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: { "x-last-modified": time, "x-weave-timestamp": time + 100 },
    };
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 200,
        "x-weave-timestamp": time + 200,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  ok((await pq.enqueue(makeRecord(10))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([10, 10, 10, 10]),
      numRecords: 4,
      payloadBytes: 40,
      commit: false,
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
    {
      nbytes: requestBytesFor([10, 10, 10, 10]),
      numRecords: 4,
      payloadBytes: 40,
      commit: true,
      batch: 1234,
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 2,
      payloadBytes: 80,
      numRecords: 8,
      batch: 1234,
      serverBatch: true,
      didCommit: true,
    },
  ]);

  equal(pq.lastModified, time + 200);
});

// Checks that a single record just over `failureLimit` fails to enqueue under
// the provided config, and that the queue keeps working afterwards.
async function test_enqueue_failure_case(failureLimit, config) {
  const time = 11111111;
  function* responseGenerator() {
    yield {
      success: true,
      status: 202,
      obj: { batch: 1234 },
      headers: {
        "x-last-modified": time + 100,
        "x-weave-timestamp": time + 100,
      },
    };
  }

  let { pq, stats } = makePostQueue(config, time, responseGenerator());
  // Check on empty postqueue.
  let result = await pq.enqueue(makeRecord(failureLimit + 1));
  ok(!result.enqueued);
  notEqual(result.error, undefined);
  ok((await pq.enqueue(makeRecord(5))).enqueued);

  // Check on nonempty postqueue.
  result = await pq.enqueue(makeRecord(failureLimit + 1));
  ok(!result.enqueued);
  notEqual(result.error, undefined);

  // Make sure that we keep working, skipping the bad record entirely
  // (handling the error the queue reported is left up to the caller).
  ok((await pq.enqueue(makeRecord(5))).enqueued);
  await pq.flush(true);

  deepEqual(stats.posts, [
    {
      nbytes: requestBytesFor([5, 5]),
      numRecords: 2,
      payloadBytes: 10,
      commit: true,
      batch: "true",
      headers: [["x-if-unmodified-since", time]],
    },
  ]);

  deepEqual(stats.batches, [
    {
      posts: 1,
      payloadBytes: 10,
      numRecords: 2,
      batch: 1234,
      serverBatch: true,
      didCommit: true,
    },
  ]);

  equal(pq.lastModified, time + 100);
}

add_task(async function test_max_post_bytes_enqueue_failure() {
  await test_enqueue_failure_case(50, {
    max_post_bytes: 50,
    max_post_records: 100,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 500,
    max_request_bytes: 500,
  });
});

add_task(async function test_max_request_bytes_enqueue_failure() {
  await test_enqueue_failure_case(50, {
    max_post_bytes: 500,
    max_post_records: 100,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 500,
    max_request_bytes: 50,
  });
});

add_task(async function test_max_record_payload_bytes_enqueue_failure() {
  await test_enqueue_failure_case(50, {
    max_post_bytes: 500,
    max_post_records: 100,
    max_total_bytes: 5000,
    max_total_records: 100,
    max_record_payload_bytes: 50,
    max_request_bytes: 500,
  });
});