From b54bd5b3354454a4bcfec7386969fb0059acc3c5 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 25 Oct 2019 13:05:49 +0200 Subject: [PATCH] Prefer drain-loop instead of recursion in RedisSubscription #1140 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RedisSubscription.onDataAvailable(…) and RedisSubscription.read(…) now use a drain-loop instead of recursive reads for element emission. Recursive emission is error prone if the response contains many response elements. onDataAvailable(…) calls read(…) if there is demand and if data is available. --- .../java/io/lettuce/core/RedisPublisher.java | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/lettuce/core/RedisPublisher.java b/src/main/java/io/lettuce/core/RedisPublisher.java index 8e8f08863f..722bde5474 100644 --- a/src/main/java/io/lettuce/core/RedisPublisher.java +++ b/src/main/java/io/lettuce/core/RedisPublisher.java @@ -566,12 +566,16 @@ void request(RedisSubscription subscription, long n) { @Override void onDataAvailable(RedisSubscription subscription) { - do { + while (subscription.hasDemand()) { + + if (subscription.state() == NO_DEMAND && !subscription.changeState(NO_DEMAND, DEMAND)) { + return; + } if (!read(subscription)) { return; } - } while (subscription.hasDemand() && subscription.changeState(NO_DEMAND, this)); + } } @Override @@ -589,28 +593,41 @@ void request(RedisSubscription subscription, long n) { } } + /** + * @param subscription + * @return {@literal true} if the {@code read()} call was able to perform a read and whether this method should be + * called again to emit remaining data. + */ private boolean read(RedisSubscription subscription) { - if (subscription.changeState(this, READING)) { + // concurrency/entry guard + if (!subscription.changeState(this, READING)) { + return false; + } + + boolean hasDemand = subscription.readAndPublish(); - boolean hasDemand = subscription.readAndPublish(); + try { - if (subscription.allDataRead && subscription.data.isEmpty()) { - subscription.onAllDataRead(); - return true; + if (subscription.data.isEmpty()) { + + if (subscription.allDataRead) { + subscription.onAllDataRead(); + } + + return false; } + return true; + } finally { + + // concurrency/leave guard if (hasDemand) { subscription.changeState(READING, DEMAND); - subscription.checkOnDataAvailable(); } else { subscription.changeState(READING, NO_DEMAND); } - - return true; } - - return false; } },