Skip to content

Commit

Permalink
Subscription: allow generate subsequent events with the same tablet batch to avoid large message & improve poll logic to avoid unnecessary nack (#14452) (#14476)
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies authored Dec 18, 2024
1 parent 8814143 commit d0dbd73
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ private List<SubscriptionMessage> singlePoll(
LOGGER.warn("unexpected response type: {}", responseType);
return Optional.empty();
})
.apply(response, timer)
// TODO: reuse previous timer?
.apply(response, new PollTimer(System.currentTimeMillis(), timeoutMs))
.ifPresent(currentMessages::add);
} catch (final SubscriptionRuntimeNonCriticalException e) {
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -180,7 +181,7 @@ public List<SubscriptionMessage> poll(final Set<String> topicNames, final long t
LOGGER.info(
"SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)",
this,
parsedTopicNames,
CollectionUtils.getLimitedString(parsedTopicNames, 32),
timeoutMs);
return messages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.CollectionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -178,7 +179,7 @@ public void run() {
LOGGER.info(
"SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)",
this,
subscribedTopics.keySet(),
CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
autoPollTimeoutMs);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void close() {
final long startTime = System.currentTimeMillis();
outputPipeConnector.close();
LOGGER.info(
"Pipe: connector subtask {} was closed {} within {} ms",
"Pipe: connector subtask {} ({}) was closed within {} ms",
taskID,
outputPipeConnector,
System.currentTimeMillis() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -88,18 +91,92 @@ public boolean isEmpty() {

/**
 * Polls at most one event per candidate topic on behalf of the given consumer.
 *
 * <p>Candidate topics are visited in ascending order of the per-topic event count recorded in the
 * consumer's subscription states, so topics from which the consumer has received fewer events are
 * served first. {@code prepareCandidateTopicNames} may already have placed events into the result
 * (for example termination events for completed topics) before any prefetching queue is polled.
 *
 * <p>Polling stops early once the accumulated response size plus one more event of the size just
 * added would exceed {@code maxBytes}. The event that triggers the stop has already been added to
 * the result, so the total size of the returned events can be larger than {@code maxBytes}.
 *
 * <p>Events whose response size cannot be computed are not returned; they are nacked through
 * {@code commit(consumerId, commitContexts, true)} before this method returns.
 *
 * @param consumerId the consumer on whose behalf events are polled
 * @param topicNames the topics the consumer wants to poll from
 * @param maxBytes the size budget used to decide when to stop polling further topics
 * @return the events to hand to the consumer, possibly empty
 * @throws NullPointerException if no subscription states are available for {@code consumerId}
 */
public List<SubscriptionEvent> poll(
    final String consumerId, final Set<String> topicNames, final long maxBytes) {
  final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
  final Set<String> candidateTopicNames = prepareCandidateTopicNames(topicNames, eventsToPoll);

  // Sort topic names based on the current subscription states (number of events received per
  // topic), fewest first
  final List<String> sortedTopicNames = new ArrayList<>(candidateTopicNames);
  sortedTopicNames.sort(
      Comparator.comparingLong(
          topicName ->
              Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
                  .getStates(topicName)));

  final List<SubscriptionEvent> eventsToNack = new ArrayList<>();
  long totalSize = 0;
  final Map<String, Long> topicNameToIncrements = new HashMap<>();

  // Iterate over each sorted topic name and poll the corresponding events
  for (final String topicName : sortedTopicNames) {
    final SubscriptionPrefetchingQueue prefetchingQueue =
        topicNameToPrefetchingQueue.get(topicName);
    // Recheck: the queue may have been removed or closed since the candidates were collected
    if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed()) {
      continue;
    }

    // Poll the event from the prefetching queue
    final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
    if (Objects.isNull(event)) {
      continue;
    }

    // Try to get the current size of the event
    final long currentSize;
    try {
      currentSize = event.getCurrentResponseSize();
    } catch (final IOException e) {
      // If there is an error getting the event's size, nack the event
      eventsToNack.add(event);
      continue;
    }

    // Add the event to the poll list
    eventsToPoll.add(event);

    // Increment the event count for the topic
    topicNameToIncrements.merge(event.getCommitContext().getTopicName(), 1L, Long::sum);

    // Update the total size
    totalSize += currentSize;

    // Pessimistic estimation: the event above is already part of the result, so this check only
    // decides whether to continue. Stop if one more event of the same size would exceed maxBytes.
    if (totalSize + currentSize > maxBytes) {
      break;
    }
  }

  // Update the subscription states with the increments for the topics processed
  Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
      .updateStates(topicNameToIncrements);

  // Commit the nack events for the consumer
  commit(
      consumerId,
      eventsToNack.stream().map(SubscriptionEvent::getCommitContext).collect(Collectors.toList()),
      true);

  // Return the list of events that are to be polled
  return eventsToPoll;
}

private Set<String> prepareCandidateTopicNames(
final Set<String> topicNames,
final List<SubscriptionEvent> eventsToPoll /* output parameter */) {
final Set<String> candidateTopicNames = new HashSet<>();
for (final String topicName : topicNames) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
// If there is no prefetching queue for the topic, check if it's completed
if (Objects.isNull(prefetchingQueue)) {
// check if completed
if (completedTopicNames.containsKey(topicName)) {
LOGGER.info(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client",
topicName,
brokerId);
events.add(
// Add a termination event for the completed topic
eventsToPoll.add(
new SubscriptionEvent(
SubscriptionPollResponseType.TERMINATION.getType(),
new TerminationPayload(),
Expand All @@ -118,29 +195,20 @@ public List<SubscriptionEvent> poll(
// 2.2. potential disorder of unbind and close prefetching queue...
continue;
}

// Check if the prefetching queue is closed
if (prefetchingQueue.isClosed()) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
continue;
}
final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
if (Objects.nonNull(event)) {
events.add(event);
}

candidateTopicNames.add(topicName);
}

final Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> eventsToPollWithEventsToNack =
Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
.filter(events, maxBytes);
commit(
consumerId,
eventsToPollWithEventsToNack.right.stream()
.map(SubscriptionEvent::getCommitContext)
.collect(Collectors.toList()),
true);
return eventsToPollWithEventsToNack.left;
return candidateTopicNames;
}

public List<SubscriptionEvent> pollTsFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
Expand Down Expand Up @@ -343,8 +342,8 @@ private void tryPrefetch() {
continue;
}

if (event instanceof PipeTsFileInsertionEvent) {
if (onEvent((PipeTsFileInsertionEvent) event)) {
if (event instanceof TsFileInsertionEvent) {
if (onEvent((TsFileInsertionEvent) event)) {
return;
}
continue;
Expand Down Expand Up @@ -448,7 +447,7 @@ private boolean ackInternal(
this);
}

ev.ack();
ev.ack(this::enqueueEventToPrefetchingQueue);
ev.recordCommittedTimestamp(); // now committed
acked.set(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.subscription.broker;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;

import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class SubscriptionStates {
* @return a Pair containing two lists: the first list contains the events to poll, and the second
* list contains the events to nack
*/
@TestOnly
public Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> filter(
final List<SubscriptionEvent> events, final long maxBytes) {
final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
Expand Down Expand Up @@ -82,40 +84,40 @@ public Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> filter(
}

// Update the subscription state with the increments calculated during filtering
update(topicNameToIncrements);
updateStates(topicNameToIncrements);
return new Pair<>(eventsToPoll, eventsToNack);
}

/**
* Sorts a list of SubscriptionEvents according to the event count in the subscription states.
* Events with fewer counts are prioritized.
*
* @param events the list of events to sort
*/
private void sort(final List<SubscriptionEvent> events) {
events.sort(
Comparator.comparingLong(event -> getStates(event.getCommitContext().getTopicName())));
}

/**
* Updates the subscription state by incrementing the event count for multiple topics.
*
* @param topicNameToIncrements a map where the key is the topic name and the value is the number
* of events to add to the count
*/
private void update(final Map<String, Long> topicNameToIncrements) {
public void updateStates(final Map<String, Long> topicNameToIncrements) {
for (final Entry<String, Long> entry : topicNameToIncrements.entrySet()) {
topicNameToEventCount.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}

/**
* Sorts a list of SubscriptionEvents according to the event count in the subscription states.
* Events with fewer counts are prioritized.
*
* @param events the list of events to sort
*/
private void sort(final List<SubscriptionEvent> events) {
events.sort(
Comparator.comparingLong(event -> getCount(event.getCommitContext().getTopicName())));
}

/**
* Returns the number of events received for a specific topic.
*
* @param topicName the name of the topic
* @return the number of events received for the topic
*/
private long getCount(final String topicName) {
public long getStates(final String topicName) {
return topicNameToEventCount.getOrDefault(topicName, 0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.subscription.event;

import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;

/**
 * Supplies a {@link SubscriptionCommitContext} on demand.
 *
 * <p>NOTE(review): presumably each invocation yields a new commit context so that more than one
 * event can be produced from the same tablet batch; confirm against the implementations.
 */
@FunctionalInterface
public interface SubscriptionCommitContextSupplier {

/**
 * Returns a commit context.
 *
 * @return the supplied {@link SubscriptionCommitContext}
 */
SubscriptionCommitContext get();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;

Expand Down Expand Up @@ -87,9 +88,12 @@ public SubscriptionEvent(final SubscriptionPollResponse response) {
* SubscriptionEventTabletResponse}.
*/
public SubscriptionEvent(
final SubscriptionPipeTabletEventBatch batch, final SubscriptionCommitContext commitContext) {
final SubscriptionPipeTabletEventBatch batch,
final SubscriptionCommitContextSupplier commitContextSupplier) {
this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
this.response = new SubscriptionEventTabletResponse(batch, commitContext);
final SubscriptionCommitContext commitContext = commitContextSupplier.get();
this.response =
new SubscriptionEventTabletResponse(batch, commitContext, commitContextSupplier);
this.commitContext = commitContext;
}

Expand Down Expand Up @@ -138,7 +142,11 @@ public boolean isCommittable() {
return response.isCommittable();
}

public void ack() {
public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
// ack response
response.ack(onCommittedHook);

// ack pipe events
pipeEvents.ack();
}

Expand Down
Loading

0 comments on commit d0dbd73

Please sign in to comment.