Skip to content

Commit

Permalink
coord: Make storage usage interval configurable (#14787)
Browse files Browse the repository at this point in the history
This commit makes the storage usage collection interval configurable,
so that it is easier to test.

Works towards resolving #3739
  • Loading branch information
jkosh44 authored Sep 13, 2022
1 parent 424cc9e commit 149a59c
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 5 deletions.
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ wrappers = [
"mysql_common",
"native-tls",
"opentelemetry",
"parse_duration",
"procfs",
"prometheus",
"proptest",
Expand Down
9 changes: 4 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ mod timestamp_selection;
pub const DEFAULT_LOGICAL_COMPACTION_WINDOW_MS: Option<mz_repr::Timestamp> =
Some(Timestamp::new(1_000));

/// The default interval at which to collect storage usage information.
pub const DEFAULT_STORAGE_USAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(3600);

/// A dummy availability zone to use when no availability zones are explicitly
/// specified.
pub const DUMMY_AVAILABILITY_ZONE: &str = "";
Expand Down Expand Up @@ -232,6 +229,7 @@ pub struct Config<S> {
pub default_storage_host_size: Option<String>,
pub connection_context: ConnectionContext,
pub storage_usage_client: StorageUsageClient,
pub storage_usage_collection_interval: Duration,
}

/// Soft-state metadata about a compute replica
Expand Down Expand Up @@ -352,7 +350,7 @@ pub struct Coordinator<S> {
/// dropped and for which no further updates should be recorded.
transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>,

// Persist client for fetching storage metadata such as size metrics.
/// Persist client for fetching storage metadata such as size metrics.
storage_usage_client: StorageUsageClient,
/// The interval at which to collect storage usage information.
storage_usage_collection_interval: Duration,
Expand Down Expand Up @@ -820,6 +818,7 @@ pub async fn serve<S: Append + 'static>(
mut availability_zones,
connection_context,
storage_usage_client,
storage_usage_collection_interval,
}: Config<S>,
) -> Result<(Handle, Client), AdapterError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -923,7 +922,7 @@ pub async fn serve<S: Append + 'static>(
connection_context,
transient_replica_metadata: HashMap::new(),
storage_usage_client,
storage_usage_collection_interval: DEFAULT_STORAGE_USAGE_COLLECTION_INTERVAL,
storage_usage_collection_interval,
};
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
Expand Down
11 changes: 11 additions & 0 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::{bail, Context};
use clap::{ArgEnum, Parser};
Expand Down Expand Up @@ -53,6 +54,7 @@ use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::{PersistConfig, PersistLocation};
use mz_repr::util::parse_duration;
use mz_secrets::SecretsController;
use mz_storage::types::connections::ConnectionContext;

Expand Down Expand Up @@ -356,6 +358,14 @@ pub struct Args {
requires = "storage-host-sizes"
)]
default_storage_host_size: Option<String>,
/// The interval at which to collect storage usage information, given as a
/// duration string (for example, the default `3600s`).
#[clap(
long,
env = "STORAGE_USAGE_COLLECTION_INTERVAL",
parse(try_from_str = parse_duration),
default_value = "3600s"
)]
storage_usage_collection_interval_sec: Duration,

// === Tracing options. ===
#[clap(flatten)]
Expand Down Expand Up @@ -685,6 +695,7 @@ max log level: {max_log_level}",
secrets_reader,
),
otel_enable_callback,
storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
}))?;

println!(
Expand Down
4 changes: 4 additions & 0 deletions src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context};
use futures::StreamExt;
Expand Down Expand Up @@ -102,6 +103,8 @@ pub struct Config {
pub storage_host_sizes: StorageHostSizeMap,
/// Default storage host size, should be a key from storage_host_sizes.
pub default_storage_host_size: Option<String>,
/// The interval at which to collect storage usage information.
pub storage_usage_collection_interval: Duration,

// === Tracing options. ===
/// The metrics registry to use.
Expand Down Expand Up @@ -266,6 +269,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
storage_usage_collection_interval: config.storage_usage_collection_interval,
})
.await?;

