Skip to content

Commit

Permalink
fix(webserver): ensure queues are not closed in nioEventLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jan 27, 2025
1 parent e83b9fe commit d89ee01
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1278,15 +1278,12 @@ public Flux<Event<Execution>> follow(

cancel.set(receive);
}, FluxSink.OverflowStrategy.BUFFER)
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
})
.doOnComplete(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
.doFinally(ignored -> {
Schedulers.boundedElastic().schedule(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down Expand Up @@ -152,15 +153,12 @@ public Flux<Event<LogEntry>> follow(

cancel.set(receive);
}, FluxSink.OverflowStrategy.BUFFER)
.doOnCancel(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
})
.doOnComplete(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
.doFinally(ignored -> {
Schedulers.boundedElastic().schedule(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
});
});
}

Expand Down

0 comments on commit d89ee01

Please sign in to comment.