Skip to content

Commit

Permalink
Added observation API
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak committed Feb 14, 2022
1 parent 1530d04 commit be4a3d8
Show file tree
Hide file tree
Showing 13 changed files with 493 additions and 18 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<spring-retry.version>1.3.1</spring-retry.version>
<spring-integration.version>6.0.0-M1</spring-integration.version>
<micrometer.version>2.0.0-SNAPSHOT</micrometer.version>
<micrometer-tracing.version>1.0.0-SNAPSHOT</micrometer-tracing.version>
<jackson.version>2.13.1</jackson.version>

<!-- optional production dependencies -->
Expand Down
6 changes: 6 additions & 0 deletions spring-batch-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@
<version>${jakarta.inject-api.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-test</artifactId>
<version>${micrometer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import io.micrometer.api.instrument.LongTaskTimer;
import io.micrometer.api.instrument.Tag;
import io.micrometer.api.instrument.Timer;
import io.micrometer.api.instrument.observation.Observation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
Expand Down Expand Up @@ -304,8 +306,9 @@ public final void execute(JobExecution execution) {
LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs",
Tag.of("name", execution.getJobInstance().getJobName()));
LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
Timer.Sample timerSample = BatchMetrics.createTimerSample();
try {
Observation observation = BatchMetrics.createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName())
.contextualName(execution.getJobInstance().getJobName()).start();
try (Observation.Scope scope = observation.openScope()) {

jobParametersValidator.validate(execution.getJobParameters());

Expand Down Expand Up @@ -361,11 +364,7 @@ public final void execute(JobExecution execution) {
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
execution.setExitStatus(exitStatus.and(newExitStatus));
}

timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
Tag.of("name", execution.getJobInstance().getJobName()),
Tag.of("status", execution.getExitStatus().getExitCode())
));
stopTaggedObservation(execution, observation);
longTaskTimerSample.stop();
execution.setEndTime(new Date());

Expand All @@ -384,6 +383,23 @@ public final void execute(JobExecution execution) {

}

/**
 * Tags the given observation with details of the job execution, reports any
 * failure recorded on the execution, and stops the observation.
 * <p>
 * Low cardinality tags: job name and exit code. High cardinality tags: job
 * instance id and job execution id.
 * @param execution the job execution the tag values are read from
 * @param observation the observation to tag and stop
 */
private void stopTaggedObservation(JobExecution execution, Observation observation) {
    String jobName = execution.getJobInstance().getJobName();
    String exitCode = execution.getExitStatus().getExitCode();
    String jobInstanceId = String.valueOf(execution.getJobInstance().getInstanceId());
    String jobExecutionId = String.valueOf(execution.getId());

    observation
            .lowCardinalityTag(BatchJobObservation.JobLowCardinalityTags.JOB_NAME.of(jobName))
            .lowCardinalityTag(BatchJobObservation.JobLowCardinalityTags.JOB_STATUS.of(exitCode))
            .highCardinalityTag(BatchJobObservation.JobHighCardinalityTags.JOB_INSTANCE_ID.of(jobInstanceId))
            .highCardinalityTag(BatchJobObservation.JobHighCardinalityTags.JOB_EXECUTION_ID.of(jobExecutionId));

    // Only report an error when the execution actually recorded failures.
    List<Throwable> failures = execution.getFailureExceptions();
    boolean hasFailures = !failures.isEmpty();
    if (hasFailures) {
        observation.error(mergedThrowables(failures));
    }

    observation.stop();
}

/**
 * Merges several failures into a single {@link IllegalStateException}.
 * <p>
 * The message is the {@code toString()} of every throwable, joined with a
 * newline. Each original throwable is also attached as a suppressed exception
 * so that its stack trace and cause chain remain reachable from the merged one.
 * @param throwables the failures to merge
 * @return a new exception describing and carrying all given failures
 */
private IllegalStateException mergedThrowables(List<Throwable> throwables) {
    String message = throwables.stream().map(Throwable::toString).collect(Collectors.joining("\n"));
    IllegalStateException merged = new IllegalStateException(message);
    // Keep the originals: a message-only exception would drop their stack traces and causes.
    throwables.forEach(merged::addSuppressed);
    return merged;
}

/**
* Convenience method for subclasses to delegate the handling of a specific
* step in the context of the current {@link JobExecution}. Clients of this
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.job;

import io.micrometer.api.instrument.docs.DocumentedObservation;
import io.micrometer.api.instrument.docs.TagKey;

/**
 * Describes the observation recorded for a job execution: its name, its
 * contextual name, and the tag keys attached to it.
 */
enum BatchJobObservation implements DocumentedObservation {

/**
* Observation created around a Job execution.
*/
BATCH_JOB_OBSERVATION {
// Name under which the observation is created.
@Override
public String getName() {
return "spring.batch.job";
}

// NOTE(review): "%s" looks like a format placeholder for a value supplied at
// runtime (presumably the job name) - confirm against the caller.
@Override
public String getContextualName() {
return "%s";
}

// All constants of JobLowCardinalityTags.
@Override
public TagKey[] getLowCardinalityTagKeys() {
return JobLowCardinalityTags.values();
}

// All constants of JobHighCardinalityTags.
@Override
public TagKey[] getHighCardinalityTagKeys() {
return JobHighCardinalityTags.values();
}

// The observation name and every tag key declared in this file start with this prefix.
@Override
public String getPrefix() {
return "spring.batch";
}
};

/**
 * Tag keys returned by {@code getLowCardinalityTagKeys()} of the job observation.
 */
enum JobLowCardinalityTags implements TagKey {

/**
* Name of the Spring Batch job.
*/
JOB_NAME {
@Override
public String getKey() {
return "spring.batch.job.name";
}
},

/**
* Job status.
*/
JOB_STATUS {
@Override
public String getKey() {
return "spring.batch.job.status";
}
}

}

/**
 * Tag keys returned by {@code getHighCardinalityTagKeys()} of the job observation.
 */
enum JobHighCardinalityTags implements TagKey {

/**
* ID of the Spring Batch job instance.
*/
JOB_INSTANCE_ID {
@Override
public String getKey() {
return "spring.batch.job.instanceId";
}
},

/**
* ID of the Spring Batch execution.
*/
JOB_EXECUTION_ID {
@Override
public String getKey() {
return "spring.batch.job.executionId";
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import java.util.concurrent.TimeUnit;

import io.micrometer.api.instrument.LongTaskTimer;
import io.micrometer.api.instrument.MeterRegistry;
import io.micrometer.api.instrument.Metrics;
import io.micrometer.api.instrument.Tag;
import io.micrometer.api.instrument.Timer;
import io.micrometer.api.instrument.observation.Observation;
import io.micrometer.api.instrument.observation.TimerObservationHandler;
import io.micrometer.api.instrument.simple.SimpleMeterRegistry;

import org.springframework.lang.Nullable;

Expand Down Expand Up @@ -52,6 +56,14 @@ public final class BatchMetrics {

private BatchMetrics() {}

// Simple registry, created with a timer observation handler, that the static
// initializer below adds to the global registry.
private static final MeterRegistry simpleMeterRegistry = new SimpleMeterRegistry().withTimerObservationHandler();

// Runs once at class initialization: wires observation handling into the global registry.
static {
// TODO: This shouldn't be necessary - we need to fix it in Micrometer
// Register a TimerObservationHandler backed by the global registry on the
// global registry's observation configuration.
Metrics.globalRegistry.observationConfig().observationHandler(new TimerObservationHandler(Metrics.globalRegistry));
// Add the simple registry declared above to the global registry.
Metrics.globalRegistry.add(simpleMeterRegistry);
}

/**
* Create a {@link Timer}.
* @param name of the timer. Will be prefixed with {@link BatchMetrics#METRICS_PREFIX}.
Expand All @@ -66,6 +78,15 @@ public static Timer createTimer(String name, String description, Tag... tags) {
.register(Metrics.globalRegistry);
}

/**
 * Create a new {@link Observation} against the global registry. It's not
 * started, you must explicitly call {@link Observation#start()} to start it.
 * @param name of the observation. It is passed on unchanged; this method
 * does not prepend {@link BatchMetrics#METRICS_PREFIX}.
 * @return a new observation instance that has not been started yet
 */
public static Observation createObservation(String name) {
    return Observation.createNotStarted(name, Metrics.globalRegistry);
}

/**
* Create a new {@link Timer.Sample}.
* @return a new timer sample instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

import io.micrometer.api.instrument.Tag;
import io.micrometer.api.instrument.Timer;
import io.micrometer.api.instrument.observation.Observation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
Expand Down Expand Up @@ -192,15 +193,16 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
}
stepExecution.setStartTime(new Date());
stepExecution.setStatus(BatchStatus.STARTED);
Timer.Sample sample = BatchMetrics.createTimerSample();
Observation observation = BatchMetrics.createObservation(BatchStepObservation.BATCH_STEP_OBSERVATION.getName())
.contextualName(stepExecution.getStepName()).start();
getJobRepository().update(stepExecution);

// Start with a default value that will be trumped by anything
ExitStatus exitStatus = ExitStatus.EXECUTING;

doExecutionRegistration(stepExecution);

try {
try (Observation.Scope scope = observation.openScope()) {
getCompositeListener().beforeStep(stepExecution);
open(stepExecution.getExecutionContext());

Expand Down Expand Up @@ -260,12 +262,7 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
+ "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}

sample.stop(BatchMetrics.createTimer("step", "Step duration",
Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()),
Tag.of("name", stepExecution.getStepName()),
Tag.of("status", stepExecution.getExitStatus().getExitCode())
));
stopTaggedObservation(stepExecution, observation);
stepExecution.setEndTime(new Date());
stepExecution.setExitStatus(exitStatus);
Duration stepExecutionDuration = BatchMetrics.calculateDuration(stepExecution.getStartTime(), stepExecution.getEndTime());
Expand Down Expand Up @@ -299,6 +296,23 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
}
}

/**
 * Tags the given observation with details of the step execution, reports any
 * failure recorded on the execution, and stops the observation.
 * <p>
 * Low cardinality tags: step name, owning job name and exit code. High
 * cardinality tag: step execution id.
 * @param stepExecution the step execution the tag values are read from
 * @param observation the observation to tag and stop
 */
private void stopTaggedObservation(StepExecution stepExecution, Observation observation) {
    String stepName = stepExecution.getStepName();
    String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
    String exitCode = stepExecution.getExitStatus().getExitCode();
    String stepExecutionId = String.valueOf(stepExecution.getId());

    observation
            .lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.STEP_NAME.of(stepName))
            .lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.JOB_NAME.of(jobName))
            .lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.STEP_STATUS.of(exitCode))
            .highCardinalityTag(BatchStepObservation.StepHighCardinalityTags.STEP_EXECUTION_ID.of(stepExecutionId));

    // Only report an error when the execution actually recorded failures.
    List<Throwable> failures = stepExecution.getFailureExceptions();
    boolean hasFailures = !failures.isEmpty();
    if (hasFailures) {
        observation.error(mergedThrowables(failures));
    }

    observation.stop();
}

/**
 * Merges several failures into a single {@link IllegalStateException}.
 * <p>
 * The message is the {@code toString()} of every throwable, joined with a
 * newline. Each original throwable is also attached as a suppressed exception
 * so that its stack trace and cause chain remain reachable from the merged one.
 * @param throwables the failures to merge
 * @return a new exception describing and carrying all given failures
 */
private IllegalStateException mergedThrowables(List<Throwable> throwables) {
    String message = throwables.stream().map(Throwable::toString).collect(Collectors.joining("\n"));
    IllegalStateException merged = new IllegalStateException(message);
    // Keep the originals: a message-only exception would drop their stack traces and causes.
    throwables.forEach(merged::addSuppressed);
    return merged;
}

/**
* Releases the most recent {@link StepExecution}
*/
Expand Down
Loading

0 comments on commit be4a3d8

Please sign in to comment.