Skip to content

Commit

Permalink
coord: Make storage usage interval configurable
Browse files Browse the repository at this point in the history
This commit makes the storage usage collection interval configurable,
so that it is easier to test.

Works towards resolving #3739
  • Loading branch information
jkosh44 committed Sep 12, 2022
1 parent fcaf3c0 commit 974a109
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 5 deletions.
9 changes: 4 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ mod timestamp_selection;
pub const DEFAULT_LOGICAL_COMPACTION_WINDOW_MS: Option<mz_repr::Timestamp> =
Some(Timestamp::new(1_000));

/// The default interval at which to collect storage usage information.
pub const DEFAULT_STORAGE_USAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(3600);

/// A dummy availability zone to use when no availability zones are explicitly
/// specified.
pub const DUMMY_AVAILABILITY_ZONE: &str = "";
Expand Down Expand Up @@ -233,6 +230,7 @@ pub struct Config<S> {
pub default_storage_host_size: Option<String>,
pub connection_context: ConnectionContext,
pub storage_usage_client: StorageUsageClient,
pub storage_usage_collection_interval: Duration,
}

/// Soft-state metadata about a compute replica
Expand Down Expand Up @@ -353,7 +351,7 @@ pub struct Coordinator<S> {
/// dropped and for which no further updates should be recorded.
transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>,

// Persist client for fetching storage metadata such as size metrics.
/// Persist client for fetching storage metadata such as size metrics.
storage_usage_client: StorageUsageClient,
/// The interval at which to collect storage usage information.
storage_usage_collection_interval: Duration,
Expand Down Expand Up @@ -821,6 +819,7 @@ pub async fn serve<S: Append + 'static>(
mut availability_zones,
connection_context,
storage_usage_client,
storage_usage_collection_interval,
}: Config<S>,
) -> Result<(Handle, Client), AdapterError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -924,7 +923,7 @@ pub async fn serve<S: Append + 'static>(
connection_context,
transient_replica_metadata: HashMap::new(),
storage_usage_client,
storage_usage_collection_interval: DEFAULT_STORAGE_USAGE_COLLECTION_INTERVAL,
storage_usage_collection_interval,
};
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
Expand Down
11 changes: 11 additions & 0 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::{bail, Context};
use clap::{ArgEnum, Parser};
Expand Down Expand Up @@ -356,6 +357,13 @@ pub struct Args {
requires = "storage-host-sizes"
)]
default_storage_host_size: Option<String>,
/// The interval in seconds at which to collect storage usage information.
#[clap(
long,
env = "STORAGE_USAGE_COLLECTION_INTERVAL_SEC",
default_value = "3600"
)]
storage_usage_collection_interval_sec: u64,

// === Tracing options. ===
#[clap(flatten)]
Expand Down Expand Up @@ -685,6 +693,9 @@ max log level: {max_log_level}",
secrets_reader,
),
otel_enable_callback,
storage_usage_collection_interval: Duration::from_secs(
args.storage_usage_collection_interval_sec,
),
}))?;

println!(
Expand Down
4 changes: 4 additions & 0 deletions src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context};
use futures::StreamExt;
Expand Down Expand Up @@ -102,6 +103,8 @@ pub struct Config {
pub storage_host_sizes: StorageHostSizeMap,
/// Default storage host size, should be a key from storage_host_sizes.
pub default_storage_host_size: Option<String>,
/// The interval at which to collect storage usage information.
pub storage_usage_collection_interval: Duration,

// === Tracing options. ===
/// The metrics registry to use.
Expand Down Expand Up @@ -266,6 +269,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
storage_usage_collection_interval: config.storage_usage_collection_interval,
})
.await?;

Expand Down
27 changes: 27 additions & 0 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,33 @@ fn test_alter_system_invalid_param() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[test]
fn test_storage_usage_collection_interval() -> Result<(), Box<dyn Error>> {
mz_ore::test::init_logging();

let config =
util::Config::default().with_storage_usage_collection_interval(Duration::from_secs(1));
let server = util::start_server(config)?;
let mut client = server.connect(postgres::NoTls)?;

let initial_storage = client
.query_one("SELECT SUM(size_bytes)::int8 FROM mz_storage_usage;", &[])?
.get::<_, i64>(0);

client.batch_execute(&"CREATE TABLE t (a INT)")?;
client.batch_execute(&"INSERT INTO t VALUES (1), (2)")?;

std::thread::sleep(Duration::from_secs(3));

let updated_storage = client
.query_one("SELECT SUM(size_bytes)::int8 FROM mz_storage_usage;", &[])?
.get::<_, i64>(0);

assert!(updated_storage > initial_storage);

Ok(())
}

/// Group commit will block writes until the current time has advanced. This can make
/// performing inserts while using deterministic time difficult. This is a helper
/// method to perform writes and advance the current time.
Expand Down
12 changes: 12 additions & 0 deletions src/environmentd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct Config {
workers: usize,
now: NowFn,
seed: u32,
storage_usage_collection_interval: Duration,
}

impl Default for Config {
Expand All @@ -68,6 +70,7 @@ impl Default for Config {
workers: 1,
now: SYSTEM_TIME.clone(),
seed: rand::random(),
storage_usage_collection_interval: Duration::from_secs(3600),
}
}
}
Expand Down Expand Up @@ -111,6 +114,14 @@ impl Config {
self.now = now;
self
}

pub fn with_storage_usage_collection_interval(
mut self,
storage_usage_collection_interval: Duration,
) -> Self {
self.storage_usage_collection_interval = storage_usage_collection_interval;
self
}
}

pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
Expand Down Expand Up @@ -202,6 +213,7 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
(Arc::clone(&orchestrator) as Arc<dyn SecretsController>).reader(),
),
otel_enable_callback: mz_ore::tracing::OpenTelemetryEnableCallback::none(),
storage_usage_collection_interval: config.storage_usage_collection_interval,
}))?;
let server = Server {
inner,
Expand Down
2 changes: 2 additions & 0 deletions src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use once_cell::sync::Lazy;
use postgres_protocol::types;
use regex::Regex;
use tempfile::TempDir;
use time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::{oneshot, Mutex};
use tokio_postgres::types::FromSql;
Expand Down Expand Up @@ -677,6 +678,7 @@ impl Runner {
(Arc::clone(&orchestrator) as Arc<dyn SecretsController>).reader(),
),
otel_enable_callback: mz_ore::tracing::OpenTelemetryEnableCallback::none(),
storage_usage_collection_interval: Duration::from_secs(3600),
};
// We need to run the server on its own Tokio runtime, which in turn
// requires its own thread, so that we can wait for any tasks spawned
Expand Down

0 comments on commit 974a109

Please sign in to comment.