From 822ac6495bf38756ab34b40dacf3a9bbee2c59e3 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Thu, 30 Jan 2025 23:40:04 +0800 Subject: [PATCH] [FLINK-37206][Runtime] Fix initialization of batching timer service in async state operators (#26071) --- .../AbstractAsyncStateStreamOperator.java | 8 +- .../AbstractAsyncStateStreamOperatorV2.java | 8 +- .../AsyncKeyedStateBackendAdaptor.java | 2 - .../BatchExecutionInternalTimeService.java | 14 +- ...chExecutionInternalTimeServiceManager.java | 42 +- ...tionInternalTimeServiceWithAsyncState.java | 87 +++++ ...InternalTimeServiceWithAsyncStateTest.java | 369 ++++++++++++++++++ 7 files changed, 509 insertions(+), 21 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncState.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 15191551fb3f5..94dc17a901632 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; @@ -307,7 +308,12 @@ public InternalTimerService getInternalTimerService( InternalTimerService service = keyedTimeServiceHandler.getInternalTimerService( name, keySerializer, namespaceSerializer, triggerable); - ((InternalTimerServiceAsyncImpl) service).setup(asyncExecutionController); + if (service instanceof InternalTimerServiceAsyncImpl) { + ((InternalTimerServiceAsyncImpl) service).setup(asyncExecutionController); + } else if (service instanceof BatchExecutionInternalTimeServiceWithAsyncState) { + ((BatchExecutionInternalTimeServiceWithAsyncState) service) + .setup(asyncExecutionController); + } return service; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 407dd1778177e..0036055eb6a95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; @@ -286,7 +287,12 @@ public InternalTimerService getInternalTimerService( InternalTimerService service = keyedTimeServiceHandler.getInternalTimerService( name, keySerializer, namespaceSerializer, triggerable); - ((InternalTimerServiceAsyncImpl) service).setup(asyncExecutionController); + if (service instanceof InternalTimerServiceAsyncImpl) { + ((InternalTimerServiceAsyncImpl) service).setup(asyncExecutionController); + } else if (service instanceof BatchExecutionInternalTimeServiceWithAsyncState) { + ((BatchExecutionInternalTimeServiceWithAsyncState) service) + .setup(asyncExecutionController); + } return service; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java index 87cffb259f774..9604b0656a998 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.state.v2.adaptor; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.InternalCheckpointListener; import org.apache.flink.api.common.state.v2.State; @@ -199,7 +198,6 @@ public boolean isSafeToReuseKVState() { return keyedStateBackend.isSafeToReuseKVState(); } - @VisibleForTesting public CheckpointableKeyedStateBackend getKeyedStateBackend() { return keyedStateBackend; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java index de590ffc4e2d4..771afecd2ba48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java @@ -40,25 +40,23 @@ public class BatchExecutionInternalTimeService implements InternalTimerSer private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionInternalTimeService.class); - private final ProcessingTimeService processingTimeService; + final ProcessingTimeService processingTimeService; /** Processing time timers that are currently in-flight. */ - private final KeyGroupedInternalPriorityQueue> - processingTimeTimersQueue; + final KeyGroupedInternalPriorityQueue> processingTimeTimersQueue; /** Event time timers that are currently in-flight. */ - private final KeyGroupedInternalPriorityQueue> - eventTimeTimersQueue; + final KeyGroupedInternalPriorityQueue> eventTimeTimersQueue; /** * The local event time, as denoted by the last received {@link * org.apache.flink.streaming.api.watermark.Watermark Watermark}. */ - private long currentWatermark = Long.MIN_VALUE; + long currentWatermark = Long.MIN_VALUE; - private final Triggerable triggerTarget; + final Triggerable triggerTarget; - private K currentKey; + K currentKey; BatchExecutionInternalTimeService( ProcessingTimeService processingTimeService, Triggerable triggerTarget) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index 5d027671b4302..228173316317b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; @@ -38,7 +39,6 @@ import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * An implementation of a {@link InternalTimeServiceManager} that manages timers with a single @@ -51,8 +51,14 @@ public class BatchExecutionInternalTimeServiceManager private final Map> timerServices = new HashMap<>(); - public BatchExecutionInternalTimeServiceManager(ProcessingTimeService processingTimeService) { + // In batch mode, there is a chance that the operator is {@link AsyncStateProcessing} and we + // should perform correctly when the timer fires. + private final boolean asyncStateProcessingMode; + + public BatchExecutionInternalTimeServiceManager( + ProcessingTimeService processingTimeService, boolean asyncStateProcessingMode) { this.processingTimeService = checkNotNull(processingTimeService); + this.asyncStateProcessingMode = asyncStateProcessingMode; } @Override @@ -66,7 +72,11 @@ public InternalTimerService getInternalTimerService( (BatchExecutionInternalTimeService) timerServices.get(name); if (timerService == null) { timerService = - new BatchExecutionInternalTimeService<>(processingTimeService, triggerable); + asyncStateProcessingMode + ? new BatchExecutionInternalTimeServiceWithAsyncState<>( + processingTimeService, triggerable) + : new BatchExecutionInternalTimeService<>( + processingTimeService, triggerable); timerServices.put(name, timerService); } @@ -93,6 +103,7 @@ public void snapshotToRawKeyedState( throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution"); } + @SuppressWarnings("unchecked") public static InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, PriorityQueueSetFactory factory, @@ -102,14 +113,27 @@ public static InternalTimeServiceManager create( ProcessingTimeService processingTimeService, Iterable rawKeyedStates, StreamTaskCancellationContext cancellationContext) { - checkState( - factory instanceof BatchExecutionKeyedStateBackend, - "Batch execution specific time service can work only with BatchExecutionKeyedStateBackend"); + BatchExecutionKeyedStateBackend theFactory = null; + boolean asyncStateProcessingMode = false; + if (factory instanceof BatchExecutionKeyedStateBackend) { + theFactory = (BatchExecutionKeyedStateBackend) factory; + } else if (factory instanceof AsyncKeyedStateBackendAdaptor) { + KeyedStateBackend keyedStateBackend = + ((AsyncKeyedStateBackendAdaptor) factory).getKeyedStateBackend(); + if (keyedStateBackend instanceof BatchExecutionKeyedStateBackend) { + theFactory = (BatchExecutionKeyedStateBackend) keyedStateBackend; + asyncStateProcessingMode = true; + } + } + if (theFactory == null) { + throw new IllegalStateException( + "Batch execution specific time service can work only with BatchExecutionKeyedStateBackend"); + } BatchExecutionInternalTimeServiceManager timeServiceManager = - new BatchExecutionInternalTimeServiceManager<>(processingTimeService); - ((BatchExecutionKeyedStateBackend) factory) - .registerKeySelectionListener(timeServiceManager); + new BatchExecutionInternalTimeServiceManager<>( + processingTimeService, asyncStateProcessingMode); + theFactory.registerKeySelectionListener(timeServiceManager); return timeServiceManager; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncState.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncState.java new file mode 100644 index 0000000000000..da24a9ed804c0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncState.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sorted.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * An implementation of a {@link InternalTimerService} that manages timers with a single active key + * at a time. Can be used in a BATCH execution mode cooperating with async state operators. + */ +@Internal +public class BatchExecutionInternalTimeServiceWithAsyncState + extends BatchExecutionInternalTimeService { + + private AsyncExecutionController asyncExecutionController; + + BatchExecutionInternalTimeServiceWithAsyncState( + ProcessingTimeService processingTimeService, Triggerable triggerTarget) { + super(processingTimeService, triggerTarget); + } + + /** Set up the async execution controller. */ + public void setup(AsyncExecutionController asyncExecutionController) { + if (asyncExecutionController != null) { + this.asyncExecutionController = asyncExecutionController; + } + } + + /** + * Sets the current key. Timers that are due to be fired are collected and will be triggered. + */ + @Override + public void setCurrentKey(K currentKey) throws Exception { + if (currentKey != null && currentKey.equals(this.currentKey)) { + return; + } + currentWatermark = Long.MAX_VALUE; + InternalTimer timer; + while ((timer = eventTimeTimersQueue.poll()) != null) { + final InternalTimer timerToTrigger = timer; + maintainContextAndProcess( + timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger)); + } + while ((timer = processingTimeTimersQueue.poll()) != null) { + final InternalTimer timerToTrigger = timer; + maintainContextAndProcess( + timerToTrigger, () -> triggerTarget.onProcessingTime(timerToTrigger)); + } + currentWatermark = Long.MIN_VALUE; + this.currentKey = currentKey; + } + + private void maintainContextAndProcess( + InternalTimer timer, ThrowingRunnable runnable) { + // Since we are in middle of processing a record, we need to maintain the context. + final RecordContext previousContext = asyncExecutionController.getCurrentContext(); + RecordContext recordCtx = asyncExecutionController.buildContext(timer, timer.getKey()); + recordCtx.retain(); + asyncExecutionController.setCurrentContext(recordCtx); + asyncExecutionController.syncPointRequestWithCallback(runnable, true); + recordCtx.release(); + asyncExecutionController.setCurrentContext(previousContext); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest.java new file mode 100644 index 0000000000000..18e94316cfce9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceWithAsyncStateTest.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sorted.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.MockStateExecutor; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; + +/** + * Tests for {@link BatchExecutionInternalTimeServiceManager} and {@link + * BatchExecutionInternalTimeServiceWithAsyncState}. + */ +class BatchExecutionInternalTimeServiceWithAsyncStateTest { + public static final IntSerializer KEY_SERIALIZER = new IntSerializer(); + + BatchExecutionKeyedStateBackend keyedStatedBackend; + InternalTimeServiceManager timeServiceManager; + TestProcessingTimeService processingTimeService; + AsyncExecutionController aec; + + @BeforeEach + public void setup() { + keyedStatedBackend = + new BatchExecutionKeyedStateBackend<>( + KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig()); + processingTimeService = new TestProcessingTimeService(); + aec = + new AsyncExecutionController<>( + new SyncMailboxExecutor(), + (a, b) -> {}, + new MockStateExecutor(), + new DeclarationManager(), + 1, + 100, + 1000, + 1, + null, + null); + timeServiceManager = + BatchExecutionInternalTimeServiceManager.create( + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() + .getIOMetricGroup(), + new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend), + null, + this.getClass().getClassLoader(), + new DummyKeyContext(), + processingTimeService, + Collections.emptyList(), + StreamTaskCancellationContext.alwaysRunning()); + } + + @Test + void testForEachEventTimeTimerUnsupported() { + BatchExecutionInternalTimeServiceWithAsyncState timeService = + new BatchExecutionInternalTimeServiceWithAsyncState<>( + new TestProcessingTimeService(), + LambdaTrigger.eventTimeTrigger(timer -> {})); + + assertThatThrownBy( + () -> + timeService.forEachEventTimeTimer( + (o, aLong) -> + fail( + "The forEachEventTimeTimer() should not be supported"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "The BatchExecutionInternalTimeService should not be used in State Processor API"); + } + + @Test + void testForEachProcessingTimeTimerUnsupported() { + BatchExecutionInternalTimeServiceWithAsyncState timeService = + new BatchExecutionInternalTimeServiceWithAsyncState<>( + new TestProcessingTimeService(), + LambdaTrigger.eventTimeTrigger(timer -> {})); + + assertThatThrownBy( + () -> + timeService.forEachEventTimeTimer( + (o, aLong) -> + fail( + "The forEachProcessingTimeTimer() should not be supported"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "The BatchExecutionInternalTimeService should not be used in State Processor API"); + } + + @Test + void testFiringEventTimeTimers() throws Exception { + List timers = new ArrayList<>(); + InternalTimerService timerService = + buildTimerService( + LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp()))); + + keyedStatedBackend.setCurrentKey(1); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123); + + // advancing the watermark should not fire timers + timeServiceManager.advanceWatermark(new Watermark(1000)); + timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, 123); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150); + + // changing the current key fires all timers + keyedStatedBackend.setCurrentKey(2); + + assertThat(timers).containsExactly(150L); + } + + @Test + void testSettingSameKeyDoesNotFireTimers() { + List timers = new ArrayList<>(); + InternalTimerService timerService = + buildTimerService( + LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp()))); + + keyedStatedBackend.setCurrentKey(1); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123); + keyedStatedBackend.setCurrentKey(1); + + assertThat(timers).isEmpty(); + } + + @Test + void testCurrentWatermark() throws Exception { + List timers = new ArrayList<>(); + TriggerWithTimerServiceAccess eventTimeTrigger = + TriggerWithTimerServiceAccess.eventTimeTrigger( + (timer, timerService) -> { + assertThat(timerService.currentWatermark()).isEqualTo(Long.MAX_VALUE); + timers.add(timer.getTimestamp()); + }); + InternalTimerService timerService = buildTimerService(eventTimeTrigger); + eventTimeTrigger.setTimerService(timerService); + + assertThat(timerService.currentWatermark()).isEqualTo(Long.MIN_VALUE); + keyedStatedBackend.setCurrentKey(1); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 123); + assertThat(timerService.currentWatermark()).isEqualTo(Long.MIN_VALUE); + + // advancing the watermark to a value different than Long.MAX_VALUE should have no effect + timeServiceManager.advanceWatermark(new Watermark(1000)); + assertThat(timerService.currentWatermark()).isEqualTo(Long.MIN_VALUE); + + // changing the current key fires all timers + keyedStatedBackend.setCurrentKey(2); + assertThat(timerService.currentWatermark()).isEqualTo(Long.MIN_VALUE); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 124); + + // advancing the watermark to Long.MAX_VALUE should fire remaining key + timeServiceManager.advanceWatermark(Watermark.MAX_WATERMARK); + + assertThat(timers).containsExactly(123L, 124L); + } + + @Test + void testProcessingTimeTimers() { + List timers = new ArrayList<>(); + InternalTimerService timerService = + buildTimerService( + LambdaTrigger.processingTimeTrigger( + timer -> timers.add(timer.getTimestamp()))); + + keyedStatedBackend.setCurrentKey(1); + timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150); + + // we should never register physical timers + assertThat(processingTimeService.getNumActiveTimers()).isZero(); + // changing the current key fires all timers + keyedStatedBackend.setCurrentKey(2); + + assertThat(timers).containsExactly(150L); + } + + @Test + void testIgnoringEventTimeTimersFromWithinCallback() { + List timers = new ArrayList<>(); + TriggerWithTimerServiceAccess trigger = + TriggerWithTimerServiceAccess.eventTimeTrigger( + (timer, ts) -> { + timers.add(timer.getTimestamp()); + ts.registerEventTimeTimer( + VoidNamespace.INSTANCE, timer.getTimestamp() + 20); + }); + InternalTimerService timerService = buildTimerService(trigger); + trigger.setTimerService(timerService); + + keyedStatedBackend.setCurrentKey(1); + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 150); + + // we should never register physical timers + assertThat(processingTimeService.getNumActiveTimers()).isZero(); + // changing the current key fires all timers + keyedStatedBackend.setCurrentKey(2); + + // We check that the timer from the callback is ignored + assertThat(timers).containsExactly(150L); + } + + @Test + void testIgnoringProcessingTimeTimersFromWithinCallback() { + List timers = new ArrayList<>(); + TriggerWithTimerServiceAccess trigger = + TriggerWithTimerServiceAccess.processingTimeTrigger( + (timer, ts) -> { + timers.add(timer.getTimestamp()); + ts.registerProcessingTimeTimer( + VoidNamespace.INSTANCE, timer.getTimestamp() + 20); + }); + InternalTimerService timerService = buildTimerService(trigger); + trigger.setTimerService(timerService); + + keyedStatedBackend.setCurrentKey(1); + timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 150); + + // we should never register physical timers + assertThat(processingTimeService.getNumActiveTimers()).isZero(); + // changing the current key fires all timers + keyedStatedBackend.setCurrentKey(2); + + // We check that the timer from the callback is ignored + assertThat(timers).containsExactly(150L); + } + + private InternalTimerService buildTimerService( + Triggerable trigger) { + InternalTimerService timerService = + timeServiceManager.getInternalTimerService( + "test", KEY_SERIALIZER, new VoidNamespaceSerializer(), trigger); + ((BatchExecutionInternalTimeServiceWithAsyncState) timerService) + .setup(aec); + return timerService; + } + + private static class TriggerWithTimerServiceAccess implements Triggerable { + + private InternalTimerService timerService; + private final BiConsumer, InternalTimerService> eventTimeHandler; + private final BiConsumer, InternalTimerService> + processingTimeHandler; + + private TriggerWithTimerServiceAccess( + BiConsumer, InternalTimerService> eventTimeHandler, + BiConsumer, InternalTimerService> processingTimeHandler) { + this.eventTimeHandler = eventTimeHandler; + this.processingTimeHandler = processingTimeHandler; + } + + public static TriggerWithTimerServiceAccess eventTimeTrigger( + BiConsumer, InternalTimerService> eventTimeHandler) { + return new TriggerWithTimerServiceAccess<>( + eventTimeHandler, + (timer, timeService) -> + fail("We did not expect processing timer to be triggered.")); + } + + public static TriggerWithTimerServiceAccess processingTimeTrigger( + BiConsumer, InternalTimerService> processingTimeHandler) { + return new TriggerWithTimerServiceAccess<>( + (timer, timeService) -> fail("We did not expect event timer to be triggered."), + processingTimeHandler); + } + + public void setTimerService(InternalTimerService timerService) { + this.timerService = timerService; + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + this.eventTimeHandler.accept(timer, timerService); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + this.processingTimeHandler.accept(timer, timerService); + } + } + + private static class LambdaTrigger implements Triggerable { + + private final Consumer> eventTimeHandler; + private final Consumer> processingTimeHandler; + + public static LambdaTrigger eventTimeTrigger( + Consumer> eventTimeHandler) { + return new LambdaTrigger<>( + eventTimeHandler, + timer -> fail("We did not expect processing timer to be triggered.")); + } + + public static LambdaTrigger processingTimeTrigger( + Consumer> processingTimeHandler) { + return new LambdaTrigger<>( + timer -> fail("We did not expect event timer to be triggered."), + processingTimeHandler); + } + + private LambdaTrigger( + Consumer> eventTimeHandler, + Consumer> processingTimeHandler) { + this.eventTimeHandler = eventTimeHandler; + this.processingTimeHandler = processingTimeHandler; + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + this.eventTimeHandler.accept(timer); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + this.processingTimeHandler.accept(timer); + } + } + + private static class DummyKeyContext implements KeyContext { + @Override + public void setCurrentKey(Object key) {} + + @Override + public Object getCurrentKey() { + return null; + } + } +}