Skip to content

Commit

Permalink
feat: have bigtable remove_channel write conditionally (#625)
Browse files Browse the repository at this point in the history
Closes: SYNC-4149

also: always apply a column family filter
  • Loading branch information
pjenvey authored Feb 20, 2024
1 parent 4bd4827 commit 60c6675
Showing 1 changed file with 44 additions and 26 deletions.
70 changes: 44 additions & 26 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableErr
Ok(vec![router_gc_policy_filter(), timestamp_filter])
}

/// Build a [`data::RowFilter`] restricted by column family name.
///
/// `regex` is handed unchanged to the filter's family-name regex setter;
/// callers in this file pass an anchored pattern such as `^{ROUTER_FAMILY}$`.
fn family_filter(regex: String) -> data::RowFilter {
    let mut by_family = data::RowFilter::default();
    by_family.set_family_name_regex_filter(regex);
    by_family
}

/// Escape bytes for RE values
///
/// Based off google-re2/perl's quotemeta function
Expand All @@ -126,16 +133,17 @@ fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
/// Return a chain of RowFilters limiting to a match of the specified
/// `version`'s column value
fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter("^version$".as_bytes().to_vec());

let mut value_filter = data::RowFilter::default();
value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));

vec![family_filter, cq_filter, value_filter]
vec![
family_filter(format!("^{ROUTER_FAMILY}$")),
cq_filter,
value_filter,
]
}

/// Return a newly generated `version` column `Cell`
Expand Down Expand Up @@ -324,12 +332,13 @@ impl BigTableClientImpl {
Ok(())
}

/// Read a given row from the row key.
async fn read_row(&self, row_key: &str) -> Result<Option<row::Row>, error::BigTableError> {
debug!("🉑 Row key: {row_key}");
let req = self.read_row_request(row_key);
/// Read one row for the [ReadRowsRequest] (assuming only a single row was requested).
async fn read_row(
&self,
req: bigtable::ReadRowsRequest,
) -> Result<Option<row::Row>, error::BigTableError> {
let mut rows = self.read_rows(req).await?;
Ok(rows.remove(row_key))
Ok(rows.pop_first().map(|(_, v)| v))
}

/// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key.
Expand Down Expand Up @@ -717,7 +726,9 @@ impl DbClient for BigTableClientImpl {

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
let row_key = uaid.as_simple().to_string();
let Some(mut row) = self.read_row(&row_key).await? else {
let mut req = self.read_row_request(&row_key);
req.set_filter(family_filter(format!("^{ROUTER_FAMILY}$")));
let Some(mut row) = self.read_row(req).await? else {
return Ok(None);
};

Expand Down Expand Up @@ -819,21 +830,16 @@ impl DbClient for BigTableClientImpl {
let row_key = uaid.simple().to_string();
let mut req = self.read_row_request(&row_key);

let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());

req.set_filter(filter_chain(vec![
router_gc_policy_filter(),
family_filter,
family_filter(format!("^{ROUTER_FAMILY}$")),
cq_filter,
]));

let mut rows = self.read_rows(req).await?;
let mut result = HashSet::new();
if let Some(record) = rows.remove(&row_key) {
if let Some(record) = self.read_row(req).await? {
for mut cells in record.cells.into_values() {
let Some(cell) = cells.pop() else {
continue;
Expand All @@ -853,7 +859,7 @@ impl DbClient for BigTableClientImpl {
/// Delete the channel. Does not delete its associated pending messages.
async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
let row_key = uaid.simple().to_string();
let mut req = self.mutate_row_request(&row_key);
let mut req = self.check_and_mutate_row_request(&row_key);

// Delete the column representing the channel_id
let column = format!("chid:{}", channel_id.as_hyphenated());
Expand All @@ -865,11 +871,14 @@ impl DbClient for BigTableClientImpl {
row.cells
.insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
mutations.extend(self.get_mutations(row.cells)?);
req.set_mutations(mutations);

self.mutate_row(req).await?;
// XXX: this could be check_and_mutate to determine if the channel existed
Ok(true)
// check if the channel existed/was actually removed
let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter(format!("^{column}$").into_bytes());
req.set_predicate_filter(filter_chain(vec![router_gc_policy_filter(), cq_filter]));
req.set_true_mutations(mutations);

Ok(self.check_and_mutate(req).await?)
}

/// Remove the node_id
Expand Down Expand Up @@ -1048,7 +1057,10 @@ impl DbClient for BigTableClientImpl {
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

req.set_filter(filter_chain(message_gc_policy_filter()?));
let mut filters = message_gc_policy_filter()?;
filters.push(family_filter(format!("^{MESSAGE_TOPIC_FAMILY}$")));

req.set_filter(filter_chain(filters));
if limit > 0 {
trace!("🉑 Setting limit to {limit}");
req.set_rows_limit(limit as i64);
Expand Down Expand Up @@ -1114,7 +1126,10 @@ impl DbClient for BigTableClientImpl {
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/
req.set_filter(filter_chain(message_gc_policy_filter()?));
let mut filters = message_gc_policy_filter()?;
filters.push(family_filter(format!("^{MESSAGE_FAMILY}$")));

req.set_filter(filter_chain(filters));
if limit > 0 {
req.set_rows_limit(limit as i64);
}
Expand Down Expand Up @@ -1300,7 +1315,8 @@ mod tests {
assert_eq!(channels, new_channels);

// can we remove a channel?
client.remove_channel(&uaid, &chid_to_remove).await?;
assert!(client.remove_channel(&uaid, &chid_to_remove).await?);
assert!(!client.remove_channel(&uaid, &chid_to_remove).await?);
new_channels.remove(&chid_to_remove);
let channels = client.get_channels(&uaid).await?;
assert_eq!(channels, new_channels);
Expand Down Expand Up @@ -1387,6 +1403,7 @@ mod tests {
assert!(client.remove_channel(&uaid, &chid).await.is_ok());

// Now, can we do all that with topic messages
client.add_channel(&uaid, &topic_chid).await?;
let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned();
let timestamp = now();
let sort_key = now();
Expand Down Expand Up @@ -1465,7 +1482,8 @@ mod tests {
}],
);
client.write_row(row).await.unwrap();
let Some(row) = client.read_row(&row_key).await.unwrap() else {
let req = client.read_row_request(&row_key);
let Some(row) = client.read_row(req).await.unwrap() else {
panic!("Expected row");
};
assert_eq!(row.cells.len(), 1);
Expand Down

0 comments on commit 60c6675

Please sign in to comment.