Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #41 from zserik/refactor
Browse files Browse the repository at this point in the history
Reduce code duplication + export TxnOp
  • Loading branch information
zarvd authored Feb 7, 2021
2 parents 9b72ee0 + d465199 commit 8854100
Show file tree
Hide file tree
Showing 20 changed files with 269 additions and 421 deletions.
9 changes: 5 additions & 4 deletions examples/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn grant_lease(client: &Client) -> Result<()> {

{
// watch key modification
let mut inbound = client.watch(KeyRange::key(key)).await;
let mut inbound = client.watch(KeyRange::key(key)).await.unwrap();
tokio::spawn(async move {
while let Some(resp) = inbound.next().await {
println!("watch response: {:?}", resp);
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn keep_alive_lease(client: &Client) -> Result<()> {

{
// watch key modification
let mut inbound = client.watch(KeyRange::key(key)).await;
let mut inbound = client.watch(KeyRange::key(key)).await.unwrap();
tokio::spawn(async move {
while let Some(resp) = inbound.next().await {
println!("watch response: {:?}", resp);
Expand All @@ -64,7 +64,7 @@ async fn keep_alive_lease(client: &Client) -> Result<()> {

{
// watch keep alive event
let mut inbound = client.lease().keep_alive_responses().await;
let mut inbound = client.lease().keep_alive_responses().await.unwrap();
tokio::spawn(async move {
loop {
match inbound.next().await {
Expand Down Expand Up @@ -102,7 +102,8 @@ async fn keep_alive_lease(client: &Client) -> Result<()> {
client
.lease()
.keep_alive(LeaseKeepAliveRequest::new(lease_id))
.await;
.await
.unwrap();
}
}

Expand Down
11 changes: 6 additions & 5 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ async fn compose(cli: &mut Client) -> Result<()> {
revision = resp.take_header().unwrap().revision();

for v in 0..10 {
let _ = cli.kv().put(PutRequest::new(format!("key-{}", v), format!("{}", v))).await?;
let _ = cli
.kv()
.put(PutRequest::new(format!("key-{}", v), format!("{}", v)))
.await?;
}
}

Expand Down Expand Up @@ -53,7 +56,6 @@ async fn compose(cli: &mut Client) -> Result<()> {
Ok(())
}


async fn cas(cli: &mut Client) -> Result<()> {
reset(cli).await?;
println!("start CAS section =====>");
Expand Down Expand Up @@ -86,7 +88,7 @@ async fn main() -> Result<()> {
auth: None,
tls: None,
})
.await?;
.await?;

// Compare-and-Set
if let Err(e) = cas(&mut client).await {
Expand All @@ -98,6 +100,5 @@ async fn main() -> Result<()> {
println!("failed to execute CAS: {:?}", e);
}


Ok(())
}
}
2 changes: 1 addition & 1 deletion examples/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn watch(client: &Client) -> Result<()> {
println!("watch key value modification");

{
let mut inbound = client.watch(KeyRange::key("foo")).await;
let mut inbound = client.watch(KeyRange::key("foo")).await.unwrap();

// print out all received watch responses
tokio::spawn(async move {
Expand Down
28 changes: 6 additions & 22 deletions src/auth/authenticate.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::proto::etcdserverpb;
use crate::ResponseHeader;

/// Request for authenticating.
pub struct AuthenticateRequest {
proto: etcdserverpb::AuthenticateRequest,
}
pbwrap_request!(
/// Request for authenticating.
AuthenticateRequest
);

impl AuthenticateRequest {
pub fn new<N, P>(name: N, password: P) -> Self
Expand All @@ -20,23 +20,13 @@ impl AuthenticateRequest {
}
}

impl Into<etcdserverpb::AuthenticateRequest> for AuthenticateRequest {
fn into(self) -> etcdserverpb::AuthenticateRequest {
self.proto
}
}

/// Response for authenticating.
#[derive(Debug)]
pub struct AuthenticateResponse {
proto: etcdserverpb::AuthenticateResponse,
}
pbwrap_response!(AuthenticateResponse);

impl AuthenticateResponse {
/// Takes the header out of response, leaving a `None` in its place.
pub fn take_header(&mut self) -> Option<ResponseHeader> {
match self.proto.header.take() {
Some(header) => Some(From::from(header)),
Some(header) => Some(header.into()),
_ => None,
}
}
Expand All @@ -46,9 +36,3 @@ impl AuthenticateResponse {
&self.proto.token
}
}

impl From<etcdserverpb::AuthenticateResponse> for AuthenticateResponse {
fn from(resp: etcdserverpb::AuthenticateResponse) -> Self {
Self { proto: resp }
}
}
2 changes: 1 addition & 1 deletion src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ impl Auth {
.authenticate(tonic::Request::new(req.into()))
.await?;

Ok(From::from(resp.into_inner()))
Ok(resp.into_inner().into())
}
}
57 changes: 23 additions & 34 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,54 +74,39 @@ impl Client {
// If authentication provided, generates token before connecting.
let token = Self::generate_auth_token(&cfg).await?;

let auth_interceptor = if let Some(token) = token {
let auth_interceptor = token.map(|token| {
let token = MetadataValue::from_str(&token).unwrap();
Some(Interceptor::new(move |mut req: Request<()>| {
Interceptor::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());

Ok(req)
}))
} else {
None
};
})
});

let channel = Self::get_channel(&cfg)?;

let inner = {
let (auth_client, kv_client, watch_client, lease_client) =
if let Some(auth_interceptor) = auth_interceptor {
(
Auth::new(AuthClient::with_interceptor(
channel.clone(),
auth_interceptor.clone(),
)),
Kv::new(KvClient::with_interceptor(
channel.clone(),
auth_interceptor.clone(),
)),
Watch::new(WatchClient::with_interceptor(
channel.clone(),
auth_interceptor.clone(),
)),
Lease::new(LeaseClient::with_interceptor(
channel.clone(),
auth_interceptor,
)),
AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
KvClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
LeaseClient::with_interceptor(channel.clone(), auth_interceptor),
)
} else {
(
Auth::new(AuthClient::new(channel.clone())),
Kv::new(KvClient::new(channel.clone())),
Watch::new(WatchClient::new(channel.clone())),
Lease::new(LeaseClient::new(channel.clone())),
AuthClient::new(channel.clone()),
KvClient::new(channel.clone()),
WatchClient::new(channel.clone()),
LeaseClient::new(channel.clone()),
)
};
Inner {
channel,
auth_client,
kv_client,
watch_client,
lease_client,
auth_client: Auth::new(auth_client),
kv_client: Kv::new(kv_client),
watch_client: Watch::new(watch_client),
lease_client: Lease::new(lease_client),
}
};

Expand All @@ -146,9 +131,13 @@ impl Client {
}

/// Perform a watch operation
pub async fn watch(&self, key_range: KeyRange) -> impl Stream<Item = Result<WatchResponse>> {
let mut client = self.inner.watch_client.clone();
client.watch(key_range).await
pub async fn watch(
&self,
key_range: KeyRange,
) -> Result<impl Stream<Item = Result<WatchResponse>>> {
let mut wc = self.watch_client();
wc.watch(key_range).await?;
Ok(wc.take_receiver().await)
}

/// Gets a lease client.
Expand Down
38 changes: 10 additions & 28 deletions src/kv/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use super::{KeyRange, KeyValue};
use crate::proto::etcdserverpb;
use crate::ResponseHeader;

/// Request for deleting key-value pairs.
pub struct DeleteRequest {
proto: etcdserverpb::DeleteRangeRequest,
}
pbwrap_request!(
/// Request for deleting key-value pairs.
DeleteRangeRequest => DeleteRequest
);

impl DeleteRequest {
/// Creates a new DeleteRequest for the specified key range.
Expand All @@ -25,25 +25,12 @@ impl DeleteRequest {
}
}

impl Into<etcdserverpb::DeleteRangeRequest> for DeleteRequest {
fn into(self) -> etcdserverpb::DeleteRangeRequest {
self.proto
}
}

/// Response for DeleteRequest.
#[derive(Debug)]
pub struct DeleteResponse {
proto: etcdserverpb::DeleteRangeResponse,
}
pbwrap_response!(DeleteRangeResponse => DeleteResponse);

impl DeleteResponse {
/// Takes the header out of response, leaving a `None` in its place.
pub fn take_header(&mut self) -> Option<ResponseHeader> {
match self.proto.header.take() {
Some(header) => Some(From::from(header)),
_ => None,
}
self.proto.header.take().map(From::from)
}

/// Returns the number of keys deleted by the delete range request.
Expand All @@ -53,19 +40,14 @@ impl DeleteResponse {

/// Takes the previous key-value pairs out of response, leaving an empty vector in its place.
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
let kvs = std::mem::replace(&mut self.proto.prev_kvs, vec![]);

kvs.into_iter().map(From::from).collect()
std::mem::take(&mut self.proto.prev_kvs)
.into_iter()
.map(From::from)
.collect()
}

/// Returns `true` if the previous key-value pairs is not empty, and `false` otherwise.
pub fn has_prev_kvs(&self) -> bool {
!self.proto.prev_kvs.is_empty()
}
}

impl From<etcdserverpb::DeleteRangeResponse> for DeleteResponse {
fn from(resp: etcdserverpb::DeleteRangeResponse) -> Self {
Self { proto: resp }
}
}
28 changes: 10 additions & 18 deletions src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod txn;
pub use delete::{DeleteRequest, DeleteResponse};
pub use put::{PutRequest, PutResponse};
pub use range::{RangeRequest, RangeResponse};
pub use txn::{TxnCmp, TxnOpResponse, TxnRequest, TxnResponse};
pub use txn::{TxnCmp, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};

use tonic::transport::Channel;

Expand All @@ -29,14 +29,14 @@ impl Kv {
pub async fn put(&mut self, req: PutRequest) -> Res<PutResponse> {
let resp = self.client.put(tonic::Request::new(req.into())).await?;

Ok(From::from(resp.into_inner()))
Ok(resp.into_inner().into())
}

/// Performs a key-value fetching operation.
pub async fn range(&mut self, req: RangeRequest) -> Res<RangeResponse> {
let resp = self.client.range(tonic::Request::new(req.into())).await?;

Ok(From::from(resp.into_inner()))
Ok(resp.into_inner().into())
}

/// Performs a key-value deleting operation.
Expand All @@ -46,14 +46,14 @@ impl Kv {
.delete_range(tonic::Request::new(req.into()))
.await?;

Ok(From::from(resp.into_inner()))
Ok(resp.into_inner().into())
}

/// Performs a transaction operation.
pub async fn txn(&mut self, req: TxnRequest) -> Res<TxnResponse> {
let resp = self.client.txn(tonic::Request::new(req.into())).await?;

Ok(From::from(resp.into_inner()))
Ok(resp.into_inner().into())
}
}

Expand All @@ -71,7 +71,7 @@ impl KeyValue {

/// Takes the key out of response, leaving an empty vector in its place.
pub fn take_key(&mut self) -> Vec<u8> {
std::mem::replace(&mut self.proto.key, vec![])
std::mem::take(&mut self.proto.key)
}

/// Converts the key from bytes `&[u8]` to `&str`.
Expand All @@ -87,7 +87,7 @@ impl KeyValue {

/// Takes the value out of response, leaving an empty vector in its place.
pub fn take_value(&mut self) -> Vec<u8> {
std::mem::replace(&mut self.proto.value, vec![])
std::mem::take(&mut self.proto.value)
}

/// Converts the value from bytes `&[u8]` to `&str`.
Expand Down Expand Up @@ -130,8 +130,8 @@ impl From<mvccpb::KeyValue> for KeyValue {

/// KeyRange is an abstraction for describing etcd key of various types.
pub struct KeyRange {
key: Vec<u8>,
range_end: Vec<u8>,
pub key: Vec<u8>,
pub range_end: Vec<u8>,
}

impl KeyRange {
Expand Down Expand Up @@ -184,20 +184,12 @@ impl KeyRange {
for i in (0..end.len()).rev() {
if end[i] < 0xff {
end[i] += 1;
end = end[0..=i].to_vec();
end.truncate(i + 1);
break;
}
}
end
};
Self { key, range_end }
}

pub fn take_key(&mut self) -> Vec<u8> {
std::mem::replace(&mut self.key, vec![])
}

pub fn take_range_end(&mut self) -> Vec<u8> {
std::mem::replace(&mut self.range_end, vec![])
}
}
Loading

0 comments on commit 8854100

Please sign in to comment.