Skip to content

Commit

Permalink
feat(core,jdbc): small trigger / scheduler improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Dec 10, 2024
1 parent 364c74d commit 70dd343
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,6 @@ private void handle() {
)
.build()
)
.peek(f -> {
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
this.triggerState.unlock(f.getTriggerContext());
}
})
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(FlowWithWorkerTriggerNextDate::of)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;

import java.util.function.Consumer;

/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ public interface SchedulerTriggerStateInterface {

Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;

List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);

/**
* Required for Kafka
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<FlowWithSource> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);

/**
* Required for Kafka
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
void unlock(Trigger trigger);
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<FlowWithSource> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void run() {
public void handleNext(List<FlowWithSource> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);

schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);

consumer.accept(triggers, scheduleContextInterface);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@ public JdbcSchedulerContext(JooqDSLContextWrapper dslContextWrapper) {
this.dslContextWrapper = dslContextWrapper;
}

public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);

consumer.accept(this);

this.commit();
this.context.commit();
});
}

public void commit() {
this.context.commit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,4 @@ public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<FlowWithSource> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}

@Override
public void unlock(Trigger trigger) {}
}

0 comments on commit 70dd343

Please sign in to comment.