From 5ca14c183b04f98bb469d79cc89cb7f55f86efbe Mon Sep 17 00:00:00 2001 From: reuvenlax Date: Fri, 21 Feb 2025 11:03:36 -0800 Subject: [PATCH] fix side-input existence deadline (#34046) --- .../StatefulParDoEvaluatorFactoryTest.java | 3 ++- .../dataflow/DataflowPipelineJobTest.java | 2 +- .../runners/dataflow/TestDataflowRunnerTest.java | 2 +- .../sideinput/SideInputStateFetcher.java | 11 +++++++++-- .../worker/PartialGroupByKeyParDoFnsTest.java | 12 ++++++++---- .../worker/StreamingDataflowWorkerTest.java | 2 +- .../environment/DockerEnvironmentFactory.java | 6 +++++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 16 ++++------------ .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 2 +- 9 files changed, 32 insertions(+), 24 deletions(-) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 9983eafb2515..7746e48317d5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -193,7 +193,8 @@ public void process(ProcessContext c) {} // And digging to check whether the window is ready when(mockEvaluationContext.createSideInputReader(anyList())).thenReturn(mockSideInputReader); - when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(false); + when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any())) + .thenReturn(false); IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9)); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index f2979f2fa9e9..54ba10df9d1c 
100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -23,8 +23,8 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java index 89700079aac3..ed6259a3ee22 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -24,8 +24,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java index 303cdeb94f8c..48f87337e0a8 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java @@ -30,6 +30,7 @@ import java.util.function.Function; import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.runners.core.InMemoryMultimapSideInputView; +import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; @@ -48,6 +49,8 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,6 +210,11 @@ private GlobalData fetchGlobalDataFromWin ByteStringOutputStream windowStream = new ByteStringOutputStream(); windowCoder.encode(sideWindow, windowStream); + Instant firingDeadline = + Ordering.natural() + .min( + sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow), + LateDataUtils.garbageCollectionTime(sideWindow, sideWindowStrategy)); GlobalDataRequest request = GlobalDataRequest.newBuilder() .setDataId( @@ -216,8 +224,7 @@ private GlobalData fetchGlobalDataFromWin .build()) .setStateFamily(stateFamily) .setExistenceWatermarkDeadline( - WindmillTimeUtils.harnessToWindmillTimestamp( - sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow))) + WindmillTimeUtils.harnessToWindmillTimestamp(firingDeadline)) .build(); try (Closeable ignored = scopedReadStateSupplier.get()) { diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFnsTest.java index a24b8d0e254c..19827432e134 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFnsTest.java @@ -248,7 +248,8 @@ public void testPartialGroupByKeyWithCombinerAndSideInputs() throws Exception { ImmutableList.of( WindowedValue.valueInGlobalWindow(KV.of("hi", 4)), WindowedValue.valueInGlobalWindow(KV.of("there", 5)))); - when(mockSideInputFetcher.storeIfBlocked(ArgumentMatchers.>>any())) + when(mockSideInputFetcher.storeIfBlocked( + ArgumentMatchers.>>any())) .thenReturn(false, false, false, true); pgbkParDoFn.startBundle(receiver); @@ -360,7 +361,8 @@ public void testCreateWithCombinerAndStreamingSideInputs() throws Exception { when(mockSideInputReader.isEmpty()).thenReturn(false); when(mockStreamingStepContext.stateInternals()).thenReturn((StateInternals) mockStateInternals); - when(mockStateInternals.state(ArgumentMatchers.any(), ArgumentMatchers.any())) + when(mockStateInternals.state( + ArgumentMatchers.any(), ArgumentMatchers.any())) .thenReturn(mockState); when(mockState.read()).thenReturn(Maps.newHashMap()); @@ -394,7 +396,8 @@ public void testCoderSizeEstimationWithNonLazyObserver() throws Exception { return null; }) .when(mockCoder) - .registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.any()); + .registerByteSizeObserver( + ArgumentMatchers.eq("apple"), ArgumentMatchers.any()); CoderSizeEstimator estimator = new CoderSizeEstimator(mockCoder); assertEquals(5, estimator.estimateSize("apple")); } @@ -410,7 +413,8 @@ public void 
testCoderSizeEstimationWithLazyObserver() throws Exception { return null; }) .when(mockCoder) - .registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.any()); + .registerByteSizeObserver( + ArgumentMatchers.eq("apple"), ArgumentMatchers.any()); // Encode the input to the output stream doAnswer( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 31307a199d76..539520a2edc5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -33,9 +33,9 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java index a7826d43e900..c18bd07a342f 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java @@ -125,7 +125,11 @@ public RemoteEnvironment createEnvironment(Environment environment, 
String worke String containerId = null; InstructionRequestHandler instructionHandler = null; try { - LOG.info("Running Docker command: image={}, opts={}, args={}", containerImage, dockerOptsBuilder.build(), argsBuilder.build()); + LOG.info( + "Running Docker command: image={}, opts={}, args={}", + containerImage, + dockerOptsBuilder.build(), + argsBuilder.build()); containerId = docker.runImage(containerImage, dockerOptsBuilder.build(), argsBuilder.build()); LOG.debug("Created Docker Container with Container ID {}", containerId); // Wait on a client from the gRPC server. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2f2c2c4b82ef..f986e802f1ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -587,19 +587,11 @@ public class BigQueryIO { private static final String DATASET_REGEXP = "[-\\w.]{1,1024}"; /** - * Regular expression that matches BigQuery Table IDs. - * Supports Unicode characters in categories: - * - L (letter) - * - M (mark) - * - N (number) - * As well as: - * - Underscore (_) - * - Dash (-) - * - Dollar sign ($) - * - At sign (@) - * - Space + * Regular expression that matches BigQuery Table IDs. Supports Unicode characters in categories: + * - L (letter) - M (mark) - N (number) As well as: - Underscore (_) - Dash (-) - Dollar sign ($) + * - At sign (@) - Space * - * The pattern requires 1-1024 characters matching these categories. + *

<p>The pattern requires 1-1024 characters matching these categories. */ private static final String TABLE_REGEXP = "[-_\\p{L}\\p{N}\\p{M}$@ ]{1,1024}"; diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 4db4acf8b462..bc75aace9f77 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -29,8 +29,8 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify;