Skip to content

Commit

Permalink
Do not retry by default non-idempotent operations. Fixes #1444
Browse files Browse the repository at this point in the history
Also use the chance to improve javadoc on some *Options objects

Signed-off-by: Cristian Ferretti <jcferretti2020@gmail.com>
  • Loading branch information
jcferretti authored and lburgazzoli committed Feb 18, 2025
1 parent a240dc6 commit 05f92e0
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 33 deletions.
9 changes: 9 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/KV.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.CloseableClient;

/**
Expand Down Expand Up @@ -115,4 +116,12 @@ public interface KV extends CloseableClient {
* @return a Txn
*/
Txn txn();

/**
* creates a transaction.
*
* @param option TxnOption
* @return a Txn
*/
Txn txn(TxnOption option);
}
8 changes: 4 additions & 4 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public CompletableFuture<CampaignResponse> campaign(ByteSequence electionName, l
execute(
() -> stubWithLeader().campaign(request),
CampaignResponse::new,
Errors::isRetryable));
Errors::isRetryableForNoSafeRedoOp));
}

@Override
Expand All @@ -111,7 +111,7 @@ public CompletableFuture<ProclaimResponse> proclaim(LeaderKey leaderKey, ByteSeq
execute(
() -> stubWithLeader().proclaim(request),
ProclaimResponse::new,
Errors::isRetryable));
Errors::isRetryableForNoSafeRedoOp));
}

@Override
Expand All @@ -126,7 +126,7 @@ public CompletableFuture<LeaderResponse> leader(ByteSequence electionName) {
execute(
() -> stubWithLeader().leader(request),
response -> new LeaderResponse(response, namespace),
Errors::isRetryable));
Errors::isRetryableForNoSafeRedoOp));
}

@Override
Expand Down Expand Up @@ -162,7 +162,7 @@ public CompletableFuture<ResignResponse> resign(LeaderKey leaderKey) {
execute(
() -> stubWithLeader().resign(request),
ResignResponse::new,
Errors::isRetryable));
Errors::isRetryableForNoSafeRedoOp));
}

