Skip to content

Commit

Permalink
KAFKA-18303; Update ShareCoordinator to use new record format (#18396)
Browse files Browse the repository at this point in the history
Following #18261, this patch updates the Share Coordinator to use the new record format.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>
  • Loading branch information
dajac authored Jan 7, 2025
1 parent 3918f37 commit 7b6e946
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 60 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,8 @@ project(':share-coordinator') {
args = [ "-p", "org.apache.kafka.coordinator.share.generated",
"-o", "${projectDir}/build/generated/main/java/org/apache/kafka/coordinator/share/generated",
"-i", "src/main/resources/common/message",
"-m", "MessageDataGenerator", "JsonConverterGenerator"
"-m", "MessageDataGenerator", "JsonConverterGenerator",
"-t", "CoordinatorRecordTypeGenerator"
]
inputs.dir("src/main/resources/common/message")
.withPropertyName("messages")
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde}
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordType, ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue}
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.coordinator.transaction.{TransactionCoordinatorRecordSerde, TransactionLogConfig}
import org.apache.kafka.metadata.MetadataRecordSerde
Expand Down Expand Up @@ -1119,7 +1119,7 @@ class DumpLogSegmentsTest {
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION),
CoordinatorRecordType.SHARE_SNAPSHOT.id()),
new ApiMessageAndVersion(new ShareSnapshotValue()
.setSnapshotEpoch(0)
.setStateEpoch(0)
Expand All @@ -1132,7 +1132,7 @@ class DumpLogSegmentsTest {
.setDeliveryState(2)
.setDeliveryCount(1)
).asJava),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION)
0.toShort)
))
)

