Skip to content

Commit

Permalink
Introduce distinction TimerKind::Invoke and TimerKind::CleanInvocatio…
Browse files Browse the repository at this point in the history
…nStatus
  • Loading branch information
tillrohrmann committed Apr 30, 2024
1 parent f881aa2 commit 75653c5
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 105 deletions.
25 changes: 17 additions & 8 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,18 +495,22 @@ impl KeyCodec for TimerKind {
self.serialized_length()
);
match self {
TimerKind::Invocation { invocation_uuid } => {
TimerKind::Invoke { invocation_uuid } => {
target.put_u8(0);
invocation_uuid.encode(target);
}
TimerKind::Journal {
TimerKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => {
target.put_u8(1);
invocation_uuid.encode(target);
journal_index.encode(target);
}
TimerKind::CleanInvocationStatus { invocation_uuid } => {
target.put_u8(2);
invocation_uuid.encode(target);
}
}
}

Expand All @@ -520,16 +524,20 @@ impl KeyCodec for TimerKind {
Ok(match source.get_u8() {
0 => {
let invocation_uuid = InvocationUuid::decode(source)?;
TimerKind::Invocation { invocation_uuid }
TimerKind::Invoke { invocation_uuid }
}
1 => {
let invocation_uuid = InvocationUuid::decode(source)?;
let journal_index = u32::decode(source)?;
TimerKind::Journal {
TimerKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
}
}
2 => {
let invocation_uuid = InvocationUuid::decode(source)?;
TimerKind::CleanInvocationStatus { invocation_uuid }
}
i => {
return Err(StorageError::Generic(anyhow!(
"Unknown discriminator for TimerKind: '{}'",
Expand All @@ -541,16 +549,17 @@ impl KeyCodec for TimerKind {

fn serialized_length(&self) -> usize {
1 + match self {
TimerKind::Invocation { invocation_uuid } => {
KeyCodec::serialized_length(invocation_uuid)
}
TimerKind::Journal {
TimerKind::Invoke { invocation_uuid } => KeyCodec::serialized_length(invocation_uuid),
TimerKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => {
KeyCodec::serialized_length(invocation_uuid)
+ KeyCodec::serialized_length(journal_index)
}
TimerKind::CleanInvocationStatus { invocation_uuid } => {
KeyCodec::serialized_length(invocation_uuid)
}
}
}
}
Expand Down
154 changes: 112 additions & 42 deletions crates/partition-store/src/timer_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,36 @@ fn exclusive_start_key_range(
) -> TableScan<TimersKey> {
if let Some(timer_key) = timer_key {
let next_timer_key = match timer_key.kind {
TimerKind::Invocation { invocation_uuid } => {
let invocation_uuid_value: u128 = invocation_uuid.into();
TimerKind::Invoke { invocation_uuid } => {
let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid);
TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::Invocation {
invocation_uuid: InvocationUuid::from(
invocation_uuid_value
.checked_add(1)
.expect("invocation_uuid should be smaller than u128::MAX"),
),
kind: TimerKind::Invoke {
invocation_uuid: incremented_invocation_uuid,
},
}
}
TimerKind::Journal {
TimerKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid,
journal_index: journal_index
.checked_add(1)
.expect("journal index should be smaller than u64::MAX"),
},
},
TimerKind::CleanInvocationStatus { invocation_uuid } => {
let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid);
TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: incremented_invocation_uuid,
},
}
}
};

let lower_bound = write_timer_key(partition_id, &next_timer_key);
Expand All @@ -108,6 +113,15 @@ fn exclusive_start_key_range(
}
}

fn increment_invocation_uuid(invocation_uuid: InvocationUuid) -> InvocationUuid {
let invocation_uuid_value: u128 = invocation_uuid.into();
InvocationUuid::from(
invocation_uuid_value
.checked_add(1)
.expect("invocation_uuid should be smaller than u128::MAX"),
)
}

fn add_timer<S: StorageAccess>(
storage: &mut S,
partition_id: PartitionId,
Expand Down Expand Up @@ -203,9 +217,9 @@ mod tests {
InvocationUuid::from_parts(1706027034946, 12345678900001);

#[test]
fn round_trip_journal_kind() {
fn round_trip_complete_journal_entry_kind() {
let key = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 1448,
},
Expand All @@ -219,9 +233,24 @@ mod tests {
}

#[test]
fn round_trip_invocation_kind() {
fn round_trip_invoke_kind() {
let key = TimerKey {
kind: TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 87654321,
};

let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize();
let got = timer_key_from_key_slice(&key_bytes).expect("should not fail");

assert_eq!(got, key);
}

#[test]
fn round_trip_clean_invocation_status_kind() {
let key = TimerKey {
kind: TimerKind::Invocation {
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 87654321,
Expand All @@ -236,11 +265,14 @@ mod tests {
#[test]
fn test_lexicographical_sorting_by_timestamp() {
let kinds = [
TimerKind::Journal {
TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
TimerKind::Invocation {
TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
];
Expand All @@ -255,117 +287,152 @@ mod tests {
kind: second_kind.clone(),
timestamp: 301,
};
assert_in_range(a, b);
assert_in_range(&a, &b);
}
}
}

#[test]
fn test_lexicographical_sorting_by_invocation_journal_kind() {
fn test_lexicographical_sorting_by_invocation_uuid_complete_journal_entry_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
journal_index: 0,
},
timestamp: 300,
};
assert_in_range(a.clone(), b);
assert_in_range(&a, &b);

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
journal_index: 0,
},
timestamp: 300,
};
assert_in_range(a, b);
assert_in_range(&a, &b);
}

#[test]
fn test_lexicographical_sorting_by_invocation_invocation_kind() {
fn test_lexicographical_sorting_by_invocation_uuid_invoke_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::Invocation {
kind: TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::Invocation {
kind: TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
},
timestamp: 300,
};
assert_in_range(a.clone(), b);
assert_in_range(&a, &b);

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::Invocation {
kind: TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
},
timestamp: 300,
};
assert_in_range(a, b);
assert_in_range(&a, &b);
}

#[test]
fn test_lexicographical_sorting_by_invocation_uuid_clean_invoation_status_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
},
timestamp: 300,
};
assert_in_range(&a, &b);

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
},
timestamp: 300,
};
assert_in_range(&a, &b);
}

#[test]
fn test_lexicographical_sorting_by_journal_index() {
let a = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 1,
},
timestamp: 300,
};
assert_in_range(a, b);
assert_in_range(&a, &b);
}

#[test]
fn test_lexicographical_sorting_journal_invocation_kind() {
fn test_lexicographical_sorting_timer_kind() {
let a = TimerKey {
kind: TimerKind::Invocation {
kind: TimerKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};

let b = TimerKey {
kind: TimerKind::Journal {
kind: TimerKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};

assert_in_range(a, b);
let c = TimerKey {
kind: TimerKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};

assert_in_range(&a, &b);
assert_in_range(&b, &c);
}

#[track_caller]
fn assert_in_range(key_a: TimerKey, key_b: TimerKey) {
fn assert_in_range(key_a: &TimerKey, key_b: &TimerKey) {
assert!(key_a < key_b);

let key_a_bytes = write_timer_key(PartitionId::from(1), &key_a).serialize();
let key_b_bytes = write_timer_key(PartitionId::from(1), &key_b).serialize();
let key_a_bytes = write_timer_key(PartitionId::from(1), key_a).serialize();
let key_b_bytes = write_timer_key(PartitionId::from(1), key_b).serialize();

assert!(less_than(&key_a_bytes, &key_b_bytes));

let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(&key_a)) {
let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(key_a)) {
TableScan::KeyRangeInclusiveInSinglePartition(p, low, high) if *p == 1 => (low, high),
_ => panic!(""),
};
Expand Down Expand Up @@ -411,13 +478,16 @@ mod tests {
match TimerKindDiscriminants::VARIANTS
[rand::thread_rng().gen_range(0..TimerKindDiscriminants::VARIANTS.len())]
{
TimerKindDiscriminants::Invocation => TimerKind::Invocation {
TimerKindDiscriminants::Invoke => TimerKind::Invoke {
invocation_uuid: InvocationUuid::new(),
},
TimerKindDiscriminants::Journal => TimerKind::Journal {
TimerKindDiscriminants::CompleteJournalEntry => TimerKind::CompleteJournalEntry {
invocation_uuid: InvocationUuid::new(),
journal_index: rand::thread_rng().gen_range(0..2 ^ 16),
},
TimerKindDiscriminants::CleanInvocationStatus => TimerKind::CleanInvocationStatus {
invocation_uuid: InvocationUuid::new(),
},
}
};

Expand Down
Loading

0 comments on commit 75653c5

Please sign in to comment.