Revision control
Copy as Markdown
Other Tools
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
//! # Metrics Ping Scheduler
//!
//! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings.
//! It implements the spec described in
use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
use crate::storage::INTERNAL_STORAGE;
use crate::util::local_now_with_offset;
use crate::{CommonMetricData, Glean, Lifetime};
use chrono::prelude::*;
use chrono::Duration;
use once_cell::sync::Lazy;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
const SCHEDULED_HOUR: u32 = 4;
// Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex.
#[allow(clippy::mutex_atomic)]
static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
/// Describes the interface for a submitter of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingSubmitter {
/// Submits a metrics ping, updating the last sent time to `now`
/// (which might not be _right now_ due to processing delays (or in tests))
fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
}
/// Describes the interface for a scheduler of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingScheduler {
/// Begins a recurring schedule of "metrics" ping submissions, on another thread.
/// `now` is used with `when` to determine the first schedule interval and
/// may not be _right now_ due to processing delays (or in tests).
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
);
}
/// Uses Glean to submit "metrics" pings directly.
struct GleanMetricsPingSubmitter {}
impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
glean.submit_ping_by_name("metrics", reason);
// Always update the collection date, irrespective of the ping being sent.
get_last_sent_time_metric().set_sync_chrono(glean, now);
}
}
/// Schedule "metrics" pings directly using the default behaviour.
struct GleanMetricsPingScheduler {}
impl MetricsPingScheduler for GleanMetricsPingScheduler {
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) {
start_scheduler(submitter, now, when);
}
}
/// Performs startup checks to decide when to schedule the next "metrics" ping collection.
/// **Must** be called before draining the preinit queue.
/// (We're at the Language Bindings' mercy for that)
pub fn schedule(glean: &Glean) {
let now = local_now_with_offset();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
if *cancelled_lock.lock().unwrap() {
log::debug!("Told to schedule, but already cancelled. Are we in a test?");
}
*cancelled_lock.lock().unwrap() = false; // Uncancel the thread.
let submitter = GleanMetricsPingSubmitter {};
let scheduler = GleanMetricsPingScheduler {};
schedule_internal(glean, submitter, scheduler, now)
}
/// Tells the scheduler task to exit quickly and cleanly.
pub fn cancel() {
let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread.
condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled.
}
fn schedule_internal(
glean: &Glean,
submitter: impl MetricsPingSubmitter + Send + 'static,
scheduler: impl MetricsPingScheduler,
now: DateTime<FixedOffset>,
) {
let last_sent_build_metric = get_last_sent_build_metric();
if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
// If `app_build` is longer than StringMetric's max length, we will always
// treat it as a changed build when really it isn't.
// This will be externally-observable as InvalidOverflow errors on both the core
// `client_info.app_build` metric and the scheduler's internal metric.
if last_sent_build != glean.app_build {
last_sent_build_metric.set_sync(glean, &glean.app_build);
log::info!("App build changed. Sending 'metrics' ping");
submitter.submit_metrics_ping(glean, Some("upgrade"), now);
scheduler.start_scheduler(submitter, now, When::Reschedule);
return;
}
} else {
// No value in last_sent_build. Better set one.
last_sent_build_metric.set_sync(glean, &glean.app_build);
}
let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
if let Some(last_sent) = last_sent_time {
log::info!("The 'metrics' ping was last sent on {}", last_sent);
}
// We aim to cover 3 cases here:
//
// 1. The ping was already collected on the current calendar day;
// only schedule one for collection on the next calendar day at the due time.
// 2. The ping was NOT collected on the current calendar day AND we're later
// than today's due time; collect the ping immediately.
// 3. The ping was NOT collected on the current calendar day BUT we still have
// some time to the due time; schedule for submitting the current calendar day.
let already_sent_today = last_sent_time.is_some_and(|d| d.date() == now.date());
if already_sent_today {
// Case #1
log::info!("The 'metrics' ping was already sent today, {}", now);
scheduler.start_scheduler(submitter, now, When::Tomorrow);
} else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
// Case #2
log::info!("Sending the 'metrics' ping immediately, {}", now);
submitter.submit_metrics_ping(glean, Some("overdue"), now);
scheduler.start_scheduler(submitter, now, When::Reschedule);
} else {
// Case #3
log::info!("The 'metrics' collection is scheduled for today, {}", now);
scheduler.start_scheduler(submitter, now, When::Today);
}
}
/// "metrics" ping scheduling deadlines.
#[derive(Debug, PartialEq)]
enum When {
Today,
Tomorrow,
Reschedule,
}
impl When {
/// Returns the duration from now until our deadline.
/// Note that std::time::Duration doesn't do negative time spans, so if
/// our deadline has passed, this will return zero.
fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
let fire_date = match self {
Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
// Doesn't actually save us from being an hour off on DST because
// chrono doesn't know when DST changes. : (
Self::Tomorrow | Self::Reschedule => {
(now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
}
};
// After rust-lang/rust#73544 can use std::time::Duration::ZERO
(fire_date - now)
.to_std()
.unwrap_or_else(|_| std::time::Duration::from_millis(0))
}
/// The "metrics" ping reason corresponding to our deadline.
fn reason(&self) -> &'static str {
match self {
Self::Today => "today",
Self::Tomorrow => "tomorrow",
Self::Reschedule => "reschedule",
}
}
}
fn start_scheduler(
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) -> JoinHandle<()> {
let pair = Arc::clone(&TASK_CONDVAR);
std::thread::Builder::new()
.name("glean.mps".into())
.spawn(move || {
let (cancelled_lock, condvar) = &*pair;
let mut when = when;
let mut now = now;
loop {
let dur = when.until(now);
log::info!("Scheduling for {} after {:?}, reason {:?}", now, dur, when);
let mut timed_out = false;
{
match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| !*cancelled) {
Err(err) => {
log::warn!("Condvar wait failure. MPS exiting. {}", err);
break;
}
Ok((cancelled, wait_result)) => {
if *cancelled {
log::info!("Metrics Ping Scheduler cancelled. Exiting.");
break;
} else if wait_result.timed_out() {
// Can't get the global glean while holding cancelled's lock.
timed_out = true;
} else {
// This should be impossible. `cancelled_lock` is acquired, and
// `!*cancelled` is checked by the condvar before it is allowed
// to return from `wait_timeout_while` (I checked).
// So `Ok(_)` implies `*cancelled || wait_result.timed_out`.
log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
}
}
}
}
// Safety:
// We are okay dropping the condvar's cancelled lock here because it only guards
// whether we're cancelled, and we've established that we weren't when we timed out.
// We might _now_ be cancelled at any time, in which case when we loop back over
// we'll immediately exit. But first we need to submit our "metrics" ping.
if timed_out {
log::info!("Time to submit our metrics ping, {:?}", when);
let glean = crate::core::global_glean().expect("Global Glean not present when trying to send scheduled 'metrics' ping?!").lock().unwrap();
submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
when = When::Reschedule;
}
now = local_now_with_offset();
}
}).expect("Unable to spawn Metrics Ping Scheduler thread.")
}
fn get_last_sent_time_metric() -> DatetimeMetric {
DatetimeMetric::new(
CommonMetricData {
name: "last_sent_time".into(),
category: "mps".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
},
TimeUnit::Minute,
)
}
fn get_last_sent_build_metric() -> StringMetric {
StringMetric::new(CommonMetricData {
name: "last_sent_build".into(),
category: "mps".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
})
}
#[cfg(test)]
mod test {
use super::*;
use crate::tests::new_glean;
use std::sync::atomic::{AtomicU32, Ordering};
struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
submit_validator: F,
validator_run_count: Arc<AtomicU32>,
}
struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
schedule_validator: F,
validator_run_count: Arc<AtomicU32>,
}
impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
fn submit_metrics_ping(
&self,
_glean: &Glean,
reason: Option<&str>,
now: DateTime<FixedOffset>,
) {
(self.submit_validator)(now, reason);
self.validator_run_count.fetch_add(1, Ordering::Relaxed);
}
}
impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
fn start_scheduler(
&self,
_submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) {
(self.schedule_validator)(now, when);
self.validator_run_count.fetch_add(1, Ordering::Relaxed);
}
}
fn new_proxies<
F1: Fn(DateTime<FixedOffset>, Option<&str>),
F2: Fn(DateTime<FixedOffset>, When),
>(
submit_validator: F1,
schedule_validator: F2,
) -> (
ValidatingSubmitter<F1>,
Arc<AtomicU32>,
ValidatingScheduler<F2>,
Arc<AtomicU32>,
) {
let submitter_count = Arc::new(AtomicU32::new(0));
let submitter = ValidatingSubmitter {
submit_validator,
validator_run_count: Arc::clone(&submitter_count),
};
let scheduler_count = Arc::new(AtomicU32::new(0));
let scheduler = ValidatingScheduler {
schedule_validator,
validator_run_count: Arc::clone(&scheduler_count),
};
(submitter, submitter_count, scheduler, scheduler_count)
}
// Ensure on first run that we actually set the last sent build metric.
// (and that we send an "overdue" ping if it's after the scheduled hour)
#[test]
fn first_run_last_sent_build() {
let (mut glean, _t) = new_glean(None);
glean.app_build = "a build".into();
let lsb_metric = get_last_sent_build_metric();
assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));
let fake_now = FixedOffset::east(0)
.ymd(2022, 11, 15)
.and_hms(SCHEDULED_HOUR, 0, 1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("overdue")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
assert_eq!(
Some(glean.app_build.to_string()),
lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
);
}
// Ensure that if we have a different build, we immediately submit an "upgrade" ping
// and schedule a "reschedule" ping for tomorrow.
#[test]
fn different_app_builds_submit_and_reschedule() {
let (mut glean, _t) = new_glean(None);
glean.app_build = "a build".into();
get_last_sent_build_metric().set_sync(&glean, "a different build");
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("upgrade")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we've already sent a ping today, ensure we don't send a ping but we
// do schedule a ping for tomorrow. ("Case #1" in schedule_internal)
#[test]
fn case_1_no_submit_but_schedule_tomorrow() {
let (glean, _t) = new_glean(None);
let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
|_, when| assert_eq!(when, When::Tomorrow),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we haven't sent a ping today and we're after the scheduled time,
// ensure we send a ping and then schedule a "reschedule" ping for tomorrow.
// ("Case #2" in schedule_internal)
#[test]
fn case_2_submit_ping_and_reschedule() {
let (glean, _t) = new_glean(None);
let fake_yesterday = FixedOffset::east(0)
.ymd(2021, 4, 29)
.and_hms(SCHEDULED_HOUR, 0, 1);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
let fake_now = fake_yesterday + Duration::days(1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("overdue")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we haven't sent a ping today and we're before the scheduled time,
// ensure we don't send a ping but schedule a "today" ping for today.
// ("Case #3" in schedule_internal)
#[test]
fn case_3_no_submit_but_schedule_today() {
let (glean, _t) = new_glean(None);
let fake_yesterday =
FixedOffset::east(0)
.ymd(2021, 4, 29)
.and_hms(SCHEDULED_HOUR - 1, 0, 1);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
let fake_now = fake_yesterday + Duration::days(1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
|_, when| assert_eq!(when, When::Today),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// `When` is responsible for date math. Let's make sure it's correct.
#[test]
fn when_gets_at_least_some_date_math_correct() {
let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
// `now` is after `SCHEDULED_HOUR` so should be zero:
assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now));
// If we bring it back before `SCHEDULED_HOUR` it should give us the duration:
let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
assert_eq!(
std::time::Duration::from_secs(3600),
When::Today.until(earlier)
);
// `Tomorrow` and `Reschedule` should differ only in their `reason()`
// 46670s is 12h57m10s (aka, the time from 15:02:10 to 04:00:00
// (when the timezone doesn't change between them)).
assert_eq!(
std::time::Duration::from_secs(46670),
When::Tomorrow.until(now)
);
assert_eq!(
std::time::Duration::from_secs(46670),
When::Reschedule.until(now)
);
assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
}
// Scheduler tests mutate global state and thus must not be run in parallel.
// Otherwise one test could cancel the other.
// This Mutex aims to solve that.
static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
// The scheduler has been designed to be cancellable. Can we cancel it?
#[test]
fn cancellable_tasks_can_be_cancelled() {
// First and foremost, all scheduler tests must ensure they start uncancelled.
// Perils of having shared state.
let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = false;
// Pick a time at least two hours from the next scheduled submission.
// (So that this test will time out if cancellation fails).
let now = FixedOffset::east(0)
.ymd(2021, 4, 30)
.and_hms(SCHEDULED_HOUR - 2, 0, 0);
let proxy_factory = || {
new_proxies(
|_, reason| {
panic!(
"Shouldn't submit when testing scheduler. reason: {:?}",
reason
)
},
|_, _| panic!("Not even using the scheduler this time."),
)
};
// Test Today.
let (submitter, submitter_count, _, _) = proxy_factory();
let handle = start_scheduler(submitter, now, When::Today);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
// Test Tomorrow.
let (submitter, submitter_count, _, _) = proxy_factory();
*cancelled_lock.lock().unwrap() = false; // Uncancel.
let handle = start_scheduler(submitter, now, When::Tomorrow);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
// Test Reschedule.
let (submitter, submitter_count, _, _) = proxy_factory();
*cancelled_lock.lock().unwrap() = false; // Uncancel.
let handle = start_scheduler(submitter, now, When::Reschedule);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
}
// We're not keen to wait like the scheduler is, but we can test a quick schedule.
#[test]
fn immediate_task_runs_immediately() {
// First and foremost, all scheduler tests must ensure they start uncancelled.
// Perils of having shared state.
let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = false;
// We're actually going to submit a ping from the scheduler, which requires a global glean.
let (glean, _t) = new_glean(None);
assert!(
!glean.schedule_metrics_pings,
"Real schedulers not allowed in tests!"
);
assert!(crate::core::setup_glean(glean).is_ok());
// We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0.
let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0);
let (submitter, submitter_count, _, _) = new_proxies(
move |_, reason| {
assert_eq!(reason, Some("today"));
// After submitting the ping we expect, let's cancel this scheduler so the thread exits.
// (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex)
std::thread::spawn(super::cancel);
},
|_, _| panic!("Not using the scheduler this time."),
);
let handle = start_scheduler(submitter, now, When::Today);
handle.join().unwrap();
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
}
}