coord: Make storage usage collection more precise
Previously, the Coordinator would collect storage metrics on a regular
interval using a tokio::time::interval. This was lacking in the
following ways:
  - Any time spent waiting would be forgotten in between restarts.
  - A crash loop could potentially cause repeated consecutive
  collections if we decided to blindly collect usages on restart.

This commit changes the logic to check when the previous collection
happened and to wait only for the time remaining in the interval.

Fixes MaterializeInc#3739
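
For illustration, here is a minimal, standalone sketch of the new wait
computation (the helper name remaining_wait and the concrete numbers are
invented for this example; the committed logic lives in
schedule_storage_usage_collection in src/adapter/src/coord/message_handler.rs
below):

use std::time::Duration;

type EpochMillis = u64;

// How long to wait before the next storage usage collection, given the
// configured interval, the current wall-clock time, and the timestamp of the
// most recent recorded collection (EpochMillis::MIN if none exists yet).
fn remaining_wait(interval: Duration, now: EpochMillis, most_recent: EpochMillis) -> Duration {
    let elapsed = now.saturating_sub(most_recent);
    // If a full interval (or more) has already passed, including time spent
    // down between restarts, this saturates to zero and a collection is
    // scheduled immediately; otherwise only the remainder is waited out.
    interval.saturating_sub(Duration::from_millis(elapsed))
}

fn main() {
    let interval = Duration::from_secs(3600);
    // Restart 10 minutes after the last collection: wait the remaining 50 minutes.
    assert_eq!(remaining_wait(interval, 4_000_000, 3_400_000), Duration::from_secs(3000));
    // Restart more than an hour after the last collection: collect right away.
    assert_eq!(remaining_wait(interval, 8_000_000, 3_400_000), Duration::ZERO);
}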
jkosh44 committed Sep 19, 2022
1 parent 23c3422 commit eaa1507
Showing 5 changed files with 151 additions and 4 deletions.
10 changes: 10 additions & 0 deletions src/adapter/src/catalog.rs
@@ -4433,6 +4433,16 @@ impl<S: Append> Catalog<S> {
pub fn system_config(&self) -> &SystemVars {
self.state.system_config()
}

pub async fn most_recent_storage_usage_collection(&self) -> Result<Option<EpochMillis>, Error> {
Ok(self
.storage()
.await
.storage_usage()
.await?
.map(|usage| usage.timestamp())
.max())
}
}

pub fn is_reserved_name(name: &str) -> bool {
5 changes: 1 addition & 4 deletions src/adapter/src/coord.rs
@@ -732,9 +732,7 @@ impl<S: Append + 'static> Coordinator<S> {
// Watcher that listens for and reports compute service status changes.
let mut compute_events = self.controller.compute.watch_services();

// Trigger a storage usage metric collection on configured interval.
let mut storage_usage_update_interval =
tokio::time::interval(self.storage_usage_collection_interval);
self.schedule_storage_usage_collection().await;

loop {
// Before adding a branch to this select loop, please ensure that the branch is
@@ -774,7 +772,6 @@ impl<S: Append + 'static> Coordinator<S> {
// `tick()` on `Interval` is cancel-safe:
// https://docs.rs/tokio/1.19.2/tokio/time/struct.Interval.html#cancel-safety
_ = advance_timelines_interval.tick() => Message::GroupCommitInitiate,
_ = storage_usage_update_interval.tick() => Message::StorageUsageFetch,
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
Some(collections) = consolidations_rx.recv() => {
28 changes: 28 additions & 0 deletions src/adapter/src/coord/message_handler.rs
@@ -11,12 +11,14 @@
//! messages from various sources (ex: controller, clients, background tasks, etc).
use std::collections::HashMap;
use std::time::Duration;

use chrono::DurationRound;
use tracing::{event, warn, Level};

use mz_compute_client::controller::ComputeInstanceEvent;
use mz_controller::ControllerResponse;
use mz_ore::now::EpochMillis;
use mz_ore::task;
use mz_persist_client::ShardId;
use mz_sql::ast::Statement;
@@ -131,6 +133,32 @@ impl<S: Append + 'static> Coordinator<S> {
{
tracing::warn!("Failed to update storage metrics: {:?}", err);
}
self.schedule_storage_usage_collection().await;
}

pub async fn schedule_storage_usage_collection(&self) {
let most_recent_storage_usage_collection = self
.catalog
.most_recent_storage_usage_collection()
.await
.expect("unable to get storage usage")
.unwrap_or(EpochMillis::MIN);
let time_since_previous_storage_usage_collection = self
.now()
.saturating_sub(most_recent_storage_usage_collection);
let next_storage_usage_update_interval = self
.storage_usage_collection_interval
.saturating_sub(Duration::from_millis(
time_since_previous_storage_usage_collection,
));
let internal_cmd_tx = self.internal_cmd_tx.clone();
task::spawn(|| "storage_usage_collection", async move {
tokio::time::sleep(next_storage_usage_update_interval).await;
// If sending fails, the main thread has shut down.
if internal_cmd_tx.send(Message::StorageUsageFetch).is_err() {
return;
}
});
}

#[tracing::instrument(level = "debug", skip_all)]
9 changes: 9 additions & 0 deletions src/audit-log/src/lib.rs
@@ -277,4 +277,13 @@ impl VersionedStorageUsage {
pub fn serialize(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("must serialize")
}

pub fn timestamp(&self) -> EpochMillis {
match self {
VersionedStorageUsage::V1(StorageUsageV1 {
collection_timestamp,
..
}) => *collection_timestamp,
}
}
}
103 changes: 103 additions & 0 deletions src/environmentd/tests/server.rs
@@ -677,3 +677,106 @@ fn test_storage_usage_collection_interval() -> Result<(), Box<dyn Error>> {

Ok(())
}

#[test]
fn test_storage_usage_updates_between_restarts() -> Result<(), Box<dyn Error>> {
mz_ore::test::init_logging();

let data_dir = tempfile::tempdir()?;
let storage_usage_collection_interval = Duration::from_secs(3);
let config = util::Config::default()
.with_storage_usage_collection_interval(storage_usage_collection_interval)
.data_directory(data_dir.path());

// Wait for initial storage usage collection.
let initial_timestamp: f64 = {
let server = util::start_server(config.clone())?;
let mut client = server.connect(postgres::NoTls)?;
// Retry because it may take some time for the initial snapshot to be taken.
Retry::default().max_duration(Duration::from_secs(5)).retry(|_| {
Ok::<f64, String>(
client
.query_one(
"SELECT EXTRACT(EPOCH FROM MAX(collection_timestamp))::float8 FROM mz_storage_usage;",
&[],
)
.map_err(|e| e.to_string())?
.try_get::<_, f64>(0)
.map_err(|e| e.to_string())?,
)
})?
};

std::thread::sleep(storage_usage_collection_interval);

// Another storage usage collection should be scheduled immediately.
{
let server = util::start_server(config)?;
let mut client = server.connect(postgres::NoTls)?;

// Retry until storage usage is updated.
Retry::default().max_duration(Duration::from_secs(2)).retry(|_| {
let updated_timestamp = client
.query_one("SELECT EXTRACT(EPOCH FROM MAX(collection_timestamp))::float8 FROM mz_storage_usage;", &[])
.map_err(|e| e.to_string())?
.try_get::<_, f64>(0)
.map_err(|e| e.to_string())?;

if updated_timestamp > initial_timestamp {
Ok(())
} else {
Err(format!("updated storage collection timestamp {updated_timestamp} is not greater than initial timestamp {initial_timestamp}"))
}
})?;
}

Ok(())
}

#[test]
fn test_storage_usage_doesnt_update_between_restarts() -> Result<(), Box<dyn Error>> {
mz_ore::test::init_logging();

let data_dir = tempfile::tempdir()?;
let storage_usage_collection_interval = Duration::from_secs(3);
let config = util::Config::default()
.with_storage_usage_collection_interval(storage_usage_collection_interval)
.data_directory(data_dir.path());

// Wait for initial storage usage collection.
let initial_timestamp: f64 = {
let server = util::start_server(config.clone())?;
let mut client = server.connect(postgres::NoTls)?;
// Retry because it may take some time for the initial snapshot to be taken.
Retry::default().max_duration(Duration::from_secs(5)).retry(|_| {
Ok::<f64, String>(
client
.query_one(
"SELECT EXTRACT(EPOCH FROM MAX(collection_timestamp))::float8 FROM mz_storage_usage;",
&[],
)
.map_err(|e| e.to_string())?
.try_get::<_, f64>(0)
.map_err(|e| e.to_string())?,
)
})?
};

std::thread::sleep(Duration::from_secs(2));

// Another storage usage collection should not be scheduled immediately.
{
let server = util::start_server(config)?;
let mut client = server.connect(postgres::NoTls)?;

let updated_timestamp = client
.query_one(
"SELECT EXTRACT(EPOCH FROM MAX(collection_timestamp))::float8 FROM mz_storage_usage;",
&[],
)?.get::<_, f64>(0);

assert_eq!(initial_timestamp, updated_timestamp);
}

Ok(())
}
