Skip to content

Commit

Permalink
properly signal no matching minions on async
Browse files Browse the repository at this point in the history
  • Loading branch information
lucidd committed Mar 22, 2019
1 parent 38a846d commit 7a6df70
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 67 deletions.
138 changes: 72 additions & 66 deletions src/main/java/com/suse/salt/netapi/calls/LocalCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Map<String, Object> getPayload() {
* @param batch parameter for enabling and configuring batching
* @return information about the scheduled job
*/
public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth,
public CompletionStage<Optional<LocalAsyncResult<R>>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth,
Batch batch) {
return callAsync(client, target, auth, Optional.of(batch));
}
Expand All @@ -140,7 +140,7 @@ public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, T
* @param batch parameter for enabling and configuring batching
* @return information about the scheduled job
*/
public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth,
public CompletionStage<Optional<LocalAsyncResult<R>>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth,
Optional<Batch> batch) {

Map<String, Object> customArgs = new HashMap<>();
Expand All @@ -155,7 +155,11 @@ public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, T
.thenApply(wrapper -> {
LocalAsyncResult<R> result = wrapper.getResult().get(0);
result.setType(getReturnType());
return result;
if (result.getJid() == null) {
return Optional.empty();
} else {
return Optional.of(result);
}
});
}

Expand All @@ -170,7 +174,7 @@ public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, T
* @param auth authentication credentials to use
* @return information about the scheduled job
*/
public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth) {
public CompletionStage<Optional<LocalAsyncResult<R>>> callAsync(final SaltClient client, Target<?> target, AuthMethod auth) {
return callAsync(client, target, auth, Optional.empty());
}

Expand All @@ -186,7 +190,7 @@ public CompletionStage<LocalAsyncResult<R>> callAsync(final SaltClient client, T
* @param batch parameter for enabling and configuring batching
* @return a map from minion id to future of the result.
*/
public CompletionStage<Map<String, CompletionStage<Result<R>>>> callAsync(
public CompletionStage<Optional<Map<String, CompletionStage<Result<R>>>>> callAsync(
SaltClient client,
Target<?> target,
AuthMethod auth,
Expand All @@ -208,7 +212,7 @@ public CompletionStage<Map<String, CompletionStage<Result<R>>>> callAsync(
* @param batch parameter for enabling and configuring batching
* @return a map from minion id to future of the result.
*/
public CompletionStage<Map<String, CompletionStage<Result<R>>>> callAsync(
public CompletionStage<Optional<Map<String, CompletionStage<Result<R>>>>> callAsync(
SaltClient client,
Target<?> target,
AuthMethod auth,
Expand All @@ -233,82 +237,84 @@ public CompletionStage<Map<String, CompletionStage<Result<R>>>> callAsync(
* @param cancel future to cancel the action
* @return a map from minion id to future of the result.
*/
public CompletionStage<Map<String, CompletionStage<Result<R>>>> callAsync(
Function<LocalCall<R>, CompletionStage<LocalAsyncResult<R>>> localAsync,
public CompletionStage<Optional<Map<String, CompletionStage<Result<R>>>>> callAsync(
Function<LocalCall<R>, CompletionStage<Optional<LocalAsyncResult<R>>>> localAsync,
Function<RunnerCall<Map<String, R>>,
CompletionStage<RunnerAsyncResult<Map<String, R>>>> runnerAsync,
EventStream events,
CompletionStage<GenericError> cancel) {

return localAsync.apply(this).thenApply(lar -> {
return localAsync.apply(this).thenApply(optLar -> {
TypeToken<R> returnTypeToken = this.getReturnType();
Type result = ClientUtils.parameterizedType(null,
Result.class, returnTypeToken.getType());
@SuppressWarnings("unchecked")
TypeToken<Result<R>> typeToken = (TypeToken<Result<R>>) TypeToken.get(result);

Map<String, CompletableFuture<Result<R>>> futures =
lar.getMinions().stream().collect(Collectors.toMap(
mid -> mid,
mid -> new CompletableFuture<>()
)
);

EventListener listener = new EventListener() {
@Override
public void notify(Event event) {
Optional<JobReturnEvent> jobReturnEvent = JobReturnEvent.parse(event);
if (jobReturnEvent.isPresent()) {
jobReturnEvent.ifPresent(e ->
onJobReturn(lar.getJid(), e, typeToken, futures)
);
} else {
RunnerReturnEvent.parse(event).ifPresent(e ->
onRunnerReturn(lar.getJid(), e, typeToken, futures)
return optLar.map(lar -> {
Map<String, CompletableFuture<Result<R>>> futures =
lar.getMinions().stream().collect(Collectors.toMap(
mid -> mid,
mid -> new CompletableFuture<>()
)
);

EventListener listener = new EventListener() {
@Override
public void notify(Event event) {
Optional<JobReturnEvent> jobReturnEvent = JobReturnEvent.parse(event);
if (jobReturnEvent.isPresent()) {
jobReturnEvent.ifPresent(e ->
onJobReturn(lar.getJid(), e, typeToken, futures)
);
} else {
RunnerReturnEvent.parse(event).ifPresent(e ->
onRunnerReturn(lar.getJid(), e, typeToken, futures)
);
}
}
}

@Override
public void eventStreamClosed(int code, String phrase) {
Result<R> error = Result.error(
new GenericError(
"EventStream closed with reason "
+ phrase));
futures.values().forEach(f -> f.complete(error));
}
};

CompletableFuture<Void> allResolves = CompletableFuture.allOf(
futures.entrySet().stream().map(entry ->
// mask errors since CompletableFuture.allOf resolves on first error
entry.getValue().<Integer>handle((v, e) -> 0)
).toArray(CompletableFuture[]::new)
);

allResolves.whenComplete((v, e) ->
events.removeEventListener(listener)
);

cancel.whenComplete((v, e) -> {
if (v != null) {
Result<R> error = Result.error(v);
futures.values().forEach(f -> f.complete(error));
} else if (e != null) {
futures.values().forEach(f -> f.completeExceptionally(e));
}
});

events.addEventListener(listener);
@Override
public void eventStreamClosed(int code, String phrase) {
Result<R> error = Result.error(
new GenericError(
"EventStream closed with reason "
+ phrase));
futures.values().forEach(f -> f.complete(error));
}
};

CompletableFuture<Void> allResolves = CompletableFuture.allOf(
futures.entrySet().stream().map(entry ->
// mask errors since CompletableFuture.allOf resolves on first error
entry.getValue().<Integer>handle((v, e) -> 0)
).toArray(CompletableFuture[]::new)
);

allResolves.whenComplete((v, e) ->
events.removeEventListener(listener)
);

cancel.whenComplete((v, e) -> {
if (v != null) {
Result<R> error = Result.error(v);
futures.values().forEach(f -> f.complete(error));
} else if (e != null) {
futures.values().forEach(f -> f.completeExceptionally(e));
}
});

// fire off lookup to get a result event for minions that already finished
// before we installed the listeners
runnerAsync.apply(Jobs.lookupJid(lar));
events.addEventListener(listener);

return futures.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
e -> (CompletionStage<Result<R>>) e.getValue()
));
// fire off lookup to get a result event for minions that already finished
// before we installed the listeners
runnerAsync.apply(Jobs.lookupJid(lar));

return futures.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
e -> (CompletionStage<Result<R>>) e.getValue()
));
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testAsyncCall() throws SaltException, InterruptedException {
Duration.of(7, ChronoUnit.SECONDS)
),
Optional.empty()
).toCompletableFuture().join();
).toCompletableFuture().join().get();

CountDownLatch countDownLatch = new CountDownLatch(5);
Map<String, Result<Boolean>> results = new HashMap<>();
Expand Down

0 comments on commit 7a6df70

Please sign in to comment.