Skip to content

Commit

Permalink
[Beam#25046]add support for flink 1.16 for Beam Flink Runner (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yananhao12 authored Feb 24, 2023
1 parent bddfd86 commit de71d13
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 6 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ tasks.register("javaPreCommit") {
dependsOn(":runners:flink:1.14:job-server:build")
dependsOn(":runners:flink:1.15:build")
dependsOn(":runners:flink:1.15:job-server:build")
dependsOn(":runners:flink:1.16:build")
dependsOn(":runners:flink:1.16:job-server:build")
dependsOn(":runners:google-cloud-dataflow-java:build")
dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
dependsOn(":runners:google-cloud-dataflow-java:examples:build")
Expand Down Expand Up @@ -339,6 +341,7 @@ tasks.register("javaPostCommitSickbay") {
dependsOn(":runners:flink:1.13:validatesRunnerSickbay")
dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
dependsOn(":runners:direct-java:validatesRunnerSickbay")
dependsOn(":runners:portability:java:validatesRunnerSickbay")
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

flink_versions=1.12,1.13,1.14,1.15
flink_versions=1.12,1.13,1.14,1.15,1.16

34 changes: 34 additions & 0 deletions runners/flink/1.16/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.16.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.16'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.16/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.16/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.16-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -51,7 +52,7 @@ public void preSubmit() throws Exception {

@After
public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;

/** Reads from a bounded source in batch execution. */
public class ReadSourceTest extends JavaProgramTestBase {
Expand All @@ -52,7 +53,7 @@ protected void preSubmit() throws Exception {

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void preSubmit() throws Exception {

@After
public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
}

/** DoFn extracting user and timestamp. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void preSubmit() throws Exception {

@After
public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
TestBaseUtils.compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ def _add_argparse_args(cls, parser):
class FlinkRunnerOptions(PipelineOptions):

# These should stay in sync with gradle.properties.
PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15']
PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16']

@classmethod
def _add_argparse_args(cls, parser):
Expand Down
4 changes: 4 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ include(":runners:flink:1.14:job-server-container")
include(":runners:flink:1.15")
include(":runners:flink:1.15:job-server")
include(":runners:flink:1.15:job-server-container")
// Flink 1.16
include(":runners:flink:1.16")
include(":runners:flink:1.16:job-server")
include(":runners:flink:1.16:job-server-container")
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")
Expand Down

0 comments on commit de71d13

Please sign in to comment.