/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::{CollectionUpdate, GlobalState, LocalCollStateMachine, Sync15StorageClient};
use crate::clients_engine;
use crate::engine::SyncEngine;
use crate::error::Error;
use crate::telemetry;
use crate::KeyBundle;
use interrupt_support::Interruptee;

#[allow(clippy::too_many_arguments)]
pub fn synchronize_with_clients_engine(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    clients: Option<&clients_engine::Engine<'_>>,
    engine: &dyn SyncEngine,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), Error> {
    let collection = engine.collection_name();
    log::info!("Syncing collection {}", collection);

    // our global state machine is ready - get the collection machine going.
    let coll_state = match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? {
        Some(coll_state) => coll_state,
        None => {
            // XXX - this is either "error" or "declined".
            log::warn!(
                "can't setup for the {} collection - hopefully it works later",
                collection
            );
            return Ok(());
        }
    };

    if let Some(clients) = clients {
        engine.prepare_for_sync(&|| clients.get_client_data())?;
    }
    interruptee.err_if_interrupted()?;

    // We assume an "engine" manages exactly one "collection" with the engine's name.
    match engine.get_collection_request(coll_state.last_modified)? {
        None => {
            log::info!("skipping incoming for {} - not needed.", collection);
        }
        Some(collection_request) => {
            // Ideally we would "batch" incoming records (eg, fetch just 1000 at a time)
            // and ask the engine to "stage" them as they come in - but currently we just read
            // them all in one request.
            // Doing this batching will involve specifying a "limit=" param and
            // "x-if-unmodified-since" for each request, looking for an
            // "X-Weave-Next-Offset" header in the response and using that in subsequent
            // requests.
            //
            // But even if we had that, we need to deal with a 412 response on a subsequent batch,
            // so we can't know if we've staged *every* record for that timestamp; the next
            // sync must use an earlier one.
            //
            // For this reason, an engine can't really trust a server timestamp until the
            // very end when we know we've staged them all.
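            //
            // Purely as an illustration of that shape (nothing below exists in this
            // crate - the `limit`/`offset` request builders, `fetch_incoming_page` and
            // its returned offset are all hypothetical), the batched loop might look like:
            //
            //     let mut offset: Option<String> = None;
            //     loop {
            //         let req = base_request.clone().limit(1000).offset(offset.clone());
            //         // `fetch_incoming_page` would send "x-if-unmodified-since", surface
            //         // a 412 as an error, and hand back any "X-Weave-Next-Offset" value.
            //         let (batch, next_offset) = fetch_incoming_page(client, &coll_state, req)?;
            //         engine.stage_incoming(batch, telem_engine)?;
            //         match next_offset {
            //             Some(o) => offset = Some(o),
            //             None => break,
            //         }
            //     }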
            let incoming = super::fetch_incoming(client, &coll_state, collection_request)?;
            log::info!("Downloaded {} remote changes", incoming.len());
            engine.stage_incoming(incoming, telem_engine)?;
            interruptee.err_if_interrupted()?;
        }
    };

    // Should consider adding a new `fetch_outgoing()` and having `apply()` only apply.
    // It *might* even make sense to only call `apply()` when something was staged,
    // but that's not clear - see the discussion at
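    //
    // A hypothetical shape for that split (illustrative only - these are not methods
    // on the current `SyncEngine` trait, and `Outgoing` just stands in for whatever
    // `apply()` returns today):
    //
    //     fn apply(&self, server_ts: ServerTimestamp, telem: &mut telemetry::Engine) -> Result<(), Error>;
    //     fn fetch_outgoing(&self) -> Result<Outgoing, Error>;
    //
    // With that, `apply()` could be skipped when nothing was staged, and uploading
    // would no longer be tied to applying.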
log::info!("Applying changes");
let outgoing = engine.apply(coll_state.last_modified, telem_engine)?;
interruptee.err_if_interrupted()?;
// XXX - this upload strategy is buggy due to batching. With enough records, we will commit
// 2 batches on the server. If the second fails, we get an Err<> here, so can't tell the
// engine about the successful server batch commit.
// Most stuff below should be called per-batch rather than at the successful end of all
// batches, but that's not trivial.
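    //
    // A rough sketch of the per-batch shape this wants (hypothetical - there is no
    // `upload_in_batches` helper here, and its callback argument is invented):
    //
    //     upload_in_batches(client, &coll_state, collection, outgoing, |committed| {
    //         // called once per successfully committed server batch
    //         engine.set_uploaded(committed.modified_timestamp, committed.successful_ids)
    //     })?;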
log::info!("Uploading {} outgoing changes", outgoing.len());
let upload_info = CollectionUpdate::new_from_changeset(
client,
&coll_state,
collection,
outgoing,
fully_atomic,
)?
.upload()?;
log::info!(
"Upload success ({} records success, {} records failed)",
upload_info.successful_ids.len(),
upload_info.failed_ids.len()
);
let mut telem_outgoing = telemetry::EngineOutgoing::new();
telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
telem_outgoing.failed(upload_info.failed_ids.len());
telem_engine.outgoing(telem_outgoing);
engine.set_uploaded(upload_info.modified_timestamp, upload_info.successful_ids)?;
// The above should all be per-batch :(
engine.sync_finished()?;
log::info!("Sync finished!");
Ok(())
}