[Feature][Connector-V2] Support extract partition from SeaTunnelRow fields #3085
Conversation
@hailin0 PTAL
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
    key = element.getField(fields.indexOf(keyField)).toString();
Don't we need to check for null?
@Override
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
    return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));
check key is null?
> check key is null?

I think we don't need to check whether the key is null, because if the key is null, Kafka will send it to a random partition (it randomly selects a new partition per topic.metadata.refresh.ms).
The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.
I am sorry, you do need to check whether the key is null, because you use key.getBytes(). You can update it to:

return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
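For reference, a minimal hedged sketch of a serializer with that null check applied. The class name RowKeySerializer and the Function-based value serializer are illustrative stand-ins for the PR's actual serializer and its JSON serialization schema; only the null-handling line mirrors the suggestion above.

```java
import java.util.function.Function;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Sketch only: a trimmed-down serializer showing where the null check lands.
public class RowKeySerializer {
    private final String topic;
    private final Function<SeaTunnelRow, byte[]> valueSerializer;

    public RowKeySerializer(String topic, Function<SeaTunnelRow, byte[]> valueSerializer) {
        this.topic = topic;
        this.valueSerializer = valueSerializer;
    }

    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
        // Keep a null key as null so Kafka's default partitioner spreads the record
        // across partitions, instead of calling key.getBytes() and throwing an NPE.
        byte[] keyBytes = key == null ? null : key.getBytes();
        return new ProducerRecord<>(topic, keyBytes, valueSerializer.apply(row));
    }
}
```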
docs/en/connector-v2/sink/Kafka.md
Outdated
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| key | string | no | - |
Suggested change: replace
| key | string | no | - |
with
| partition_key | string | no | - |
docs/en/connector-v2/sink/Kafka.md
Outdated
@@ -50,6 +51,21 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### key [string]

Determine the partition of the kafka send message based on the key.
Add support for configuring the field name.
docs/en/connector-v2/sink/Kafka.md
Outdated
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### partition_key [string]

Determine the partition of the kafka send message based on the key.
Replacing it with "Configure which field is used as the key of the kafka message" would be better.
docs/en/connector-v2/sink/Kafka.md
Outdated
- Add kafka sink doc
- New feature : Kafka specified partition to send
- New feature : Determine the partition that kafka send based on the message content
- New feature : Determine the partition that kafka send messag based on the message content
- New feature : Determine the partition of the kafka send message based on the field name
Would it be better to replace it with "Configure which field is used as the key of the kafka message"?
Object field = element.getField(fields.indexOf(keyField));
// If the field is null, send the message to the same partition
if (field == null) {
    key = "null";
It is not a good way, because there may be a lot of null values. If you cast null to "null", all of the null values will be written to the same partition. You can keep null and update the code in seaTunnelRowSerializer.serializeRowByKey(key, element) like this:

return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
If the key is null, Kafka will send it to a random partition (it randomly selects a new partition per topic.metadata.refresh.ms).
It's a good way, thanks for your advice.
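To illustrate the trade-off settled here, a small hedged sketch: a record whose key stays null is partitioned by the producer's default partitioner, while forcing the literal string "null" as the key hashes every such record to one partition. The topic name and payload below are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

public class NullKeyIllustration {
    public static void main(String[] args) {
        byte[] value = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);

        // Key left as null: the producer's default partitioner chooses the partition
        // (round-robin or sticky, depending on the client version), so null-keyed
        // records end up spread across partitions.
        ProducerRecord<byte[], byte[]> spread =
                new ProducerRecord<>("example_topic", null, value);

        // Key forced to the literal string "null": every such record hashes to the
        // same partition, which is the skew the review comment warns about.
        ProducerRecord<byte[], byte[]> pinned =
                new ProducerRecord<>("example_topic", "null".getBytes(StandardCharsets.UTF_8), value);

        // partition() is null for both records here; the actual assignment happens
        // inside the producer when the record is sent.
        System.out.println(spread.partition() + " / " + pinned.partition());
    }
}
```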
@@ -74,6 +92,7 @@ public KafkaSinkWriter(
        List<KafkaSinkState> kafkaStates) {
    this.context = context;
    this.pluginConfig = pluginConfig;
    this.seaTunnelRowType = seaTunnelRowType;
Suggested change: replace
this.seaTunnelRowType = seaTunnelRowType;
with
this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);

Add this method:
private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
                                                                SeaTunnelRowType seaTunnelRowType) {
    if (!pluginConfig.hasPath(PARTITION_KEY)) {
        // No partition_key configured: always use a null key.
        return row -> null;
    }
    String partitionKey = pluginConfig.getString(PARTITION_KEY);
    List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
    if (!fieldNames.contains(partitionKey)) {
        // partition_key does not match any field name: use it as a literal key.
        return row -> partitionKey;
    }
    int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
    return row -> {
        // Use the configured field's value as the key; keep null values as null.
        Object partitionFieldValue = row.getField(partitionFieldIndex);
        if (partitionFieldValue != null) {
            return partitionFieldValue.toString();
        }
        return null;
    };
}
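A hedged sketch of how the three branches of this extractor behave: no partition_key configured, a partition_key that is not a field name (literal key), and a partition_key that matches a field. Rather than wiring up a Config object, the branches are reproduced as standalone lambdas; the field names and values are made up for illustration, and the SeaTunnelRowType/SeaTunnelRow constructors are assumed to match the SeaTunnel API used elsewhere in this PR.

```java
import java.util.function.Function;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public class PartitionExtractorIllustration {
    public static void main(String[] args) {
        // A two-field row type (name: string, age: int) with one sample row.
        SeaTunnelRowType rowType = new SeaTunnelRowType(
                new String[]{"name", "age"},
                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});
        SeaTunnelRow row = new SeaTunnelRow(new Object[]{"alice", 30});

        // Branch 1: partition_key matches a field -> the key is that field's value,
        // kept as null when the field value is null.
        Function<SeaTunnelRow, String> byField = r -> {
            Object v = r.getField(rowType.indexOf("name"));
            return v == null ? null : v.toString();
        };
        System.out.println(byField.apply(row)); // alice

        // Branch 2: partition_key does not match any field -> used as a literal key.
        Function<SeaTunnelRow, String> literal = r -> "static_key";
        System.out.println(literal.apply(row)); // static_key

        // Branch 3: no partition_key configured -> null key, Kafka picks the partition.
        Function<SeaTunnelRow, String> none = r -> null;
        System.out.println(none.apply(row)); // null
    }
}
```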
String keyField = pluginConfig.getString(PARTITION_KEY);
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
    key = element.getField(fields.indexOf(keyField)).toString();
} else {
    key = keyField;
}
Suggested change: replace
String keyField = pluginConfig.getString(PARTITION_KEY);
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
    key = element.getField(fields.indexOf(keyField)).toString();
} else {
    key = keyField;
}
with
String key = partitionExtractor.apply(element);
Thanks to @EricJoy2048 and @hailin0 for the good suggestions, I will fix it.
LGTM
docs/en/connector-v2/sink/Kafka.md
Outdated
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key | string | no | - |
Suggested change (whitespace/checkstyle fix):
| partition_key | string | no | - |
checkstyle
+1
LGTM
+1
Purpose of this pull request
Issue: #2787
The key is specified by a field name. The partition of the Kafka message is determined based on that key.
Check list
New License Guide