Expand Down
41 changes: 41 additions & 0 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,3 +636,44 @@ fn test_cancel_dataflow_removal() -> Result<(), Box<dyn Error>> {

Ok(())
}

/// Checks that a configured storage usage collection interval is honored.
///
/// The server is started with a one-second collection interval. The test
/// records the summed `size_bytes` from `mz_storage_usage`, writes some data
/// into a new table, and then retries (for up to five seconds) until the
/// summed size is observed to be strictly greater than the initial value.
#[test]
fn test_storage_usage_collection_interval() -> Result<(), Box<dyn Error>> {
    mz_ore::test::init_logging();

    // Both measurements must use exactly the same query so they are comparable.
    const STORAGE_USAGE_QUERY: &str = "SELECT SUM(size_bytes)::int8 FROM mz_storage_usage;";

    let config =
        util::Config::default().with_storage_usage_collection_interval(Duration::from_secs(1));
    let server = util::start_server(config)?;
    let mut client = server.connect(postgres::NoTls)?;

    // Retry because it may take some time for the initial snapshot to be taken.
    let initial_storage: i64 = Retry::default().retry(|_| {
        Ok::<i64, String>(
            client
                .query_one(STORAGE_USAGE_QUERY, &[])
                .map_err(|e| e.to_string())?
                .try_get::<_, i64>(0)
                .map_err(|e| e.to_string())?,
        )
    })?;

    // Write some data so that a later collection reports a larger size.
    client.batch_execute("CREATE TABLE t (a INT)")?;
    client.batch_execute("INSERT INTO t VALUES (1), (2)")?;

    // Retry until storage usage is updated.
    Retry::default()
        .max_duration(Duration::from_secs(5))
        .retry(|_| {
            let updated_storage = client
                .query_one(STORAGE_USAGE_QUERY, &[])
                .map_err(|e| e.to_string())?
                .try_get::<_, i64>(0)
                .map_err(|e| e.to_string())?;

            if updated_storage > initial_storage {
                Ok(())
            } else {
                Err(format!("updated storage count {updated_storage} is not greater than initial storage {initial_storage}"))
            }
        })?;

    Ok(())
}
12 changes: 12 additions & 0 deletions src/environmentd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct Config {
workers: usize,
now: NowFn,
seed: u32,
storage_usage_collection_interval: Duration,
}

impl Default for Config {
Expand All @@ -68,6 +70,7 @@ impl Default for Config {
workers: 1,
now: SYSTEM_TIME.clone(),
seed: rand::random(),
storage_usage_collection_interval: Duration::from_secs(3600),
}
}
}
Expand Down Expand Up @@ -111,6 +114,14 @@ impl Config {
self.now = now;
self
}

pub fn with_storage_usage_collection_interval(
mut self,
storage_usage_collection_interval: Duration,
) -> Self {
self.storage_usage_collection_interval = storage_usage_collection_interval;
self
}
}

pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
Expand Down Expand Up @@ -202,6 +213,7 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
(Arc::clone(&orchestrator) as Arc<dyn SecretsController>).reader(),
),
otel_enable_callback: mz_ore::tracing::OpenTelemetryEnableCallback::none(),
storage_usage_collection_interval: config.storage_usage_collection_interval,
}))?;
let server = Server {
inner,
Expand Down
2 changes: 2 additions & 0 deletions src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::path::Path;
use std::str;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::{anyhow, bail};
use bytes::{Buf, BytesMut};
Expand Down Expand Up @@ -677,6 +678,7 @@ impl Runner {
(Arc::clone(&orchestrator) as Arc<dyn SecretsController>).reader(),
),
otel_enable_callback: mz_ore::tracing::OpenTelemetryEnableCallback::none(),
storage_usage_collection_interval: Duration::from_secs(3600),
};
// We need to run the server on its own Tokio runtime, which in turn
// requires its own thread, so that we can wait for any tasks spawned
Expand Down

0 comments on commit 149a59c

Please sign in to comment.