Skip to content

Commit

Permalink
Revert "feat(core): remove the execution state from the scheduler (#1588
Browse files Browse the repository at this point in the history
)"

This reverts commit f7d3d0b.
  • Loading branch information
loicmathieu committed Dec 13, 2024
1 parent 678205c commit dc7fef2
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;

@Nullable
private State.Type executionCurrentState;

@Nullable
private Instant updatedDate;

Expand All @@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
Expand Down Expand Up @@ -140,7 +136,6 @@ public static Trigger of(Execution execution, Trigger trigger) {
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;

// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
Expand Down Expand Up @@ -591,8 +592,10 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) {
return true;
}

// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());

// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
Expand All @@ -619,14 +622,18 @@ private boolean isExecutionNotRunning(FlowWithWorkerTrigger f) {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}

// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
// check that if an executionId but no state, this means the execution is not started
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
if (log.isDebugEnabled()) {
logService.logTrigger(
f.getTriggerContext(),
log,
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
public DefaultScheduler(
ApplicationContext applicationContext,
FlowListenersInterface flowListeners,
SchedulerExecutionStateInterface executionState,
SchedulerTriggerStateInterface triggerState
) {
super(applicationContext, flowListeners);
this.triggerState = triggerState;
this.executionState = executionState;

this.conditionService = applicationContext.getBean(ConditionService.class);
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;

import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;

@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.executions.Execution;

import java.util.Optional;

public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.core.models.triggers.Trigger;
import io.kestra.plugin.core.trigger.Schedule;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
Expand All @@ -18,13 +17,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;

class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
Expand All @@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
Expand All @@ -58,6 +60,7 @@ private static Flow createScheduleFlow() {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);

Flow flow = createScheduleFlow();
Expand All @@ -74,12 +77,22 @@ void schedule() throws Exception {
.when(flowListenersServiceSpy)
.flows();

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());

// start the worker as it execute polling triggers
Worker worker = new Worker(applicationContext, 8, null);
worker.run();

// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
triggerState);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
executionRepositorySpy,
triggerState
)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();
Expand All @@ -96,8 +109,6 @@ void schedule() throws Exception {

scheduler.run();
queueCount.await(15, TimeUnit.SECONDS);
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
assertionStop.run();

assertThat(queueCount.getCount(), is(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;

@Inject
private SchedulerExecutionState schedulerExecutionState;

@Inject
private FlowListeners flowListenersService;

Expand Down Expand Up @@ -188,6 +191,7 @@ private AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionState,
triggerState
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
Expand Down Expand Up @@ -62,10 +65,11 @@ private ZonedDateTime date(int minus) {
.truncatedTo(ChronoUnit.HOURS);
}

protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionStateSpy,
triggerState
);
}
Expand All @@ -75,6 +79,7 @@ protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
Expand Down Expand Up @@ -109,7 +114,7 @@ void schedule() throws Exception {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -169,7 +174,7 @@ void retroSchedule() throws Exception {
triggerState.create(trigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> {
Expand Down Expand Up @@ -203,7 +208,7 @@ void recoverALLMissing() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -249,7 +254,7 @@ void recoverLASTMissing() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -293,7 +298,7 @@ void recoverNONEMissing() throws Exception {
triggerState.create(lastTrigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
Expand Down Expand Up @@ -324,7 +329,7 @@ void backfill() throws Exception {
.build();

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

Await.until(() -> {
Expand Down Expand Up @@ -389,7 +394,7 @@ void disabled() throws Exception {
triggerState.create(trigger);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();

// Wait 3s to see if things happen
Expand Down Expand Up @@ -427,7 +432,7 @@ void stopAfterSchedule() throws Exception {
CountDownLatch queueCount = new CountDownLatch(2);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
Expand Down Expand Up @@ -488,7 +493,7 @@ void failedEvaluationTest() {
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;

public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
Expand All @@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

public static Flow createThreadFlow() {
return createThreadFlow(null);
}
Expand Down Expand Up @@ -72,17 +75,23 @@ void thread() throws Exception {

// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);

SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);

doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());

// scheduler
try (
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionStateSpy,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
Expand Down
Loading

0 comments on commit dc7fef2

Please sign in to comment.