Apply a fixed window before writing row metrics (#590)
davidheryanto authored and khorshuheng committed Apr 15, 2020
1 parent 3c8b2fd commit 08cdfb4
Showing 8 changed files with 413 additions and 166 deletions.
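
In outline, the change stops writing row and feature-value metrics once per element and instead applies a fixed window, groups the valid rows by feature set reference, and writes one set of aggregate StatsD metrics per key per window. The minimal Beam sketch below illustrates that pattern; the pipeline, the keyed lag values, and the simple mean computed in the DoFn are illustrative assumptions, not code from this commit.

// Illustrative sketch only (not code from this commit): window a keyed stream, group by key,
// and emit one aggregate per key per window instead of one metric per element. The element
// type (lag millis keyed by feature set ref) and the mean are stand-ins for FeatureRow handling.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class WindowedMetricsSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply(
            "CreateLagSamples",
            Create.of(
                    KV.of("myproject/driver_features:1", 120L),
                    KV.of("myproject/driver_features:1", 80L))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
        // 1. A fixed window bounds how much data each aggregate covers.
        .apply("FixedWindow", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        // 2. Group all samples sharing a feature set reference within the window.
        .apply("GroupByRef", GroupByKey.create())
        // 3. Emit one summary per key per window; the real DoFn pushes StatsD gauges here.
        .apply(
            "WriteSummary",
            ParDo.of(
                new DoFn<KV<String, Iterable<Long>>, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    long sum = 0;
                    long count = 0;
                    for (Long lag : c.element().getValue()) {
                      sum += lag;
                      count++;
                    }
                    if (count > 0) {
                      System.out.println(c.element().getKey() + " mean_lag_ms=" + (sum / count));
                    }
                  }
                }));

    pipeline.run().waitUntilFinish();
  }
}
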
@@ -119,11 +119,14 @@ public void processElement(
ProcessContext context,
@Element KV<String, Iterable<FeatureRow>> featureSetRefToFeatureRows) {
if (statsDClient == null) {
log.error("StatsD client is null, likely because it encountered an error during setup");
return;
}

String featureSetRef = featureSetRefToFeatureRows.getKey();
if (featureSetRef == null) {
log.error(
"Feature set reference in the feature row is null. Please check the input feature rows from previous steps");
return;
}
String[] colonSplits = featureSetRef.split(":");
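
Both metric-writing DoFns in this commit validate the feature set reference before emitting anything, splitting on ':' and then '/' as above. A hypothetical standalone illustration of that parsing, assuming the <project>/<feature_set_name>:<version> format named in the log messages (the class, method, and sample value are made up for illustration):

// Hypothetical helper (not part of the commit) mirroring the reference validation in the DoFns.
public class FeatureSetRefParseExample {
  /** Returns {project, featureSetName, version}, or null when the reference is malformed. */
  static String[] parseFeatureSetRef(String featureSetRef) {
    String[] colonSplits = featureSetRef.split(":");
    if (colonSplits.length != 2) {
      return null; // the DoFns log an error and skip the element in this case
    }
    String[] slashSplits = colonSplits[0].split("/");
    if (slashSplits.length != 2) {
      return null;
    }
    return new String[] {slashSplits[0], slashSplits[1], colonSplits[1]};
  }

  public static void main(String[] args) {
    // "myproject/driver_features:2" -> project=myproject, name=driver_features, version=2
    String[] parts = parseFeatureSetRef("myproject/driver_features:2");
    System.out.println(String.join(", ", parts));
  }
}
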
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
@@ -73,52 +74,47 @@ public PDone expand(PCollectionTuple input) {
.setStoreName(getStoreName())
.build()));

input
.get(getSuccessTag())
.apply(
"WriteRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));
// Fixed window is applied so the metric collector will not be overwhelmed with the metrics
// data. For validation, only summaries of the values are usually required vs the actual
// values.
PCollection<KV<String, Iterable<FeatureRow>>> validRowsGroupedByRef =
input
.get(getSuccessTag())
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(
ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create());

// 1. Apply a fixed window
// 2. Group feature row by feature set reference
// 3. Calculate min, max, mean, percentiles of numerical values of features in the window
// and
// 4. Send the aggregate value to StatsD metric collector.
//
// NOTE: window is applied here so the metric collector will not be overwhelmed with
// metrics data. And for metric data, only statistic of the values are usually required
// vs the actual values.
input
.get(getSuccessTag())
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertTo_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create())
.apply(
"WriteFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));
validRowsGroupedByRef.apply(
"WriteRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));

validRowsGroupedByRef.apply(
"WriteFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));

return PDone.in(input.getPipeline());
case "none":
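
The DoFn below constructs its StatsD client in @Setup and skips metric writes when construction failed, relying on the client's UDP-like, fire-and-forget sends. A trimmed-down, hypothetical illustration of that guarded usage outside Beam (prefix, host, port, and tag values are placeholders; the real tag keys come from the DoFn's *_TAG_KEY constants):

// Hypothetical, standalone illustration of guarded StatsD usage: the client may fail to
// construct, but sends never throw.
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;

public class GuardedStatsDExample {
  public static void main(String[] args) {
    StatsDClient statsd = null;
    try {
      // Same constructor shape the DoFn uses: metric prefix, host, port (values are placeholders).
      statsd = new NonBlockingStatsDClient("example_prefix", "localhost", 8125);
    } catch (Exception e) {
      System.err.println("StatsD client cannot be started: " + e.getMessage());
    }
    if (statsd == null) {
      return; // mirror the DoFn: skip writing metrics rather than failing
    }
    // Tags follow the key:value convention used throughout the commit; keys here are placeholders.
    statsd.gauge(
        "feature_row_lag_ms_mean",
        42.0,
        "store:example_store",
        "project:myproject",
        "feature_set:driver_features");
  }
}
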
@@ -17,17 +17,26 @@
package feast.ingestion.transform.metrics;

import com.google.auto.value.AutoValue;
import com.google.protobuf.util.Timestamps;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import com.timgroup.statsd.StatsDClientException;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
import feast.types.ValueProto.Value;
import feast.types.ValueProto.Value.ValCase;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;

@AutoValue
public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {
public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<FeatureRow>>, Void> {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);

@@ -39,12 +48,38 @@ public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {
public static final String FEATURE_TAG_KEY = "feast_feature_name";
public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";

public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN = "feature_row_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX = "feature_row_lag_ms_max";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN = "feature_row_lag_ms_mean";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90 =
"feature_row_lag_ms_percentile_90";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95 =
"feature_row_lag_ms_percentile_95";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99 =
"feature_row_lag_ms_percentile_99";

public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN = "feature_value_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX = "feature_value_lag_ms_max";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN = "feature_value_lag_ms_mean";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90 =
"feature_value_lag_ms_percentile_90";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95 =
"feature_value_lag_ms_percentile_95";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99 =
"feature_value_lag_ms_percentile_99";

public static final String COUNT_NAME_FEATURE_ROW_INGESTED = "feature_row_ingested_count";
public static final String COUNT_NAME_FEATURE_VALUE_MISSING = "feature_value_missing_count";

public abstract String getStoreName();

public abstract String getStatsdHost();

public abstract int getStatsdPort();

@Nullable
public abstract Clock getClock();

public static WriteRowMetricsDoFn create(
String newStoreName, String newStatsdHost, int newStatsdPort) {
return newBuilder()
Expand All @@ -69,79 +104,147 @@ public abstract static class Builder {

public abstract Builder setStatsdPort(int statsdPort);

/**
* setClock will override the default system clock used to calculate feature row lag.
*
* @param clock Clock instance
*/
public abstract Builder setClock(Clock clock);

public abstract WriteRowMetricsDoFn build();
}

@Setup
public void setup() {
statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
// Note that an exception may be thrown during StatsD client instantiation, but no exception
// will be thrown when sending metrics (mimicking the UDP protocol behaviour).
// https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation
// https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support
try {
statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
} catch (Exception e) {
log.error("StatsD client cannot be started: " + e.getMessage());
}
}

@SuppressWarnings("DuplicatedCode")
@ProcessElement
public void processElement(ProcessContext c) {
public void processElement(
ProcessContext c, @Element KV<String, Iterable<FeatureRow>> featureSetRefToFeatureRows) {
if (statsd == null) {
log.error("StatsD client is null, likely because it encountered an error during setup");
return;
}

try {
FeatureRow row = c.element();
long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());

String[] split = row.getFeatureSet().split(":");
String featureSetProject = split[0].split("/")[0];
String featureSetName = split[0].split("/")[1];
String featureSetVersion = split[1];

statsd.histogram(
"feature_row_lag_ms",
System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

statsd.histogram(
"feature_row_event_time_epoch_ms",
eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

for (Field field : row.getFieldsList()) {
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
statsd.histogram(
"feature_value_lag_ms",
System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
String featureSetRef = featureSetRefToFeatureRows.getKey();
if (featureSetRef == null) {
log.error(
"Feature set reference in the feature row is null. Please check the input feature rows from previous steps");
return;
}
String[] colonSplits = featureSetRef.split(":");
if (colonSplits.length != 2) {
log.error(
"Skip writing feature row metrics because the feature set reference '{}' does not"
+ " follow the required format <project>/<feature_set_name>:<version>",
featureSetRef);
return;
}
String[] slashSplits = colonSplits[0].split("/");
if (slashSplits.length != 2) {
log.error(
"Skip writing feature row metrics because the feature set reference '{}' does not"
+ " follow the required format <project>/<feature_set_name>:<version>",
featureSetRef);
return;
}

String featureSetProject = slashSplits[0];
String featureSetName = slashSplits[1];
String featureSetVersion = colonSplits[1];

// featureRowLagStats is stats for feature row lag for feature set "featureSetName"
DescriptiveStatistics featureRowLagStats = new DescriptiveStatistics();
// featureNameToLagStats is stats for feature lag for all features in feature set
// "featureSetName"
Map<String, DescriptiveStatistics> featureNameToLagStats = new HashMap<>();
// featureNameToMissingCount is count for "value_not_set" for all features in feature set
// "featureSetName"
Map<String, Long> featureNameToMissingCount = new HashMap<>();

for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) {
long currentTime = getClock() == null ? System.currentTimeMillis() : getClock().millis();
long featureRowLag = currentTime - Timestamps.toMillis(featureRow.getEventTimestamp());
featureRowLagStats.addValue(featureRowLag);

for (Field field : featureRow.getFieldsList()) {
String featureName = field.getName();
Value featureValue = field.getValue();
if (!featureNameToLagStats.containsKey(featureName)) {
// Ensure map contains the "featureName" key
featureNameToLagStats.put(featureName, new DescriptiveStatistics());
}
if (!featureNameToMissingCount.containsKey(featureName)) {
// Ensure map contains the "featureName" key
featureNameToMissingCount.put(featureName, 0L);
}
if (featureValue.getValCase().equals(ValCase.VAL_NOT_SET)) {
featureNameToMissingCount.put(
featureName, featureNameToMissingCount.get(featureName) + 1);
} else {
statsd.count(
"feature_value_missing_count",
1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
featureNameToLagStats.get(featureName).addValue(featureRowLag);
}
}
}

String[] tags = {
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName(),
};

statsd.count(COUNT_NAME_FEATURE_ROW_INGESTED, featureRowLagStats.getN(), tags);
// DescriptiveStatistics returns an invalid NaN value for getMin(), getMax(), ... when there are no
// items in the stats.
if (featureRowLagStats.getN() > 0) {
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN, featureRowLagStats.getMin(), tags);
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX, featureRowLagStats.getMax(), tags);
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN, featureRowLagStats.getMean(), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90, featureRowLagStats.getPercentile(90), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95, featureRowLagStats.getPercentile(95), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99, featureRowLagStats.getPercentile(99), tags);
}

for (Entry<String, DescriptiveStatistics> entry : featureNameToLagStats.entrySet()) {
String featureName = entry.getKey();
String[] tagsWithFeatureName = ArrayUtils.add(tags, FEATURE_TAG_KEY + ":" + featureName);
DescriptiveStatistics stats = entry.getValue();
if (stats.getN() > 0) {
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN, stats.getMin(), tagsWithFeatureName);
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX, stats.getMax(), tagsWithFeatureName);
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN, stats.getMean(), tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90,
stats.getPercentile(90),
tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95,
stats.getPercentile(95),
tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99,
stats.getPercentile(99),
tagsWithFeatureName);
}
statsd.count(
"feature_row_ingested_count",
1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

} catch (StatsDClientException e) {
log.warn("Unable to push metrics to server", e);
COUNT_NAME_FEATURE_VALUE_MISSING,
featureNameToMissingCount.get(featureName),
tagsWithFeatureName);
}
}
}
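
The aggregation above leans on commons-math3's DescriptiveStatistics for min, max, mean, and percentiles, and guards against the NaN values it returns when no samples were added. A small self-contained example of those calls, with made-up lag values:

// Standalone example of the commons-math3 DescriptiveStatistics calls used above; values are made up.
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

public class LagStatsExample {
  public static void main(String[] args) {
    DescriptiveStatistics stats = new DescriptiveStatistics();
    for (long lagMs : new long[] {120, 80, 95, 300, 45}) {
      stats.addValue(lagMs);
    }
    // Guard against NaN results when no values were added, as the DoFn does.
    if (stats.getN() > 0) {
      System.out.printf(
          "min=%.0f max=%.0f mean=%.1f p90=%.1f p95=%.1f p99=%.1f%n",
          stats.getMin(),
          stats.getMax(),
          stats.getMean(),
          stats.getPercentile(90),
          stats.getPercentile(95),
          stats.getPercentile(99));
    }
  }
}
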