Commit 3918f37

MINOR: Update Consumer and Producer JavaDocs for committing offsets (#18336)

The consumer/producer JavaDocs still contain instructions for naively
computing the offset to be committed.

This PR updates the JavaDocs to reflect the improvements of KIP-1094.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
mjsax authored Jan 6, 2025
1 parent c4840f5 commit 3918f37
Showing 2 changed files with 28 additions and 9 deletions.
@@ -70,8 +70,12 @@
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (e.g., for compacted topics, or when records have been
+ * produced using transactions). For example, if the consumer read a record with offset 4, but offset 5 does not
+ * correspond to a record, its position might advance directly to 6 (or higher). Similarly, if the consumer's position
+ * is 5, but there is no record with offset 5, the consumer will return the record with the next higher offset.
+ * There are actually two notions of position relevant to the user of the consumer:
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
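
To make the non-consecutive-offsets behavior concrete, here is a minimal sketch (not part of this patch) that prints the offsets of a polled batch next to the consumer's position afterwards. The broker address, topic name, and group id are illustrative assumptions; with gaps in the log (compaction or transaction markers), the final position can exceed the last read offset plus one.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "position-demo");           // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic
            consumer.assign(List.of(tp));

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                System.out.println("read offset " + record.offset());
            }
            // With compaction or transaction markers in the log, this can be
            // larger than the last read offset + 1.
            System.out.println("next position: " + consumer.position(tp));
        }
    }
}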
@@ -266,8 +270,7 @@
  * for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
  *     System.out.println(record.offset() + &quot;: &quot; + record.value());
  * }
- * long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- * consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ * consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));
  * }
  * }
  * } finally {
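
Expanded into a self-contained loop, the updated example from this hunk looks roughly as follows. This is a sketch rather than the patch's own code: the broker address, topic name, group id, and deserializer settings are assumptions, and the running flag stands for an application-level shutdown signal.

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitLoop {
    private static volatile boolean running = true; // toggled elsewhere on shutdown

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "manual-commit-demo");      // hypothetical group id
        props.put("enable.auto.commit", "false");         // we commit explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    // nextOffsets() already points at the next record to consume and
                    // carries the leader epoch, so no manual "+ 1" arithmetic is needed.
                    consumer.commitSync(
                        Collections.singletonMap(partition, records.nextOffsets().get(partition)));
                }
            }
        } finally {
            consumer.close();
        }
    }
}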
@@ -276,7 +279,10 @@
  * </pre>
  *
  * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
- * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
+ * Thus, when calling {@link #commitSync(Map) commitSync(offsets)}, you should use {@code nextRecordToBeProcessed.offset()},
+ * or, if the {@link ConsumerRecords} is already exhausted, {@link ConsumerRecords#nextOffsets()} instead.
+ * You should also add the leader epoch as commit metadata, which can be obtained from
+ * {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
  *
  * <h4><a name="manualassignment">Manual Partition Assignment</a></h4>
  *
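
One way to read the note above, sketched as two small helpers (not the library's prescribed pattern): when the whole batch has been processed, nextOffsets() already carries both the next offset and the leader epoch; when stopping mid-batch, the first unprocessed record supplies both. The method names and the empty metadata string are assumptions.

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class CommitHelpers {

    // Batch fully processed: nextOffsets() already maps each partition to the
    // next offset to consume, leader epoch included.
    static void commitExhaustedBatch(Consumer<String, String> consumer,
                                     ConsumerRecords<String, String> records) {
        consumer.commitSync(records.nextOffsets());
    }

    // Stopping mid-batch: the first unprocessed record marks the commit
    // position; pass its leader epoch along as commit metadata.
    static void commitMidBatch(Consumer<String, String> consumer,
                               ConsumerRecord<String, String> nextRecordToBeProcessed) {
        TopicPartition tp = new TopicPartition(
            nextRecordToBeProcessed.topic(), nextRecordToBeProcessed.partition());
        OffsetAndMetadata next = new OffsetAndMetadata(
            nextRecordToBeProcessed.offset(), nextRecordToBeProcessed.leaderEpoch(), "");
        consumer.commitSync(Collections.singletonMap(tp, next));
    }
}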
@@ -984,7 +990,10 @@ public void commitSync(Duration timeout) {
  * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
  * should not be used. The committed offset should be the next message your application will consume,
- * i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
+ * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
+ * You should also add the leader epoch as commit metadata, which can be obtained from
+ * {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
+ * If automatic group management with {@link #subscribe(Collection)} is used,
  * then the committed offsets must belong to the currently auto-assigned partitions.
  * <p>
  * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
@@ -1033,7 +1042,10 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
  * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
  * should not be used. The committed offset should be the next message your application will consume,
- * i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
+ * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
+ * You should also add the leader epoch as commit metadata, which can be obtained from
+ * {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
+ * If automatic group management with {@link #subscribe(Collection)} is used,
  * then the committed offsets must belong to the currently auto-assigned partitions.
  * <p>
  * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
@@ -1117,7 +1129,10 @@ public void commitAsync(OffsetCommitCallback callback) {
  * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
  * should not be used. The committed offset should be the next message your application will consume,
- * i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
+ * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
+ * You should also add the leader epoch as commit metadata, which can be obtained from
+ * {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
+ * If automatic group management with {@link #subscribe(Collection)} is used,
  * then the committed offsets must belong to the currently auto-assigned partitions.
  * <p>
  * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
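
For the asynchronous variant, a brief sketch of the same idea, reusing the consumer and records from the loop above; the callback body is illustrative only.

// Inside the poll loop, after processing the batch: commit asynchronously and
// inspect the outcome in the callback instead of blocking.
consumer.commitAsync(records.nextOffsets(), (offsets, exception) -> {
    if (exception != null) {
        // Illustrative handling only; a real application might log, retry,
        // or fall back to commitSync() during shutdown.
        System.err.println("Async commit failed for " + offsets + ": " + exception);
    }
});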
@@ -21,6 +21,8 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -677,7 +679,9 @@ public void beginTransaction() throws ProducerFencedException {
  * Sends a list of specified offsets to the consumer group coordinator, and also marks
  * those offsets as part of the current transaction. These offsets will be considered
  * committed only if the transaction is committed successfully. The committed offset should
- * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
+ * be the next message your application will consume, i.e. {@code nextRecordToBeProcessed.offset()}
+ * (or {@link ConsumerRecords#nextOffsets()}). You should also add the leader epoch as commit metadata,
+ * which can be obtained from {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
  * <p>
  * This method should be used when you need to batch consumed and produced messages
  * together, typically in a consume-transform-produce pattern. Thus, the specified
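
A consume-transform-produce sketch of how nextOffsets() feeds sendOffsetsToTransaction. This is not the patch's code: the broker address, topic names, group id, transactional id, and the uppercase transform are assumptions, and error handling is reduced to a plain abort.

import java.time.Duration;
import java.util.Collections;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class ConsumeTransformProduce {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        consumerProps.put("group.id", "ctp-demo");                // hypothetical group id
        consumerProps.put("enable.auto.commit", "false");         // offsets go through the transaction
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("transactional.id", "ctp-demo-1");      // hypothetical transactional id
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input")); // hypothetical topics
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>(
                            "output", record.key(), record.value().toUpperCase(Locale.ROOT)));
                    }
                    // Offsets of the next records to consume (leader epochs included),
                    // committed atomically with the produced records.
                    producer.sendOffsetsToTransaction(records.nextOffsets(), consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    producer.abortTransaction(); // simplistic; a fenced producer must be closed instead
                }
            }
        }
    }
}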