Skip to content

Commit

Permalink
Cancel on close propagation
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec committed Jun 12, 2020
1 parent a235101 commit 135a0ea
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public Void call() throws IOException {
} finally {
if (closed) {
try {
closeByClient();
responseContext.close();
} catch (final Exception e) {
// if no exception remembered before, remember this one
Expand Down Expand Up @@ -333,6 +334,10 @@ public boolean isClosed() {
return closed;
}

protected void closeByClient(){

}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(final Object obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.io.Flushable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -45,13 +45,13 @@
* <p>
* The reference should be obtained via injection into the resource method.
*
* @author Adam Lindenthal]
* @author Adam Lindenthal
*/
class JerseyEventSink extends ChunkedOutput<OutboundSseEvent>
implements SseEventSink, Flushable, JerseyFlowSubscriber<Object> {

private static final Logger LOGGER = Logger.getLogger(JerseyEventSink.class.getName());
private static final byte[] SSE_EVENT_DELIMITER = "\n".getBytes(Charset.forName("UTF-8"));
private static final byte[] SSE_EVENT_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8);
private Flow.Subscription subscription = null;
private final AtomicBoolean subscribed = new AtomicBoolean(false);
private volatile MediaType implicitMediaType = null;
Expand Down Expand Up @@ -129,14 +129,21 @@ public void onError(final Throwable throwable) {
if (throwable == null) {
throw new NullPointerException(LocalizationMessages.PARAM_NULL("throwable"));
}
LOGGER.log(Level.INFO, LocalizationMessages.EVENT_SOURCE_DEFAULT_ONERROR(), throwable);
try {
super.close();
} catch (IOException e) {
LOGGER.log(Level.INFO, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e);
}
}

public void onComplete() {
try {
super.close();
} catch (Throwable e) {
LOGGER.log(Level.INFO, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e);
}
}

@Override
public void close() {
try {
Expand All @@ -154,7 +161,9 @@ public CompletionStage<?> send(OutboundSseEvent event) {
this.write(event);
return CompletableFuture.completedFuture(null);
} catch (IOException e) {
return CompletableFuture.completedFuture(e);
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}

Expand All @@ -171,12 +180,9 @@ public void flush() throws IOException {
super.flushQueue();
}

public void onComplete() {
try {
super.close();
} catch (Throwable e) {
LOGGER.log(Level.INFO, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e);
}
@Override
protected void closeByClient() {
cancelUpstream();
}

private void cancelUpstream() {
Expand Down

0 comments on commit 135a0ea

Please sign in to comment.