Skip to content

Commit

Permalink
Add create function to kv store
Browse files Browse the repository at this point in the history
  • Loading branch information
praveenperera authored Feb 14, 2024
1 parent a75b88a commit 26813e8
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 1 deletion.
107 changes: 107 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,60 @@ impl Store {
})
}

/// Create will add the key/value pair if it does not exist. If it does exist, it will return an error.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
///
/// let status = kv.create("key", "value".into()).await;
/// assert!(status.is_ok());
///
/// let status = kv.create("key", "value".into()).await;
/// assert!(status.is_err());
///
/// # Ok(())
/// # }
/// ```
pub async fn create<T: AsRef<str>>(
&self,
key: T,
value: bytes::Bytes,
) -> Result<u64, CreateError> {
let update_err = match self.update(key.as_ref(), value.clone(), 0).await {
Ok(revision) => return Ok(revision),
Err(err) => err,
};

match self.entry(key.as_ref()).await? {
// Deleted or Purged key, we can create it again.
Some(Entry {
operation: Operation::Delete | Operation::Purge,
..
}) => {
let revision = self.put(key, value).await?;
Ok(revision)
}

// key already exists.
Some(_) => Err(CreateError::new(CreateErrorKind::AlreadyExists)),

// Something went wrong with the initial update, return that error
None => Err(update_err.into()),
}
}

/// Puts new key value pair into the bucket.
/// If key didn't exist, it is created. If it did exist, a new value with a new version is
/// added.
Expand Down Expand Up @@ -1180,6 +1234,59 @@ impl Display for StatusErrorKind {

pub type StatusError = Error<StatusErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CreateErrorKind {
AlreadyExists,
InvalidKey,
Publish,
Ack,
Other,
}

impl From<UpdateError> for CreateError {
fn from(error: UpdateError) -> Self {
match error.kind() {
UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
}
}
}

impl From<PutError> for CreateError {
fn from(error: PutError) -> Self {
match error.kind() {
PutErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
PutErrorKind::Publish => Error::from(CreateErrorKind::Publish),
PutErrorKind::Ack => Error::from(CreateErrorKind::Ack),
}
}
}

impl From<EntryError> for CreateError {
fn from(error: EntryError) -> Self {
match error.kind() {
EntryErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
EntryErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
EntryErrorKind::Other => Error::from(CreateErrorKind::Other),
}
}
}

impl Display for CreateErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AlreadyExists => write!(f, "key already exists"),
Self::Publish => write!(f, "failed to create key in store"),
Self::Ack => write!(f, "ack error"),
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
Self::Other => write!(f, "other error"),
}
}
}

pub type CreateError = Error<CreateErrorKind>;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PutErrorKind {
InvalidKey,
Expand Down
41 changes: 40 additions & 1 deletion async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod kv {
use futures::{StreamExt, TryStreamExt};

#[tokio::test]
async fn create() {
async fn create_bucket() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
Expand All @@ -52,6 +52,45 @@ mod kv {
assert!(info.config.allow_direct);
}

#[tokio::test]
async fn create() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "test".into(),
description: "test_description".into(),
history: 10,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

let payload: Bytes = "data".into();
let create = kv.create("key", payload.clone()).await;
assert!(create.is_ok());

let create = kv.create("key", payload.clone()).await;
assert!(create.is_err());

kv.delete("key").await.unwrap();
let create = kv.create("key", payload.clone()).await;
assert!(create.is_ok());

kv.purge("key").await.unwrap();
let create = kv.create("key", payload.clone()).await;
assert!(create.is_ok());
}

#[tokio::test]
async fn put() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 26813e8

Please sign in to comment.