Revision control
Copy as Markdown
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
use crate::bookmark_sync::BookmarksSyncEngine;
use crate::db::db::{PlacesDb, SharedPlacesDb};
use crate::error::*;
use crate::history_sync::HistorySyncEngine;
use crate::storage::{
self, bookmarks::bookmark_sync, delete_meta, get_meta, history::history_sync, put_meta,
use crate::util::normalize_path;
use error_support::handle_error;
use interrupt_support::register_interrupt;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use rusqlite::OpenFlags;
use std::cell::Cell;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
use sync15::client::{sync_multiple, MemoryCachedState, Sync15StorageClientInit, SyncResult};
use sync15::engine::{EngineSyncAssociation, SyncEngine, SyncEngineId};
use sync15::{telemetry, KeyBundle};
// Not clear if this should be here, but this is the "global sync state"
// which is persisted to disk and reused for all engines.
// Note that this is only ever round-tripped, and never changed by, or impacted
// by a store or collection, so it's safe to storage globally rather than
// per collection.
pub const GLOBAL_STATE_META_KEY: &str = "global_sync_state_v2";
// Our "sync manager" will use whatever is stashed here.
lazy_static::lazy_static! {
// Mutex: just taken long enough to update the contents - needed to wrap
// the Weak as it isn't `Sync`
// [Arc/Weak]: Stores the places api used to create the connection for
// BookmarksSyncEngine/HistorySyncEngine
static ref PLACES_API_FOR_SYNC_MANAGER: Mutex<Weak<PlacesApi>> = Mutex::new(Weak::new());
// Called by the sync manager to get a sync engine via the PlacesApi previously
// registered with the sync manager.
pub fn get_registered_sync_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
match PLACES_API_FOR_SYNC_MANAGER.lock().upgrade() {
None => {
log::warn!("places: get_registered_sync_engine: no PlacesApi registered");
Some(places_api) => match create_sync_engine(&places_api, engine_id) {
Ok(engine) => Some(engine),
Err(e) => {
// Report this to Sentry, except if it's an open database error. That indicates
// that there is a registered sync engine, but the connection is busy so we can't
// open it. This is a known issue that we don't need more reports for (see
if !matches!(e, Error::OpenDatabaseError(_)) {
"places: get_registered_sync_engine: {}",
fn create_sync_engine(
places_api: &PlacesApi,
engine_id: &SyncEngineId,
) -> Result<Box<dyn SyncEngine>> {
let conn = places_api.get_sync_connection()?;
match engine_id {
SyncEngineId::Bookmarks => Ok(Box::new(BookmarksSyncEngine::new(conn)?)),
SyncEngineId::History => Ok(Box::new(HistorySyncEngine::new(conn)?)),
_ => unreachable!("can't provide unknown engine: {}", engine_id),
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionType {
ReadOnly = 1,
ReadWrite = 2,
Sync = 3,
impl ConnectionType {
pub fn from_primitive(p: u8) -> Option<Self> {
match p {
1 => Some(ConnectionType::ReadOnly),
2 => Some(ConnectionType::ReadWrite),
3 => Some(ConnectionType::Sync),
_ => None,
impl ConnectionType {
pub fn rusqlite_flags(self) -> OpenFlags {
let common_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
match self {
ConnectionType::ReadOnly => common_flags | OpenFlags::SQLITE_OPEN_READ_ONLY,
ConnectionType::ReadWrite => {
common_flags | OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE
ConnectionType::Sync => common_flags | OpenFlags::SQLITE_OPEN_READ_WRITE,
// We only allow a single PlacesApi per filename.
lazy_static! {
static ref APIS: Mutex<HashMap<PathBuf, Weak<PlacesApi>>> = Mutex::new(HashMap::new());
static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub struct SyncState {
pub mem_cached_state: Cell<MemoryCachedState>,
pub disk_cached_state: Cell<Option<String>>,
/// For uniffi we need to expose our `Arc` returning constructor as a global function :(
/// meaning we are forced into this unfortunate workaround.
pub fn places_api_new(db_name: impl AsRef<Path>) -> ApiResult<Arc<PlacesApi>> {
/// The entry-point to the places API. This object gives access to database
/// connections and other helpers. It enforces that only 1 write connection
/// can exist to the database at once.
pub struct PlacesApi {
db_name: PathBuf,
write_connection: Mutex<Option<PlacesDb>>,
sync_state: Mutex<Option<SyncState>>,
coop_tx_lock: Arc<Mutex<()>>,
// Used for get_sync_connection()
// - The inner mutex synchronizes sync operation (for example one of the [SyncEngine] methods).
// This avoids issues like #867
// - The weak facilitates connection sharing. When `get_sync_connection()` returns an Arc, we
// keep a weak reference to it. If the Arc is still alive when `get_sync_connection()` is
// called again, we reuse it.
// - The outer mutex synchronizes the `get_sync_connection()` operation. If multiple threads
// ran that at the same time there would be issues.
sync_connection: Mutex<Weak<SharedPlacesDb>>,
id: usize,
impl PlacesApi {
/// Create a new, or fetch an already open, PlacesApi backed by a file on disk.
pub fn new(db_name: impl AsRef<Path>) -> Result<Arc<Self>> {
let db_name = normalize_path(db_name)?;
/// Create a new, or fetch an already open, memory-based PlacesApi. You must
/// provide a name, but you are still able to have a single writer and many
/// reader connections to the same memory DB open.
pub fn new_memory(db_name: &str) -> Result<Arc<Self>> {
let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_name));
fn new_or_existing_into(
target: &mut HashMap<PathBuf, Weak<PlacesApi>>,
db_name: PathBuf,
) -> Result<Arc<Self>> {
let id = ID_COUNTER.fetch_add(1, Ordering::SeqCst);
match target.get(&db_name).and_then(Weak::upgrade) {
Some(existing) => Ok(existing),
None => {
// We always create a new read-write connection for an initial open so
// we can create the schema and/or do version upgrades.
let coop_tx_lock = Arc::new(Mutex::new(()));
let connection = PlacesDb::open(
let new = PlacesApi {
db_name: db_name.clone(),
write_connection: Mutex::new(Some(connection)),
sync_state: Mutex::new(None),
sync_connection: Mutex::new(Weak::new()),
let arc = Arc::new(new);
target.insert(db_name, Arc::downgrade(&arc));
fn new_or_existing(db_name: PathBuf) -> Result<Arc<Self>> {
let mut guard = APIS.lock();
Self::new_or_existing_into(&mut guard, db_name)
/// Open a connection to the database.
pub fn open_connection(&self, conn_type: ConnectionType) -> Result<PlacesDb> {
match conn_type {
ConnectionType::ReadOnly => {
// make a new one - we can have as many of these as we want.
ConnectionType::ReadWrite => {
// We only allow one of these.
let mut guard = self.write_connection.lock();
match guard.take() {
None => Err(Error::ConnectionAlreadyOpen),
Some(db) => Ok(db),
ConnectionType::Sync => {
panic!("Use `get_sync_connection` to open a sync connection");
// Get a database connection to sync with
// This function provides a couple features to facilitate sharing the connection between
// different sync engines:
// - Each connection is wrapped in a `Mutex<>` to synchronize access.
// - The mutex is then wrapped in an Arc<>. If the last Arc<> returned is still alive, then
// get_sync_connection() will reuse it.
pub fn get_sync_connection(&self) -> Result<Arc<SharedPlacesDb>> {
// First step: lock the outer mutex
let mut conn = self.sync_connection.lock();
match conn.upgrade() {
// If our Weak is still alive, then re-use that
Some(db) => Ok(db),
// If not, create a new connection
None => {
let db = Arc::new(SharedPlacesDb::new(PlacesDb::open(
// Store a weakref for next time
*conn = Arc::downgrade(&db);
/// Close a connection to the database. If the connection is the write
/// connection, you can re-fetch it using open_connection.
pub fn close_connection(&self, connection: PlacesDb) -> Result<()> {
if connection.api_id() != {
return Err(Error::WrongApiForClose);
if connection.conn_type() == ConnectionType::ReadWrite {
// We only allow one of these.
let mut guard = self.write_connection.lock();
*guard = Some(connection);
fn get_disk_persisted_state(&self, conn: &PlacesDb) -> Result<Option<String>> {
get_meta::<String>(conn, GLOBAL_STATE_META_KEY)
fn set_disk_persisted_state(&self, conn: &PlacesDb, state: &Option<String>) -> Result<()> {
match state {
Some(ref s) => put_meta(conn, GLOBAL_STATE_META_KEY, s),
None => delete_meta(conn, GLOBAL_STATE_META_KEY),
// This allows the embedding app to say "make this instance available to
// the sync manager". The implementation is more like "offer to sync mgr"
// (thereby avoiding us needing to link with the sync manager) but
// `register_with_sync_manager()` is logically what's happening so that's
// the name it gets.
pub fn register_with_sync_manager(self: Arc<Self>) {
*PLACES_API_FOR_SYNC_MANAGER.lock() = Arc::downgrade(&self);
// NOTE: These should be deprecated as soon as possible - that will be once
// all consumers have been updated to use the .sync() method below, and/or
// we have implemented the sync manager and migrated consumers to that.
pub fn sync_history(
client_init: &Sync15StorageClientInit,
key_bundle: &KeyBundle,
) -> Result<telemetry::SyncTelemetryPing> {
move |conn, mem_cached_state, disk_cached_state| {
let engine = HistorySyncEngine::new(conn)?;
pub fn sync_bookmarks(
client_init: &Sync15StorageClientInit,
key_bundle: &KeyBundle,
) -> Result<telemetry::SyncTelemetryPing> {
move |conn, mem_cached_state, disk_cached_state| {
let engine = BookmarksSyncEngine::new(conn)?;
pub fn do_sync_one<F>(
name: &'static str,
syncer: F,
) -> Result<telemetry::SyncTelemetryPing>
F: FnOnce(
&mut MemoryCachedState,
&mut Option<String>,
) -> Result<SyncResult>,
let mut guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
let sync_state = guard.as_ref().unwrap();
let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();
let mut result = syncer(conn.clone(), &mut mem_cached_state, &mut disk_cached_state)?;
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
self.set_disk_persisted_state(&conn.lock(), &disk_cached_state)?;
// for b/w compat reasons, we do some dances with the result.
if let Err(e) = result.result {
return Err(e.into());
match result.engine_results.remove(name) {
None | Some(Ok(())) => Ok(result.telemetry),
Some(Err(e)) => Err(e.into()),
// This is the new sync API until the sync manager lands. It's currently
// not wired up via the FFI - it's possible we'll do declined engines too
// before we do.
// Note we've made a policy decision about the return value - even though
// it is Result<SyncResult>, we will only return an Err() if there's a
// fatal error that prevents us starting a sync, such as failure to open
// the DB. Any errors that happen *after* sync must not escape - ie, once
// we have a SyncResult, we must return it.
pub fn sync(
client_init: &Sync15StorageClientInit,
key_bundle: &KeyBundle,
) -> Result<SyncResult> {
let mut guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
let sync_state = guard.as_ref().unwrap();
let bm_engine = BookmarksSyncEngine::new(conn.clone())?;
let history_engine = HistorySyncEngine::new(conn.clone())?;
let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();
// NOTE: After here we must never return Err()!
let result = sync_multiple(
&[&history_engine, &bm_engine],
&mut disk_cached_state,
&mut mem_cached_state,
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
if let Err(e) = self.set_disk_persisted_state(&conn.lock(), &disk_cached_state) {
"Failed to persist the sync state: {:?}",
pub fn wipe_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
pub fn reset_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
bookmark_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
pub fn reset_history(&self) -> ApiResult<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
history_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
pub mod test {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
// A helper for our tests to get their own memory Api.
static ATOMIC_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub fn new_mem_api() -> Arc<PlacesApi> {
// A bit hacky, but because this is a test-only function that almost all tests use,
// it's a convenient place to initialize logging for tests.
let _ = env_logger::try_init();
let counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed);
PlacesApi::new_memory(&format!("test-api-{}", counter)).expect("should get an API")
pub fn new_mem_connection() -> PlacesDb {
.expect("should get a connection")
pub struct MemConnections {
pub read: PlacesDb,
pub write: PlacesDb,
pub api: Arc<PlacesApi>,
pub fn new_mem_connections() -> MemConnections {
let api = new_mem_api();
let read = api
.expect("should get a read connection");
let write = api
.expect("should get a write connection");
MemConnections { read, write, api }
mod tests {
use super::test::*;
use super::*;
use sql_support::ConnExt;
fn test_multi_writers_fails() {
let api = new_mem_api();
let writer1 = api
.expect("should get writer");
.expect_err("should fail to get second writer");
// But we should be able to re-get it after closing it.
.expect("should be able to close");
.expect("should get a writer after closing the other");
fn test_shared_memory() {
let api = new_mem_api();
let writer = api
.expect("should get writer");
"CREATE TABLE test_table (test_value INTEGER);
INSERT INTO test_table VALUES (999)",
.expect("should insert");
let reader = api
.expect("should get reader");
let val = reader
.query_one::<i64>("SELECT test_value FROM test_table")
.expect("should get value");
assert_eq!(val, 999);
fn test_reader_before_writer() {
let api = new_mem_api();
let reader = api
.expect("should get reader");
let writer = api
.expect("should get writer");
"CREATE TABLE test_table (test_value INTEGER);
INSERT INTO test_table VALUES (999)",
.expect("should insert");
let val = reader
.query_one::<i64>("SELECT test_value FROM test_table")
.expect("should get value");
assert_eq!(val, 999);
fn test_wrong_writer_close() {
let api = new_mem_api();
// Grab this so `api` doesn't think it still has a writer.
let _writer = api
.expect("should get writer");
let fake_api = new_mem_api();
let fake_writer = fake_api
.expect("should get writer 2");
fn test_valid_writer_close() {
let api = new_mem_api();
let writer = api
.expect("should get writer");
.expect("Should allow closing own connection");
// Make sure we can open it again.