Expand All @@ -1147,7 +1147,7 @@ class DumpLogSegmentsTest {
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION),
CoordinatorRecordType.SHARE_UPDATE.id()),
new ApiMessageAndVersion(new ShareUpdateValue()
.setSnapshotEpoch(0)
.setLeaderEpoch(0)
Expand Down Expand Up @@ -1175,7 +1175,7 @@ class DumpLogSegmentsTest {
.setGroupId("gs1")
.setTopicId(Uuid.fromString("Uj5wn_FqTXirEASvVZRY1w"))
.setPartition(0),
0.toShort
CoordinatorRecordType.SHARE_SNAPSHOT.id()
),
null
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
import java.util.function.IntSupplier;

public interface ShareCoordinator {
short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0;
short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0;
short SHARE_UPDATE_RECORD_KEY_VERSION = 1;
short SHARE_UPDATE_RECORD_VALUE_VERSION = 1;

/**
* Return the partition index for the given key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
Expand All @@ -33,7 +34,7 @@ public static CoordinatorRecord newShareSnapshotRecord(String groupId, Uuid topi
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION),
CoordinatorRecordType.SHARE_SNAPSHOT.id()),
new ApiMessageAndVersion(new ShareSnapshotValue()
.setSnapshotEpoch(offsetData.snapshotEpoch())
.setStateEpoch(offsetData.stateEpoch())
Expand All @@ -46,7 +47,7 @@ public static CoordinatorRecord newShareSnapshotRecord(String groupId, Uuid topi
.setDeliveryCount(batch.deliveryCount())
.setDeliveryState(batch.deliveryState()))
.collect(Collectors.toList())),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION)
(short) 0)
);
}

Expand All @@ -56,7 +57,7 @@ public static CoordinatorRecord newShareSnapshotUpdateRecord(String groupId, Uui
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION),
CoordinatorRecordType.SHARE_UPDATE.id()),
new ApiMessageAndVersion(new ShareUpdateValue()
.setSnapshotEpoch(offsetData.snapshotEpoch())
.setLeaderEpoch(offsetData.leaderEpoch())
Expand All @@ -68,7 +69,7 @@ public static CoordinatorRecord newShareSnapshotUpdateRecord(String groupId, Uui
.setDeliveryCount(batch.deliveryCount())
.setDeliveryState(batch.deliveryState()))
.collect(Collectors.toList())),
ShareCoordinator.SHARE_UPDATE_RECORD_VALUE_VERSION)
(short) 0)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION:
case 0:
return new ShareSnapshotKey();
case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION:
case 1:
return new ShareUpdateKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
Expand All @@ -41,9 +41,9 @@ protected ApiMessage apiMessageKeyFor(short recordVersion) {
@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION:
case 0:
return new ShareSnapshotValue();
case ShareCoordinator.SHARE_UPDATE_RECORD_VALUE_VERSION:
case 1:
return new ShareUpdateValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
Expand All @@ -38,6 +39,7 @@
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
Expand Down Expand Up @@ -206,15 +208,19 @@ public void replay(long offset, long producerId, short producerEpoch, Coordinato
ApiMessageAndVersion key = record.key();
ApiMessageAndVersion value = record.value();

switch (key.version()) {
case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: // ShareSnapshot
handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value), offset);
break;
case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: // ShareUpdate
handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value));
break;
default:
// Noop
try {
switch (CoordinatorRecordType.fromId(key.version())) {
case SHARE_SNAPSHOT:
handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) messageOrNull(value), offset);
break;
case SHARE_UPDATE:
handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value));
break;
default:
// Noop
}
} catch (UnsupportedVersionException ex) {
// Ignore
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.

{
"type": "data",
"apiKey": 0,
"type": "coordinator-key",
"name": "ShareSnapshotKey",
"validVersions": "0",
"flexibleVersions": "none",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.

{
"type": "data",
"apiKey": 0,
"type": "coordinator-value",
"name": "ShareSnapshotValue",
"validVersions": "0",
"flexibleVersions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.

{
"type": "data",
"apiKey": 1,
"type": "coordinator-key",
"name": "ShareUpdateKey",
"validVersions": "1",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "1",
{ "name": "GroupId", "type": "string", "versions": "0",
"about": "The group id." },
{ "name": "TopicId", "type": "uuid", "versions": "1",
{ "name": "TopicId", "type": "uuid", "versions": "0",
"about": "The topic id." },
{ "name": "Partition", "type": "int32", "versions": "1",
{ "name": "Partition", "type": "int32", "versions": "0",
"about": "The partition index." }
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.

{
"type": "data",
"apiKey": 1,
"type": "coordinator-value",
"name": "ShareUpdateValue",
"validVersions": "0",
"flexibleVersions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testNewShareSnapshotRecord() {
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION),
(short) 0),
new ApiMessageAndVersion(
new ShareSnapshotValue()
.setSnapshotEpoch(0)
Expand All @@ -70,7 +70,7 @@ public void testNewShareSnapshotRecord() {
.setLastOffset(10L)
.setDeliveryState((byte) 0)
.setDeliveryCount((short) 1))),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION));
(short) 0));

assertEquals(expectedRecord, record);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public void testNewShareUpdateRecord() {
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION),
(short) 1),
new ApiMessageAndVersion(
new ShareUpdateValue()
.setSnapshotEpoch(0)
Expand All @@ -112,7 +112,7 @@ public void testNewShareUpdateRecord() {
.setLastOffset(10L)
.setDeliveryState((byte) 0)
.setDeliveryCount((short) 1))),
ShareCoordinator.SHARE_UPDATE_RECORD_VALUE_VERSION));
(short) 0));

assertEquals(expectedRecord, record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void testSerializeNullValue() {
.setGroupId("group")
.setTopicId(Uuid.randomUuid())
.setPartition(1),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
CoordinatorRecordType.SHARE_SNAPSHOT.id()
),
null
);
Expand Down Expand Up @@ -104,7 +105,7 @@ public void testDeserializeWithTombstoneForValue() {
.setGroupId("groupId")
.setTopicId(Uuid.randomUuid())
.setPartition(1),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
CoordinatorRecordType.SHARE_SNAPSHOT.id()
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());

Expand Down Expand Up @@ -145,7 +146,7 @@ public void testDeserializeWithValueEmptyBuffer() {
.setGroupId("foo")
.setTopicId(Uuid.randomUuid())
.setPartition(1),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
CoordinatorRecordType.SHARE_SNAPSHOT.id()
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());

Expand Down Expand Up @@ -181,7 +182,7 @@ public void testDeserializeWithInvalidValueBytes() {
.setGroupId("foo")
.setTopicId(Uuid.randomUuid())
.setPartition(1),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
CoordinatorRecordType.SHARE_SNAPSHOT.id()
);
ByteBuffer keyBuffer = MessageUtil.toVersionPrefixedByteBuffer(key.version(), key.message());

Expand Down Expand Up @@ -228,7 +229,7 @@ private static CoordinatorRecord getShareSnapshotRecord(String groupId, Uuid top
.setGroupId(groupId)
.setTopicId(topicId)
.setPartition(partitionId),
ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION
CoordinatorRecordType.SHARE_SNAPSHOT.id()
),
new ApiMessageAndVersion(
new ShareSnapshotValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKeyJsonConverter;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
Expand Down Expand Up @@ -101,13 +103,16 @@ private JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {

private Optional<ApiMessage> readToSnapshotMessageKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= ShareSnapshotKey.LOWEST_SUPPORTED_VERSION
&& version <= ShareSnapshotKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareSnapshotKey(new ByteBufferAccessor(byteBuffer), version));
} else if (version >= ShareUpdateKey.LOWEST_SUPPORTED_VERSION
&& version <= ShareUpdateKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareUpdateKey(new ByteBufferAccessor(byteBuffer), version));
} else {
try {
switch (CoordinatorRecordType.fromId(version)) {
case SHARE_SNAPSHOT:
return Optional.of(new ShareSnapshotKey(new ByteBufferAccessor(byteBuffer), version));
case SHARE_UPDATE:
return Optional.of(new ShareUpdateKey(new ByteBufferAccessor(byteBuffer), version));
default:
return Optional.empty();
}
} catch (UnsupportedVersionException ex) {
return Optional.empty();
}
}
Expand Down Expand Up @@ -155,13 +160,17 @@ private Optional<ApiMessage> readToSnapshotMessageValue(ByteBuffer byteBuffer, s
// Check the key version here as that will determine which type
// of value record to fetch. Both share update and share snapshot
// value records can have the same version.
if (keyVersion >= ShareSnapshotKey.LOWEST_SUPPORTED_VERSION
&& keyVersion <= ShareSnapshotKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareSnapshotValue(new ByteBufferAccessor(byteBuffer), version));
} else if (keyVersion >= ShareUpdateKey.LOWEST_SUPPORTED_VERSION
&& keyVersion <= ShareUpdateKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new ShareUpdateValue(new ByteBufferAccessor(byteBuffer), version));
try {
switch (CoordinatorRecordType.fromId(keyVersion)) {
case SHARE_SNAPSHOT:
return Optional.of(new ShareSnapshotValue(new ByteBufferAccessor(byteBuffer), version));
case SHARE_UPDATE:
return Optional.of(new ShareUpdateValue(new ByteBufferAccessor(byteBuffer), version));
default:
return Optional.empty();
}
} catch (UnsupportedVersionException ex) {
return Optional.empty();
}
return Optional.empty();
}
}

0 comments on commit 7b6e946

Please sign in to comment.