Skip to content

Commit

Permalink
Merge pull request MaterializeInc#14745 from danhhz/persist_flake_mae…
Browse files Browse the repository at this point in the history
…lstrom

persist: fix bug where we'd incorrectly delete an initial rollup
  • Loading branch information
danhhz authored Sep 12, 2022
2 parents d54a05a + 51fc287 commit fcaf3c0
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/persist-client/src/internal/state_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,27 @@ impl StateVersions {
// We lost a CaS race and someone else initialized the shard,
// use the value included in the CaS expectation error.

let state = self.fetch_current_state(&shard_id, live_diffs).await;

// Clean up the rollup blob that we were trying to reference.
//
// SUBTLE: If we got an Indeterminate error in the CaS above,
// but it actually went through, then we'll "contend" with
// ourselves and get an expectation mismatch. Use the actual
// fetched state to determine if our rollup actually made it in
// and decide whether to delete based on that.
let (_, rollup_key) = initial_state.latest_rollup();
self.delete_rollup(&shard_id, rollup_key).await;
let should_delete_rollup = match state.as_ref() {
Ok(state) => !state.collections.rollups.values().any(|x| x == rollup_key),
// If the codecs don't match, then we definitely didn't
// write the state.
Err(CodecMismatch { .. }) => true,
};
if should_delete_rollup {
self.delete_rollup(&shard_id, rollup_key).await;
}

return self.fetch_current_state(&shard_id, live_diffs).await;
return state;
}
}
}
Expand Down

0 comments on commit fcaf3c0

Please sign in to comment.