private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
Expand Down
6 changes: 4 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ protected <S> CompletableFuture<S> completable(
*/
protected <S, T> CompletableFuture<T> execute(
Supplier<Future<S>> supplier,
Function<S, T> resultConvert) {
Function<S, T> resultConvert,
boolean autoRetry) {

return execute(supplier, resultConvert, Errors::isRetryable);
return execute(supplier, resultConvert,
autoRetry ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
}

/**
Expand Down
17 changes: 12 additions & 5 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/KVImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Requests;

Expand Down Expand Up @@ -65,7 +66,7 @@ public CompletableFuture<PutResponse> put(ByteSequence key, ByteSequence value,
return execute(
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
response -> new PutResponse(response, namespace),
Errors::isRetryable);
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
}

@Override
Expand All @@ -81,7 +82,7 @@ public CompletableFuture<GetResponse> get(ByteSequence key, GetOption option) {
return execute(
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
response -> new GetResponse(response, namespace),
Errors::isRetryable);
Errors::isRetryableForSafeRedoOp);
}

@Override
Expand All @@ -97,7 +98,7 @@ public CompletableFuture<DeleteResponse> delete(ByteSequence key, DeleteOption o
return execute(
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
response -> new DeleteResponse(response, namespace),
Errors::isRetryable);
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
}

@Override
Expand All @@ -116,15 +117,21 @@ public CompletableFuture<CompactResponse> compact(long rev, CompactOption option
return execute(
() -> stub.compact(request),
CompactResponse::new,
Errors::isRetryable);
Errors::isRetryableForSafeRedoOp);
}

@Override
public Txn txn() {
return txn(TxnOption.DEFAULT);
}

@Override
public Txn txn(TxnOption option) {
return TxnImpl.newTxn(
request -> execute(
() -> stub.txn(request),
response -> new TxnResponse(response, namespace), Errors::isRetryable),
response -> new TxnResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp),
namespace);
}
}
12 changes: 8 additions & 4 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public CompletableFuture<LeaseGrantResponse> grant(long ttl) {
LeaseGrantRequest.newBuilder()
.setTTL(ttl)
.build()),
LeaseGrantResponse::new);
LeaseGrantResponse::new,
true);
}

@Override
Expand All @@ -94,7 +95,8 @@ public CompletableFuture<LeaseGrantResponse> grant(long ttl, long timeout, TimeU
LeaseGrantRequest.newBuilder()
.setTTL(ttl)
.build()),
LeaseGrantResponse::new);
LeaseGrantResponse::new,
true);
}

@Override
Expand All @@ -104,7 +106,8 @@ public CompletableFuture<LeaseRevokeResponse> revoke(long leaseId) {
LeaseRevokeRequest.newBuilder()
.setID(leaseId)
.build()),
LeaseRevokeResponse::new);
LeaseRevokeResponse::new,
true);
}

@Override
Expand All @@ -118,7 +121,8 @@ public CompletableFuture<LeaseTimeToLiveResponse> timeToLive(long leaseId, Lease

return execute(
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
LeaseTimeToLiveResponse::new);
LeaseTimeToLiveResponse::new,
true);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public CompletableFuture<LockResponse> lock(ByteSequence name, long leaseId) {
return execute(
() -> stubWithLeader().lock(request),
response -> new LockResponse(response, namespace),
Errors::isRetryable);
Errors::isRetryableForSafeRedoOp);
}

@Override
Expand All @@ -83,6 +83,6 @@ public CompletableFuture<UnlockResponse> unlock(ByteSequence lockKey) {
return execute(
() -> stubWithLeader().unlock(request),
UnlockResponse::new,
Errors::isRetryable);
Errors::isRetryableForSafeRedoOp);
}
}
4 changes: 4 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/kv/GetResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public boolean isMore() {

/**
* Returns the number of keys within the range when requested.
* Note this value is never affected by filtering options (limit, min or max created or modified revisions)
* Count is the count for keys on the range part of a request.
* Filters for limit and created or modified revision ranges restrict the
* returned KVs, but not the count.
*
* @return count.
*/
Expand Down
41 changes: 39 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/options/DeleteOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ public final class DeleteOption {
private final ByteSequence endKey;
private final boolean prevKV;
private final boolean prefix;
private final boolean autoRetry;

private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix) {
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix, final boolean autoRetry) {
this.endKey = endKey;
this.prevKV = prevKV;
this.prefix = prefix;
this.autoRetry = autoRetry;
}

public Optional<ByteSequence> getEndKey() {
Expand All @@ -49,10 +51,25 @@ public boolean isPrevKV() {
return prevKV;
}

/**
* Whether to treat this deletion as deletion by prefix
*
* @return true if deletion by prefix.
*/
public boolean isPrefix() {
return prefix;
}

/**
* Whether to treat a delete operation as idempotent from the point of view of automated retries.
* Note under failure scenarios this may mean a single delete is attempted more than once.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}

/**
* Returns the builder.
*
Expand All @@ -65,6 +82,11 @@ public static Builder newBuilder() {
return builder();
}

/**
* Returns the builder.
*
* @return the builder
*/
public static Builder builder() {
return new Builder();
}
Expand All @@ -73,6 +95,7 @@ public static final class Builder {
private ByteSequence endKey;
private boolean prevKV = false;
private boolean prefix = false;
private boolean autoRetry = false;

private Builder() {
}
Expand Down Expand Up @@ -144,8 +167,22 @@ public Builder withPrevKV(boolean prevKV) {
return this;
}

/**
* When autoRetry is set, the delete operation is treated as idempotent from the point of view of automated retries.
* Note under some failure scenarios true may make a delete operation be attempted more than once, where
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}

public DeleteOption build() {
return new DeleteOption(endKey, prevKV, prefix);
return new DeleteOption(endKey, prevKV, prefix, autoRetry);
}

}
Expand Down
Loading

0 comments on commit 05f92e0

Please sign in to comment.