From a4af9a10758f06ded927ece37484b03ec9d100ed Mon Sep 17 00:00:00 2001 From: Zhenqiu Huang Date: Sat, 10 Aug 2024 19:35:31 -0700 Subject: [PATCH] [FLINK-34468][Connector/Cassandra] Adding support for Flink 1.19 --- .github/workflows/push_pr.yml | 3 ++- .gitignore | 1 + .../dcfaa83d-a12c-48e1-9e51-b8d3808cd287 | 8 +------- .../cassandra/source/CassandraSourceITCase.java | 6 ------ .../cassandra/CassandraConnectorITCase.java | 13 +------------ pom.xml | 2 +- 6 files changed, 6 insertions(+), 27 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 63639e12..5bce6875 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -25,8 +25,9 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.19.0 ] + flink: [ 1.20.0 ] include: + - flink: 1.19.0 - flink: 1.18.1 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils diff --git a/.gitignore b/.gitignore index acbe2176..1fa6f3af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .eslintcache .cache +.java-version scalastyle-output.xml .classpath .idea/* diff --git a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 index 5c9a448d..ad4fffe9 100644 --- a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 +++ b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 @@ -1,10 +1,4 @@ Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:138) -Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:124) -Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:125) -Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:126) -Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:127) -Method calls method in (CassandraSource.java:145) -Method calls method in (CassandraSource.java:149) Method is annotated with in (CassandraSource.java:0) Method is annotated with in (CassandraSplitReader.java:0) -Method is annotated with in (SplitsGenerator.java:0) \ No newline at end of file +Method is annotated with in (SplitsGenerator.java:0) diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java index d6eecb17..e203fa8f 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java @@ -176,7 +176,6 @@ public void testGenerateSplitsWithTooHighMaximumSplitSize( } // overridden to use unordered checks - @Override protected void checkResultWithSemantic( CloseableIterator resultIterator, List> testData, @@ -197,7 +196,6 @@ protected void checkResultWithSemantic( } @Disabled("Not a unbounded source") - @Override public void testSourceMetrics( TestEnvironment testEnv, DataStreamSourceExternalContext externalContext, @@ -205,28 +203,24 @@ public void testSourceMetrics( throws Exception {} @Disabled("Not a unbounded source") - @Override public void testSavepoint( TestEnvironment testEnv, DataStreamSourceExternalContext externalContext, CheckpointingMode semantic) {} @Disabled("Not a unbounded source") - @Override public void testScaleUp( TestEnvironment testEnv, DataStreamSourceExternalContext externalContext, CheckpointingMode semantic) {} @Disabled("Not a unbounded source") - @Override public void testScaleDown( TestEnvironment testEnv, DataStreamSourceExternalContext externalContext, CheckpointingMode semantic) {} @Disabled("Not a unbounded source") - @Override public void testTaskManagerFailure( TestEnvironment testEnv, DataStreamSourceExternalContext externalContext, diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index f12e595d..6f9dcd77 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; -import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.testutils.junit.extensions.retry.RetryExtension; @@ -80,10 +79,7 @@ @SuppressWarnings("serial") @Testcontainers @ExtendWith(RetryExtension.class) -class CassandraConnectorITCase - extends WriteAheadSinkTestBase< - Tuple3, - CassandraTupleWriteAheadSink>> { +class CassandraConnectorITCase { private static final CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(false); @@ -284,7 +280,6 @@ void testAnnotatePojoWithTable() { // Exactly-once Tests // ------------------------------------------------------------------------ - @Override protected CassandraTupleWriteAheadSink> createSink() throws Exception { return new CassandraTupleWriteAheadSink<>( @@ -295,17 +290,14 @@ protected CassandraTupleWriteAheadSink> createS new CassandraCommitter(cassandraTestEnvironment.getBuilderForReading())); } - @Override protected TupleTypeInfo> createTypeInfo() { return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class); } - @Override protected Tuple3 generateValue(int counter, int checkpointID) { return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); } - @Override protected void verifyResultsIdealCircumstances( CassandraTupleWriteAheadSink> sink) { @@ -325,7 +317,6 @@ protected void verifyResultsIdealCircumstances( .isEmpty(); } - @Override protected void verifyResultsDataPersistenceUponMissedNotify( CassandraTupleWriteAheadSink> sink) { @@ -345,7 +336,6 @@ protected void verifyResultsDataPersistenceUponMissedNotify( .isEmpty(); } - @Override protected void verifyResultsDataDiscardingUponRestore( CassandraTupleWriteAheadSink> sink) { @@ -368,7 +358,6 @@ protected void verifyResultsDataDiscardingUponRestore( .isEmpty(); } - @Override protected void verifyResultsWhenReScaling( CassandraTupleWriteAheadSink> sink, int startElementCounter, diff --git a/pom.xml b/pom.xml index 858bc049..73308742 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ under the License. - 1.18.0 + 1.20.0 3.1.0-1.17 19.0