29
29
import io .etcd .jetcd .kv .PutResponse ;
30
30
import io .etcd .jetcd .kv .TxnResponse ;
31
31
import io .etcd .jetcd .op .TxnImpl ;
32
- import io .etcd .jetcd .options .CompactOption ;
33
- import io .etcd .jetcd .options .DeleteOption ;
34
- import io .etcd .jetcd .options .GetOption ;
35
- import io .etcd .jetcd .options .PutOption ;
32
+ import io .etcd .jetcd .options .*;
36
33
import io .etcd .jetcd .support .Errors ;
37
34
import io .etcd .jetcd .support .Requests ;
38
35
@@ -65,7 +62,7 @@ public CompletableFuture<PutResponse> put(ByteSequence key, ByteSequence value,
65
62
return execute (
66
63
() -> stub .put (Requests .mapPutRequest (key , value , option , namespace )),
67
64
response -> new PutResponse (response , namespace ),
68
- Errors ::isRetryable );
65
+ option . isAutoRetry () ? Errors ::isRetryableForSafeRedoOp : Errors :: isRetryableForNoSafeRedoOp );
69
66
}
70
67
71
68
@ Override
@@ -81,7 +78,7 @@ public CompletableFuture<GetResponse> get(ByteSequence key, GetOption option) {
81
78
return execute (
82
79
() -> stub .range (Requests .mapRangeRequest (key , option , namespace )),
83
80
response -> new GetResponse (response , namespace ),
84
- Errors ::isRetryable );
81
+ Errors ::isRetryableForSafeRedoOp );
85
82
}
86
83
87
84
@ Override
@@ -97,7 +94,7 @@ public CompletableFuture<DeleteResponse> delete(ByteSequence key, DeleteOption o
97
94
return execute (
98
95
() -> stub .deleteRange (Requests .mapDeleteRequest (key , option , namespace )),
99
96
response -> new DeleteResponse (response , namespace ),
100
- Errors ::isRetryable );
97
+ option . isAutoRetry () ? Errors ::isRetryableForSafeRedoOp : Errors :: isRetryableForNoSafeRedoOp );
101
98
}
102
99
103
100
@ Override
@@ -116,15 +113,21 @@ public CompletableFuture<CompactResponse> compact(long rev, CompactOption option
116
113
return execute (
117
114
() -> stub .compact (request ),
118
115
CompactResponse ::new ,
119
- Errors ::isRetryable );
116
+ Errors ::isRetryableForSafeRedoOp );
120
117
}
121
118
122
119
@ Override
123
120
public Txn txn () {
121
+ return txn (TxnOption .DEFAULT );
122
+ }
123
+
124
+ @ Override
125
+ public Txn txn (TxnOption option ) {
124
126
return TxnImpl .newTxn (
125
127
request -> execute (
126
128
() -> stub .txn (request ),
127
- response -> new TxnResponse (response , namespace ), Errors ::isRetryable ),
129
+ response -> new TxnResponse (response , namespace ),
130
+ option .isAutoRetry () ? Errors ::isRetryableForSafeRedoOp : Errors ::isRetryableForNoSafeRedoOp ),
128
131
namespace );
129
132
}
130
133
}
0 commit comments