Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add integration test for dual mode #582

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ jobs:
export PATH=$PATH:$HOME/.cargo/bin
echo 'export PATH=$PATH:$HOME/.cargo/bin' >> $BASH_ENV
rustc --version
cargo build --features=emulator
cargo build --features=emulator --features=dual
- run:
name: Check formatting
command: |
Expand All @@ -153,7 +153,7 @@ jobs:
# when doing discovery, we found that the docker image `meminfo` and `cpuinfo` often report
# the machine level memory and CPU which are far higher than the memory allocated to the docker
# instance. This may be causing rust to be overly greedy triggering the VM to OOM the process.)
command: cargo test --features=emulator --jobs=2
command: cargo test --features=emulator --features=dual --jobs=2
- run:
name: Integration tests (Bigtable)
command: make integration-test
Expand All @@ -165,12 +165,15 @@ jobs:
command: make integration-test
environment:
TEST_RESULTS_DIR: workspace/test-results
# - run:
# name: Integration tests (Dual Bigtable/DynamoDB)
# command: make integration-test
# environment:
# DB_DSN: dual
# TEST_RESULTS_DIR: workspace/test-results
- run:
name: Integration tests (Dual Bigtable/DynamoDB)
command: make integration-test
environment:
RUST_LOG: autopush=debug,autopush_common=debug,autoendpoint=debug,autoconnect=debug,slog_mozlog_json=info,warn
# PYTEST_ARGS: -sv
DB_DSN: dual
DB_SETTINGS: tests/integration/dual_test.json
TEST_RESULTS_DIR: workspace/test-results
- store_test_results:
path: workspace/test-results
- save_cache:
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ integration-test-legacy:
integration-test:
$(POETRY) -V
$(POETRY) install --without dev,load --no-root
CONNECTION_BINARY=autoconnect \
CONNECTION_SETTINGS_PREFIX=autoconnect__ \
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \
-v $(PYTEST_ARGS)
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ impl BigTableClientImpl {

// Do the actual commit.
let bigtable = self.pool.get().await?;
debug!("🉑 writing row...");
let _resp = bigtable
.conn
.mutate_row_async(&req)
Expand Down
11 changes: 8 additions & 3 deletions autopush-common/src/db/bigtable/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ impl BigTablePool {
pub async fn get(
&self,
) -> Result<deadpool::managed::Object<BigtableClientManager>, error::BigTableError> {
self.pool
let obj = self
.pool
.get()
.await
.map_err(|e| error::BigTableError::Pool(e.to_string()))
.map_err(|e| error::BigTableError::Pool(e.to_string()))?;
debug!("🉑 Got db from pool");
Ok(obj)
}

/// Get the pools manager, because we would like to talk to them.
Expand Down Expand Up @@ -151,7 +154,9 @@ impl Manager for BigtableClientManager {
/// `BigtableClient` is the most atomic we can go.
async fn create(&self) -> Result<BigtableDb, DbError> {
debug!("🏊 Create a new pool entry.");
Ok(BigtableDb::new(self.get_channel()?))
let entry = BigtableDb::new(self.get_channel()?);
debug!("🏊 Bigtable connection acquired");
Ok(entry)
}

/// Recycle if the connection has outlived it's lifespan.
Expand Down
22 changes: 16 additions & 6 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct DualClientImpl {
pub struct DualDbSettings {
primary: DbSettings,
secondary: DbSettings,
#[serde(default)]
write_to_secondary: bool,
#[serde(default)]
median: Option<String>,
Expand All @@ -45,6 +46,7 @@ pub struct DualDbSettings {
impl DualClientImpl {
pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
// Not really sure we need the dsn here.
info!("Trying: {:?}", settings.db_settings);
let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| {
DbError::General(format!("Could not parse DualDBSettings string {:?}", e))
})?;
Expand Down Expand Up @@ -79,6 +81,7 @@ impl DualClientImpl {
};
let primary = BigTableClientImpl::new(metrics.clone(), &db_settings.primary)?;
let secondary = DdbClientImpl::new(metrics.clone(), &db_settings.secondary)?;
debug!("⚖ Got primary and secondary");
Ok(Self {
primary,
secondary: secondary.clone(),
Expand All @@ -95,18 +98,20 @@ impl DualClientImpl {
/// allowance
/// Returns the dbclient to use and whether or not it's the primary database.
async fn allot<'a>(&'a self, uaid: &Uuid) -> DbResult<(Box<&'a dyn DbClient>, bool)> {
if let Some(median) = self.median {
let target: (Box<&'a dyn DbClient>, bool) = if let Some(median) = self.median {
if uaid.as_bytes()[0] <= median {
debug!("⚖ Routing user to Bigtable");
// These are migrations so the metrics should appear as
// `auto[endpoint|connect].migrate`.
Ok((Box::new(&self.primary), true))
(Box::new(&self.primary), true)
} else {
Ok((Box::new(&self.secondary), false))
(Box::new(&self.secondary), false)
}
} else {
Ok((Box::new(&self.primary), true))
}
(Box::new(&self.primary), true)
};
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
}
}

Expand All @@ -117,7 +122,10 @@ impl DbClient for DualClientImpl {
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
}
target.add_user(user).await
debug!("⚖ adding user to {}...", target.name());
let result = target.add_user(user).await?;
debug!("⚖ User added...");
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
Expand Down Expand Up @@ -163,7 +171,9 @@ impl DbClient for DualClientImpl {
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
debug!("⚖ getting target");
let (target, _) = self.allot(uaid).await?;
debug!("⚖ Adding channel to {}", target.name());
target.add_channel(uaid, channel_id).await
}

Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct DdbClientImpl {

impl DdbClientImpl {
pub fn new(metrics: Arc<StatsdClient>, db_settings: &DbSettings) -> DbResult<Self> {
debug!("🛢️DynamoDB Settings {:?}", db_settings);
let db_client = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
DynamoDbClient::new_with(
HttpClient::new().expect("TLS initialization error"),
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/dual_test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"primary": {
"db_settings": "{\"message_family\": \"message\", \"router_family\": \"router\", \"table_name\": \"projects/test/instances/test/tables/autopush\"}",
"dsn": "grpc://localhost:8086"
},
"secondary": {
"db_settings": "{\"message_table\": \"message_int_test\",\"router_table\": \"router_int_test\"}",
"dsn": "http://localhost:8000/"
}
}
12 changes: 9 additions & 3 deletions tests/integration/test_integration_all_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@
MP_CONNECTION_PORT = 9052
MP_ROUTER_PORT = 9072

CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autopush_rs")
CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autopush__")
CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autoconnect")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we no longer use autopush_rs, switching to autoconnect for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you kill the CONNECTION_BINARY/SETTINGS override in the Makefile's integration-test target along with this?

CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autoconnect__")

CN_SERVER: subprocess.Popen | None = None
CN_MP_SERVER: subprocess.Popen | None = None
Expand Down Expand Up @@ -465,7 +465,7 @@ def _get_vapid(


def enqueue_output(out, queue):
for line in iter(out.readline, b""):
for line in iter(out.readline, ""):
queue.put(line)
out.close()

Expand Down Expand Up @@ -667,10 +667,14 @@ def setup_connection_server(connection_binary):
CONNECTION_CONFIG["port"] = parsed.port
CONNECTION_CONFIG["endpoint_scheme"] = parsed.scheme
write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX)
log.debug("Using existing Connection server")
return
else:
write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX)
cmd = [connection_binary]
run_args = os.getenv("RUN_ARGS")
if run_args is not None:
cmd.append(run_args)
log.debug(f"🐍🟢 Starting Connection server: {' '.join(cmd)}")
CN_SERVER = subprocess.Popen(
cmd,
Expand Down Expand Up @@ -701,6 +705,7 @@ def setup_megaphone_server(connection_binary):
parsed = urlparse(url)
MEGAPHONE_CONFIG["endpoint_port"] = parsed.port
write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX)
log.debug("Using existing Megaphone server")
return
else:
write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX)
Expand All @@ -725,6 +730,7 @@ def setup_endpoint_server():
ENDPOINT_CONFIG["hostname"] = parsed.hostname
ENDPOINT_CONFIG["port"] = parsed.port
ENDPOINT_CONFIG["endpoint_scheme"] = parsed.scheme
log.debug("Using existing Endpoint server")
return
else:
write_config_to_env(ENDPOINT_CONFIG, "autoend__")
Expand Down