Skip to content

Commit

Permalink
fix for publisher shutdown review changes for awaitTermination method
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-qlogic committed Apr 29, 2019
1 parent 35ee213 commit 4bc01c7
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -40,6 +42,7 @@
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -86,15 +89,16 @@ public class Publisher {
private final BatchingSettings batchingSettings;

private final Lock messagesBatchLock;
private MessagesBatch messagesBatch;
private List<OutstandingPublish> messagesBatch;
private int batchedBytes;

private final AtomicBoolean activeAlarm;

private final PublisherStub publisherStub;

private final ScheduledExecutorService executor;
private final AtomicBoolean shutdown;
private final List<AutoCloseable> closeables;
private final BackgroundResource backgroundResources;
private final MessageWaiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;
Expand All @@ -115,15 +119,14 @@ private Publisher(Builder builder) throws IOException {
this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new MessagesBatch();
messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();

List<BackgroundResource> backgroundResourceList = new ArrayList<>();
if (builder.executorProvider.shouldAutoClose()) {
closeables =
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
} else {
closeables = Collections.emptyList();
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
}

// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
Expand Down Expand Up @@ -151,7 +154,8 @@ private Publisher(Builder builder) throws IOException {
.setRetrySettings(retrySettings)
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());

backgroundResourceList.add(publisherStub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new MessageWaiter();
}
Expand Down Expand Up @@ -197,75 +201,96 @@ public ApiFuture<String> publish(PubsubMessage message) {
}

message = messageTransform.apply(message);
List<OutstandingBatch> batchesToSend = new ArrayList<>();
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message);
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.
if (!messagesBatch.isEmpty()
&& hasBatchingBytes()
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
>= getMaxBatchBytes()) {
batchesToSend.add(messagesBatch.popOutstandingBatch());
&& batchedBytes + messageSize >= getMaxBatchBytes()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
}

messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);

// Border case: If the message to send is greater or equals to the max batch size then send it
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchesToSend.add(messagesBatch.popOutstandingBatch());
// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
batchedBytes += messageSize;
messagesBatch.add(outstandingPublish);

// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
}
}
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm();
if (!messagesBatch.isEmpty()) {
setupDurationBasedPublishAlarm();
} else if (currentAlarmFuture != null) {
logger.log(Level.FINER, "Cancelling alarm, no more messages");
if (activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
}
} finally {
messagesBatchLock.unlock();
}

messagesWaiter.incrementPendingMessages(1);

if (!batchesToSend.isEmpty()) {
for (final OutstandingBatch batch : batchesToSend) {
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(batch);
}
});
}
if (batchToSend != null) {
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
final OutstandingBatch finalBatchToSend = batchToSend;
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(finalBatchToSend);
}
});
}

// If the message is over the size limit, it was not added to the pending messages and it will
// be sent in its own batch immediately.
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
logger.log(
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
}
});
}

return outstandingPublish.publishResult;
return publishResult;
}

private void setupAlarm() {
if (!messagesBatch.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
currentAlarmFuture =
executor.schedule(
new Runnable() {
@Override
public void run() {
logger.log(Level.FINER, "Sending messages based on schedule.");
activeAlarm.getAndSet(false);
publishAllOutstanding();
}
},
delayThresholdMs,
TimeUnit.MILLISECONDS);
}
} else if (currentAlarmFuture != null) {
logger.log(Level.FINER, "Cancelling alarm, no more messages");
if (activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
private void setupDurationBasedPublishAlarm() {
if (!activeAlarm.getAndSet(true)) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
currentAlarmFuture =
executor.schedule(
new Runnable() {
@Override
public void run() {
logger.log(Level.FINER, "Sending messages based on schedule.");
activeAlarm.getAndSet(false);
publishAllOutstanding();
}
},
delayThresholdMs,
TimeUnit.MILLISECONDS);
}
}

Expand All @@ -281,25 +306,24 @@ public void publishAllOutstanding() {
if (messagesBatch.isEmpty()) {
return;
}
batchToSend = messagesBatch.popOutstandingBatch();
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
} finally {
messagesBatchLock.unlock();
}
publishOutstandingBatch(batchToSend);
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
publishRequest.setTopic(topicName);
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}

return publisherStub.publishCallable().futureCall(publishRequest.build());
}

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
ApiFutureCallback<PublishResponse> futureCallback =
ApiFutures.addCallback(
publisherStub.publishCallable().futureCall(publishRequest.build()),
new ApiFutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
Expand Down Expand Up @@ -338,9 +362,7 @@ public void onFailure(Throwable t) {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
}
}
};

ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
});
}

private static final class OutstandingBatch {
Expand All @@ -356,18 +378,21 @@ private static final class OutstandingBatch {
this.batchSizeBytes = batchSizeBytes;
}

int size() {
public int getAttempt() {
return attempt;
}

public int size() {
return outstandingPublishes.size();
}
}

private static final class OutstandingPublish {
final SettableApiFuture<String> publishResult;
final PubsubMessage message;
final int messageSize;
SettableApiFuture<String> publishResult;
PubsubMessage message;

OutstandingPublish(PubsubMessage message) {
this.publishResult = SettableApiFuture.create();
OutstandingPublish(SettableApiFuture<String> publishResult, PubsubMessage message) {
this.publishResult = publishResult;
this.message = message;
this.messageSize = message.getSerializedSize();
}
Expand Down Expand Up @@ -397,10 +422,7 @@ public void shutdown() throws Exception {
currentAlarmFuture.cancel(false);
}
publishAllOutstanding();
for (AutoCloseable closeable : closeables) {
closeable.close();
}
publisherStub.shutdown();
backgroundResources.shutdown();
}

/**
Expand All @@ -410,37 +432,7 @@ public void shutdown() throws Exception {
* <p>Call this method to make sure all resources are freed properly.
*/
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
final long startDuration = System.currentTimeMillis();
final long totalDurationMs = TimeUnit.MILLISECONDS.convert(duration, unit);
messagesWaiter.waitNoMessages();
long remainingDuration = getRemainingDuration(startDuration, totalDurationMs);
boolean isAwaited =
remainingDuration < totalDurationMs
? publisherStub.awaitTermination(remainingDuration, TimeUnit.MILLISECONDS)
: false;
if (isAwaited) {
for (AutoCloseable closeable : closeables) {
ExecutorAsBackgroundResource executorAsBackgroundResource =
(ExecutorAsBackgroundResource) closeable;
remainingDuration = getRemainingDuration(startDuration, totalDurationMs);
System.out.println(remainingDuration);
isAwaited =
remainingDuration < totalDurationMs
? executorAsBackgroundResource.awaitTermination(
getRemainingDuration(startDuration, totalDurationMs), TimeUnit.MILLISECONDS)
: false;
if (!isAwaited) {
return false;
}
}
} else {
return false;
}
return true;
}

private long getRemainingDuration(long startDuration, long totalDurationMs) {
return totalDurationMs - (System.currentTimeMillis() - startDuration);
return backgroundResources.awaitTermination(duration, unit);
}

private boolean hasBatchingBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,6 @@ public void testAwaitTermination() throws Exception {
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
publisher.shutdown();
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
assertTrue(publishFuture1.isDone());
}

@Test
Expand Down

0 comments on commit 4bc01c7

Please sign in to comment.