From 3cd47d22f38edad394212106f7f77e2ef836d7e1 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Thu, 19 Oct 2023 19:57:22 +0800 Subject: [PATCH 1/2] [INLONG-9081][Sort] Pulsar connector in flink 1.15 should run in exclusive mode --- .../org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 6 +++--- .../apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 6 +++--- .../org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 6 +++--- .../inlong/sort/pulsar/PulsarTableValidationUtils.java | 6 +++--- .../apache/inlong/sort/pulsar/table/PulsarTableSource.java | 5 +++-- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 9ca4ac4d6dc..d88f2804052 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -64,7 +64,7 @@ import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; @@ -179,7 +179,7 @@ public Set> 
optionalOptions() { VALUE_FORMAT, SOURCE_SUBSCRIPTION_NAME, SOURCE_SUBSCRIPTION_TYPE, - SOURCE_START_FROM_MESSAGE_ID, + STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME, SOURCE_STOP_AT_MESSAGE_ID, SOURCE_STOP_AFTER_MESSAGE_ID, @@ -203,7 +203,7 @@ public Set> forwardOptions() { SERVICE_URL, SOURCE_SUBSCRIPTION_TYPE, SOURCE_SUBSCRIPTION_NAME, - SOURCE_START_FROM_MESSAGE_ID, + STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME, SOURCE_STOP_AT_MESSAGE_ID, SOURCE_STOP_AFTER_MESSAGE_ID, diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java index 102ae75938f..ad51894d394 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java @@ -46,7 +46,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; @@ -173,8 +173,8 @@ public static Properties getPulsarPropertiesWithPrefix( } public static StartCursor getStartCursor(ReadableConfig tableOptions) { - if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) { - return 
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID)); + if (tableOptions.getOptional(STARTUP_MODE).isPresent()) { + return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE)); } else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) { return parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME)); } else { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java index cbbe8124716..8aeb5413804 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java @@ -79,7 +79,7 @@ private PulsarTableOptions() { * Copied because we want to have a default value for it. */ public static final ConfigOption SOURCE_SUBSCRIPTION_NAME = - ConfigOptions.key("source.subscription-name") + ConfigOptions.key("scan.startup.sub-name") .stringType() .noDefaultValue() .withDescription( @@ -88,8 +88,8 @@ private PulsarTableOptions() { "The subscription name of the consumer that is used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). 
This argument is required for constructing the consumer.") .build()); - public static final ConfigOption SOURCE_START_FROM_MESSAGE_ID = - ConfigOptions.key("source.start.message-id") + public static final ConfigOption STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") .stringType() .noDefaultValue() .withDescription( diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java index 62440a462bf..e293200ec75 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java @@ -36,7 +36,7 @@ import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; @@ -91,12 +91,12 @@ protected static void validateTopicsConfigs(ReadableConfig tableOptions) { } protected static void validateStartCursorConfigs(ReadableConfig tableOptions) { - if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent() + if (tableOptions.getOptional(STARTUP_MODE).isPresent() && 
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) { throw new ValidationException( String.format( "Only one of %s and %s can be specified. Detected both of them", - SOURCE_START_FROM_MESSAGE_ID, SOURCE_START_FROM_PUBLISH_TIME)); + STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME)); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java index 9b3bf703823..f177eb7543d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java @@ -19,7 +19,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.connector.pulsar.source.PulsarSource; -import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; @@ -119,7 +118,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { .setUnboundedStopCursor(stopCursor) .setDeserializationSchema(deserializationSchema) .setProperties(properties) - .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true) + // only support exclusive since shared mode requires pulsar with transaction enabled + // and supporting transaction consumes more resources in pulsar broker + .setSubscriptionType(SubscriptionType.Exclusive) .build(); return SourceProvider.of(source); } From f2ed9958c29bfc05fe436108486c9f95c4c03285 Mon Sep 17 00:00:00 2001 
From: pengzirui Date: Thu, 19 Oct 2023 20:04:01 +0800 Subject: [PATCH 2/2] add format --- .../org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 2 +- .../org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 2 +- .../apache/inlong/sort/pulsar/PulsarTableValidationUtils.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index d88f2804052..4adc2e64ba0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -64,13 +64,13 @@ import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC; 
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT; import static org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validatePrimaryKeyConstraints; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java index ad51894d394..a495244776d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java @@ -46,12 +46,12 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java index e293200ec75..d49296ab9d7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java @@ -36,12 +36,12 @@ import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT; -import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE; +import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE; import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC; import static org.apache.pulsar.common.naming.TopicName.isValid; @@ -96,7 +96,7 @@ protected static void validateStartCursorConfigs(ReadableConfig tableOptions) { throw new ValidationException( String.format( "Only one of %s and %s can be specified. Detected both of them", - STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME)); + STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME)); } }