Skip to content

Commit

Permalink
Fix the sys_inbox table (#1577)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored May 31, 2024
1 parent 534bf34 commit 7d4ec58
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 24 deletions.
26 changes: 14 additions & 12 deletions crates/partition-store/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::TableKind::Inbox;
use crate::{RocksDBTransaction, StorageAccess};
use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
use crate::{TableScan, TableScanIterationDecision};
use bytestring::ByteString;
use futures::Stream;
Expand Down Expand Up @@ -113,8 +113,20 @@ impl<'a> InboxTable for RocksDBTransaction<'a> {
},
))
}
}

fn decode_inbox_key_value(k: &[u8], mut v: &[u8]) -> Result<SequenceNumberInboxEntry> {
let key = InboxKey::deserialize_from(&mut Cursor::new(k))?;
let sequence_number = *key.sequence_number_ok_or()?;

let inbox_entry = StorageCodec::decode::<InboxEntry, _>(&mut v)
.map_err(|error| StorageError::Generic(error.into()))?;

Ok(SequenceNumberInboxEntry::new(sequence_number, inbox_entry))
}

fn all_inboxes(
impl PartitionStore {
pub fn all_inboxes(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<SequenceNumberInboxEntry>> + Send {
Expand All @@ -128,16 +140,6 @@ impl<'a> InboxTable for RocksDBTransaction<'a> {
}
}

fn decode_inbox_key_value(k: &[u8], mut v: &[u8]) -> Result<SequenceNumberInboxEntry> {
let key = InboxKey::deserialize_from(&mut Cursor::new(k))?;
let sequence_number = *key.sequence_number_ok_or()?;

let inbox_entry = StorageCodec::decode::<InboxEntry, _>(&mut v)
.map_err(|error| StorageError::Generic(error.into()))?;

Ok(SequenceNumberInboxEntry::new(sequence_number, inbox_entry))
}

#[cfg(test)]
mod tests {
use crate::inbox_table::InboxKey;
Expand Down
6 changes: 0 additions & 6 deletions crates/storage-api/src/inbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId, WithPart
use restate_types::message::MessageIndex;
use restate_types::state_mut::ExternalStateMutation;
use std::future::Future;
use std::ops::RangeInclusive;

#[derive(Debug, Clone, PartialEq)]
pub enum InboxEntry {
Expand Down Expand Up @@ -106,9 +105,4 @@ pub trait InboxTable {
&mut self,
service_id: &ServiceId,
) -> impl Stream<Item = Result<SequenceNumberInboxEntry>> + Send;

fn all_inboxes(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<SequenceNumberInboxEntry>> + Send;
}
3 changes: 3 additions & 0 deletions crates/storage-query-datafusion/src/inbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ mod schema;
mod table;

pub(crate) use table::register_self;

#[cfg(test)]
mod tests;
5 changes: 2 additions & 3 deletions crates/storage-query-datafusion/src/inbox/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ pub(crate) fn append_inbox_row(
output: &mut String,
inbox_entry: SequenceNumberInboxEntry,
) {
let mut row = builder.row();

let SequenceNumberInboxEntry {
inbox_sequence_number,
inbox_entry,
} = inbox_entry;

if let InboxEntry::Invocation(service_id, invocation_id) = inbox_entry {
let mut row = builder.row();
row.partition_key(invocation_id.partition_key());

row.service_name(&service_id.service_name);

row.service_key(&service_id.key);

if row.is_id_defined() {
Expand Down
5 changes: 2 additions & 3 deletions crates/storage-query-datafusion/src/inbox/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;

use restate_partition_store::{PartitionStore, PartitionStoreManager};
use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry};
use restate_storage_api::inbox_table::SequenceNumberInboxEntry;
use restate_storage_api::StorageError;
use restate_types::identifiers::PartitionKey;

Expand Down Expand Up @@ -54,8 +54,7 @@ impl ScanLocalPartition for InboxScanner {
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
) {
let mut transaction = partition_store.transaction();
let rows = transaction.all_inboxes(range);
let rows = partition_store.all_inboxes(range);
for_each_state(projection, tx, rows).await;
}
}
Expand Down
89 changes: 89 additions & 0 deletions crates/storage-query-datafusion/src/inbox/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::mocks::*;
use crate::row;
use datafusion::arrow::array::{LargeStringArray, UInt64Array};
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use googletest::all;
use googletest::prelude::{assert_that, eq};
use restate_core::TaskCenterBuilder;
use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationId, InvocationUuid, ServiceId, WithPartitionKey};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_inbox() {
let tc = TaskCenterBuilder::default()
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let mut engine = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

let mut tx = engine.partition_store().transaction();
let service_id = ServiceId::mock_random();
let invocation_id_1 =
InvocationId::from_parts(service_id.partition_key(), InvocationUuid::new());
tx.put_inbox_entry(
&service_id,
SequenceNumberInboxEntry {
inbox_sequence_number: 0,
inbox_entry: InboxEntry::Invocation(service_id.clone(), invocation_id_1),
},
)
.await;
let invocation_id_2 =
InvocationId::from_parts(service_id.partition_key(), InvocationUuid::new());
tx.put_inbox_entry(
&service_id,
SequenceNumberInboxEntry {
inbox_sequence_number: 1,
inbox_entry: InboxEntry::Invocation(service_id.clone(), invocation_id_2),
},
)
.await;
tx.commit().await.unwrap();

let records = engine
.execute("SELECT * FROM sys_inbox ORDER BY sequence_number")
.await
.unwrap()
.collect::<Vec<Result<RecordBatch, _>>>()
.await
.remove(0)
.unwrap();

assert_that!(
records,
all!(
row!(
0,
{
"id" => LargeStringArray: eq(invocation_id_1.to_string()),
"sequence_number" => UInt64Array: eq(0),
"service_name" => LargeStringArray: eq(service_id.service_name.to_string()),
"service_key" => LargeStringArray: eq(service_id.key.to_string()),
}
),
row!(
1,
{
"id" => LargeStringArray: eq(invocation_id_2.to_string()),
"sequence_number" => UInt64Array: eq(1),
"service_name" => LargeStringArray: eq(service_id.service_name.to_string()),
"service_key" => LargeStringArray: eq(service_id.key.to_string()),
}
)
)
);
}

0 comments on commit 7d4ec58

Please sign in to comment.