diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto index ea48f2385cc935..59793000b6384c 100644 --- a/ext/kv/proto/datapath.proto +++ b/ext/kv/proto/datapath.proto @@ -48,6 +48,7 @@ message KvMutation { bytes key = 1; KvValue value = 2; KvMutationType mutation_type = 3; + int64 expire_at_ms = 4; } message KvValue { diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 36c4d3af206cb9..38b233cc375e4d 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -230,11 +230,7 @@ impl Database for RemoteDb

{ }) }) .collect::>()?, - kv_mutations: write - .mutations - .into_iter() - .map(|x| encode_mutation(x.key, x.kind)) - .collect(), + kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(), enqueues: vec![], }; @@ -333,32 +329,41 @@ fn encode_value(value: crate::Value) -> pb::KvValue { } } -fn encode_mutation(key: Vec, mutation: MutationKind) -> pb::KvMutation { - match mutation { +fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation { + let key = m.key; + let expire_at_ms = + m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0); + + match m.kind { MutationKind::Set(x) => pb::KvMutation { key, value: Some(encode_value(x)), mutation_type: pb::KvMutationType::MSet as _, + expire_at_ms, }, MutationKind::Delete => pb::KvMutation { key, value: Some(encode_value(crate::Value::Bytes(vec![]))), mutation_type: pb::KvMutationType::MClear as _, + expire_at_ms, }, MutationKind::Max(x) => pb::KvMutation { key, value: Some(encode_value(x)), mutation_type: pb::KvMutationType::MMax as _, + expire_at_ms, }, MutationKind::Min(x) => pb::KvMutation { key, value: Some(encode_value(x)), mutation_type: pb::KvMutationType::MMin as _, + expire_at_ms, }, MutationKind::Sum(x) => pb::KvMutation { key, value: Some(encode_value(x)), mutation_type: pb::KvMutationType::MSum as _, + expire_at_ms, }, } }