Skip to content

Commit

Permalink
fix side-input existence deadline (#34046)
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax authored Feb 21, 2025
1 parent cf3c39c commit 5ca14c1
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void process(ProcessContext c) {}

// And digging to check whether the window is ready
when(mockEvaluationContext.createSideInputReader(anyList())).thenReturn(mockSideInputReader);
when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(false);
when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(false);

IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
Expand All @@ -48,6 +49,8 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -207,6 +210,11 @@ private <T, SideWindowT extends BoundedWindow> GlobalData fetchGlobalDataFromWin
ByteStringOutputStream windowStream = new ByteStringOutputStream();
windowCoder.encode(sideWindow, windowStream);

Instant firingDeadline =
Ordering.natural()
.min(
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow),
LateDataUtils.garbageCollectionTime(sideWindow, sideWindowStrategy));
GlobalDataRequest request =
GlobalDataRequest.newBuilder()
.setDataId(
Expand All @@ -216,8 +224,7 @@ private <T, SideWindowT extends BoundedWindow> GlobalData fetchGlobalDataFromWin
.build())
.setStateFamily(stateFamily)
.setExistenceWatermarkDeadline(
WindmillTimeUtils.harnessToWindmillTimestamp(
sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow)))
WindmillTimeUtils.harnessToWindmillTimestamp(firingDeadline))
.build();

try (Closeable ignored = scopedReadStateSupplier.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ public void testPartialGroupByKeyWithCombinerAndSideInputs() throws Exception {
ImmutableList.of(
WindowedValue.valueInGlobalWindow(KV.of("hi", 4)),
WindowedValue.valueInGlobalWindow(KV.of("there", 5))));
when(mockSideInputFetcher.storeIfBlocked(ArgumentMatchers.<WindowedValue<KV<String, Integer>>>any()))
when(mockSideInputFetcher.storeIfBlocked(
ArgumentMatchers.<WindowedValue<KV<String, Integer>>>any()))
.thenReturn(false, false, false, true);

pgbkParDoFn.startBundle(receiver);
Expand Down Expand Up @@ -360,7 +361,8 @@ public void testCreateWithCombinerAndStreamingSideInputs() throws Exception {

when(mockSideInputReader.isEmpty()).thenReturn(false);
when(mockStreamingStepContext.stateInternals()).thenReturn((StateInternals) mockStateInternals);
when(mockStateInternals.state(ArgumentMatchers.<StateNamespace>any(), ArgumentMatchers.<StateTag>any()))
when(mockStateInternals.state(
ArgumentMatchers.<StateNamespace>any(), ArgumentMatchers.<StateTag>any()))
.thenReturn(mockState);
when(mockState.read()).thenReturn(Maps.newHashMap());

Expand Down Expand Up @@ -394,7 +396,8 @@ public void testCoderSizeEstimationWithNonLazyObserver() throws Exception {
return null;
})
.when(mockCoder)
.registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
.registerByteSizeObserver(
ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
CoderSizeEstimator<String> estimator = new CoderSizeEstimator(mockCoder);
assertEquals(5, estimator.estimateSize("apple"));
}
Expand All @@ -410,7 +413,8 @@ public void testCoderSizeEstimationWithLazyObserver() throws Exception {
return null;
})
.when(mockCoder)
.registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
.registerByteSizeObserver(
ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());

// Encode the input to the output stream
doAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ public RemoteEnvironment createEnvironment(Environment environment, String worke
String containerId = null;
InstructionRequestHandler instructionHandler = null;
try {
LOG.info("Running Docker command: image={}, opts={}, args={}", containerImage, dockerOptsBuilder.build(), argsBuilder.build());
LOG.info(
"Running Docker command: image={}, opts={}, args={}",
containerImage,
dockerOptsBuilder.build(),
argsBuilder.build());
containerId = docker.runImage(containerImage, dockerOptsBuilder.build(), argsBuilder.build());
LOG.debug("Created Docker Container with Container ID {}", containerId);
// Wait on a client from the gRPC server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,11 @@ public class BigQueryIO {
private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";

/**
* Regular expression that matches BigQuery Table IDs.
* Supports Unicode characters in categories:
* - L (letter)
* - M (mark)
* - N (number)
* As well as:
* - Underscore (_)
* - Dash (-)
* - Dollar sign ($)
* - At sign (@)
* - Space
* Regular expression that matches BigQuery Table IDs. Supports Unicode characters in categories:
* - L (letter) - M (mark) - N (number) As well as: - Underscore (_) - Dash (-) - Dollar sign ($)
* - At sign (@) - Space
*
* The pattern requires 1-1024 characters matching these categories.
* <p>The pattern requires 1-1024 characters matching these categories.
*/
private static final String TABLE_REGEXP = "[-_\\p{L}\\p{N}\\p{M}$@ ]{1,1024}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down

0 comments on commit 5ca14c1

Please sign in to comment.