From 08cdfb4a2d1694bed250897a6ea72a89bc14dc60 Mon Sep 17 00:00:00 2001 From: David Heryanto Date: Tue, 31 Mar 2020 15:30:44 +0800 Subject: [PATCH] Apply a fixed window before writing row metrics (#590) --- .../metrics/WriteFeatureValueMetricsDoFn.java | 3 + .../metrics/WriteMetricsTransform.java | 86 ++++--- .../metrics/WriteRowMetricsDoFn.java | 225 +++++++++++++----- .../WriteFeatureValueMetricsDoFnTest.java | 87 +++---- .../metrics/WriteRowMetricsDoFnTest.java | 88 +++++++ .../src/test/java/feast/test/TestUtil.java | 63 +++++ .../transform/WriteRowMetricsDoFnTest.input | 4 + .../transform/WriteRowMetricsDoFnTest.output | 23 ++ 8 files changed, 413 insertions(+), 166 deletions(-) create mode 100644 ingestion/src/test/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFnTest.java create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.input create mode 100644 ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.output diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java index 8574d2414c..395fe0c7f9 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java @@ -119,11 +119,14 @@ public void processElement( ProcessContext context, @Element KV> featureSetRefToFeatureRows) { if (statsDClient == null) { + log.error("StatsD client is null, likely because it encounters an error during setup"); return; } String featureSetRef = featureSetRefToFeatureRows.getKey(); if (featureSetRef == null) { + log.error( + "Feature set reference in the feature row is null. Please check the input feature rows from previous steps"); return; } String[] colonSplits = featureSetRef.split(":"); diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index 10322ac812..8a5869d78e 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; @@ -73,52 +74,47 @@ public PDone expand(PCollectionTuple input) { .setStoreName(getStoreName()) .build())); - input - .get(getSuccessTag()) - .apply( - "WriteRowMetrics", - ParDo.of( - WriteRowMetricsDoFn.newBuilder() - .setStatsdHost(options.getStatsdHost()) - .setStatsdPort(options.getStatsdPort()) - .setStoreName(getStoreName()) - .build())); + // Fixed window is applied so the metric collector will not be overwhelmed with the metrics + // data. For validation, only summaries of the values are usually required vs the actual + // values. + PCollection>> validRowsGroupedByRef = + input + .get(getSuccessTag()) + .apply( + "FixedWindow", + Window.into( + FixedWindows.of( + Duration.standardSeconds( + options.getWindowSizeInSecForFeatureValueMetric())))) + .apply( + "ConvertToKV_FeatureSetRefToFeatureRow", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement( + ProcessContext c, @Element FeatureRow featureRow) { + c.output(KV.of(featureRow.getFeatureSet(), featureRow)); + } + })) + .apply("GroupByFeatureSetRef", GroupByKey.create()); - // 1. Apply a fixed window - // 2. Group feature row by feature set reference - // 3. Calculate min, max, mean, percentiles of numerical values of features in the window - // and - // 4. Send the aggregate value to StatsD metric collector. - // - // NOTE: window is applied here so the metric collector will not be overwhelmed with - // metrics data. And for metric data, only statistic of the values are usually required - // vs the actual values. - input - .get(getSuccessTag()) - .apply( - "FixedWindow", - Window.into( - FixedWindows.of( - Duration.standardSeconds( - options.getWindowSizeInSecForFeatureValueMetric())))) - .apply( - "ConvertTo_FeatureSetRefToFeatureRow", - ParDo.of( - new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c, @Element FeatureRow featureRow) { - c.output(KV.of(featureRow.getFeatureSet(), featureRow)); - } - })) - .apply("GroupByFeatureSetRef", GroupByKey.create()) - .apply( - "WriteFeatureValueMetrics", - ParDo.of( - WriteFeatureValueMetricsDoFn.newBuilder() - .setStatsdHost(options.getStatsdHost()) - .setStatsdPort(options.getStatsdPort()) - .setStoreName(getStoreName()) - .build())); + validRowsGroupedByRef.apply( + "WriteRowMetrics", + ParDo.of( + WriteRowMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .build())); + + validRowsGroupedByRef.apply( + "WriteFeatureValueMetrics", + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .build())); return PDone.in(input.getPipeline()); case "none": diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index 2cd1ee94ec..2fe1f2e7f0 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -17,17 +17,26 @@ package feast.ingestion.transform.metrics; import com.google.auto.value.AutoValue; +import com.google.protobuf.util.Timestamps; import com.timgroup.statsd.NonBlockingStatsDClient; import com.timgroup.statsd.StatsDClient; -import com.timgroup.statsd.StatsDClientException; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; import feast.types.ValueProto.Value.ValCase; +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.slf4j.Logger; @AutoValue -public abstract class WriteRowMetricsDoFn extends DoFn { +public abstract class WriteRowMetricsDoFn extends DoFn>, Void> { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); @@ -39,12 +48,38 @@ public abstract class WriteRowMetricsDoFn extends DoFn { public static final String FEATURE_TAG_KEY = "feast_feature_name"; public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN = "feature_row_lag_ms_min"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX = "feature_row_lag_ms_max"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN = "feature_row_lag_ms_mean"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90 = + "feature_row_lag_ms_percentile_90"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95 = + "feature_row_lag_ms_percentile_95"; + public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99 = + "feature_row_lag_ms_percentile_99"; + + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN = "feature_value_lag_ms_min"; + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX = "feature_value_lag_ms_max"; + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN = "feature_value_lag_ms_mean"; + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90 = + "feature_value_lag_ms_percentile_90"; + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95 = + "feature_value_lag_ms_percentile_95"; + public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99 = + "feature_value_lag_ms_percentile_99"; + + public static final String COUNT_NAME_FEATURE_ROW_INGESTED = "feature_row_ingested_count"; + public static final String COUNT_NAME_FEATURE_VALUE_MISSING = "feature_value_missing_count"; + public abstract String getStoreName(); public abstract String getStatsdHost(); public abstract int getStatsdPort(); + @Nullable + public abstract Clock getClock(); + public static WriteRowMetricsDoFn create( String newStoreName, String newStatsdHost, int newStatsdPort) { return newBuilder() @@ -69,79 +104,147 @@ public abstract static class Builder { public abstract Builder setStatsdPort(int statsdPort); + /** + * setClock will override the default system clock used to calculate feature row lag. + * + * @param clock Clock instance + */ + public abstract Builder setClock(Clock clock); + public abstract WriteRowMetricsDoFn build(); } @Setup public void setup() { - statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort()); + // Note that exception may be thrown during StatsD client instantiation but no exception + // will be thrown when sending metrics (mimicking the UDP protocol behaviour). + // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation + // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support + try { + statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort()); + } catch (Exception e) { + log.error("StatsD client cannot be started: " + e.getMessage()); + } } + @SuppressWarnings("DuplicatedCode") @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + ProcessContext c, @Element KV> featureSetRefToFeatureRows) { + if (statsd == null) { + log.error("StatsD client is null, likely because it encounters an error during setup"); + return; + } - try { - FeatureRow row = c.element(); - long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp()); - - String[] split = row.getFeatureSet().split(":"); - String featureSetProject = split[0].split("/")[0]; - String featureSetName = split[0].split("/")[1]; - String featureSetVersion = split[1]; - - statsd.histogram( - "feature_row_lag_ms", - System.currentTimeMillis() - eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - statsd.histogram( - "feature_row_event_time_epoch_ms", - eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - for (Field field : row.getFieldsList()) { - if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { - statsd.histogram( - "feature_value_lag_ms", - System.currentTimeMillis() - eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - FEATURE_TAG_KEY + ":" + field.getName(), - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + String featureSetRef = featureSetRefToFeatureRows.getKey(); + if (featureSetRef == null) { + log.error( + "Feature set reference in the feature row is null. Please check the input feature rows from previous steps"); + return; + } + String[] colonSplits = featureSetRef.split(":"); + if (colonSplits.length != 2) { + log.error( + "Skip writing feature row metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + String[] slashSplits = colonSplits[0].split("/"); + if (slashSplits.length != 2) { + log.error( + "Skip writing feature row metrics because the feature set reference '{}' does not" + + "follow the required format /:", + featureSetRef); + return; + } + + String featureSetProject = slashSplits[0]; + String featureSetName = slashSplits[1]; + String featureSetVersion = colonSplits[1]; + + // featureRowLagStats is stats for feature row lag for feature set "featureSetName" + DescriptiveStatistics featureRowLagStats = new DescriptiveStatistics(); + // featureNameToLagStats is stats for feature lag for all features in feature set + // "featureSetName" + Map featureNameToLagStats = new HashMap<>(); + // featureNameToMissingCount is count for "value_not_set" for all features in feature set + // "featureSetName" + Map featureNameToMissingCount = new HashMap<>(); + + for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) { + long currentTime = getClock() == null ? System.currentTimeMillis() : getClock().millis(); + long featureRowLag = currentTime - Timestamps.toMillis(featureRow.getEventTimestamp()); + featureRowLagStats.addValue(featureRowLag); + + for (Field field : featureRow.getFieldsList()) { + String featureName = field.getName(); + Value featureValue = field.getValue(); + if (!featureNameToLagStats.containsKey(featureName)) { + // Ensure map contains the "featureName" key + featureNameToLagStats.put(featureName, new DescriptiveStatistics()); + } + if (!featureNameToMissingCount.containsKey(featureName)) { + // Ensure map contains the "featureName" key + featureNameToMissingCount.put(featureName, 0L); + } + if (featureValue.getValCase().equals(ValCase.VAL_NOT_SET)) { + featureNameToMissingCount.put( + featureName, featureNameToMissingCount.get(featureName) + 1); } else { - statsd.count( - "feature_value_missing_count", - 1, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - FEATURE_TAG_KEY + ":" + field.getName(), - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + featureNameToLagStats.get(featureName).addValue(featureRowLag); } } + } + + String[] tags = { + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName(), + }; + + statsd.count(COUNT_NAME_FEATURE_ROW_INGESTED, featureRowLagStats.getN(), tags); + // DescriptiveStatistics returns invalid NaN value for getMin(), getMax(), ... when there is no + // items in the stats. + if (featureRowLagStats.getN() > 0) { + statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN, featureRowLagStats.getMin(), tags); + statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX, featureRowLagStats.getMax(), tags); + statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN, featureRowLagStats.getMean(), tags); + statsd.gauge( + GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90, featureRowLagStats.getPercentile(90), tags); + statsd.gauge( + GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95, featureRowLagStats.getPercentile(95), tags); + statsd.gauge( + GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99, featureRowLagStats.getPercentile(99), tags); + } + for (Entry entry : featureNameToLagStats.entrySet()) { + String featureName = entry.getKey(); + String[] tagsWithFeatureName = ArrayUtils.add(tags, FEATURE_TAG_KEY + ":" + featureName); + DescriptiveStatistics stats = entry.getValue(); + if (stats.getN() > 0) { + statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN, stats.getMin(), tagsWithFeatureName); + statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX, stats.getMax(), tagsWithFeatureName); + statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN, stats.getMean(), tagsWithFeatureName); + statsd.gauge( + GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90, + stats.getPercentile(90), + tagsWithFeatureName); + statsd.gauge( + GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95, + stats.getPercentile(95), + tagsWithFeatureName); + statsd.gauge( + GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99, + stats.getPercentile(99), + tagsWithFeatureName); + } statsd.count( - "feature_row_ingested_count", - 1, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject, - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - } catch (StatsDClientException e) { - log.warn("Unable to push metrics to server", e); + COUNT_NAME_FEATURE_VALUE_MISSING, + featureNameToMissingCount.get(featureName), + tagsWithFeatureName); } } } diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java index 7f48760110..cc65f2cff9 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java @@ -19,6 +19,9 @@ import static org.junit.Assert.fail; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import feast.test.TestUtil.DummyStatsDServer; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FeatureRowProto.FeatureRow.Builder; import feast.types.FieldProto.Field; @@ -32,13 +35,10 @@ import feast.types.ValueProto.Value; import java.io.BufferedReader; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.SocketException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -100,11 +100,12 @@ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedExce fail(String.format("Expected StatsD metric not found:\n%s", expected)); } } + statsDServer.stop(); } // Test utility method to read expected StatsD metrics output from a text file. @SuppressWarnings("SameParameterValue") - private List readTestOutput(String path) throws IOException { + public static List readTestOutput(String path) throws IOException { URL url = Thread.currentThread().getContextClassLoader().getResource(path); if (url == null) { throw new IllegalArgumentException( @@ -123,9 +124,19 @@ private List readTestOutput(String path) throws IOException { return lines; } + public static Map> readTestInput(String path) throws IOException { + return readTestInput(path, null); + } + // Test utility method to create test feature row data from a text file. + // If tsOverride is not null, all the feature row will have the same timestamp "tsOverride". + // Else if there exist a "timestamp" column with RFC3339 format, the feature row will be assigned + // that timestamp. + // Else no timestamp will be assigned (the feature row will have the default proto Timestamp + // object). @SuppressWarnings("SameParameterValue") - private Map> readTestInput(String path) throws IOException { + public static Map> readTestInput(String path, Timestamp tsOverride) + throws IOException { Map> data = new HashMap<>(); URL url = Thread.currentThread().getContextClassLoader().getResource(path); if (url == null) { @@ -162,6 +173,13 @@ private Map> readTestInput(String path) throws IOEx continue; } String colName = colNames.get(i); + if (colName.equals("timestamp")) { + Instant instant = Instant.parse(colVal); + featureRowBuilder.setEventTimestamp( + Timestamps.fromNanos(instant.getEpochSecond() * 1_000_000_000 + instant.getNano())); + continue; + } + Field.Builder fieldBuilder = Field.newBuilder().setName(colName); if (!colVal.isEmpty()) { switch (colName) { @@ -245,6 +263,9 @@ private Map> readTestInput(String path) throws IOEx data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>()); } List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet()); + if (tsOverride != null) { + featureRowBuilder.setEventTimestamp(tsOverride); + } featureRowsByFeatureSetRef.add(featureRowBuilder.build()); } @@ -258,58 +279,4 @@ private Map> readTestInput(String path) throws IOEx } return dataWithIterable; } - - // Modified version of - // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java - @SuppressWarnings("CatchMayIgnoreException") - private static final class DummyStatsDServer { - - private final List messagesReceived = new ArrayList(); - private final DatagramSocket server; - - public DummyStatsDServer(int port) { - try { - server = new DatagramSocket(port); - } catch (SocketException e) { - throw new IllegalStateException(e); - } - new Thread( - () -> { - try { - while (true) { - final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535); - server.receive(packet); - messagesReceived.add( - new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n"); - Thread.sleep(50); - } - - } catch (Exception e) { - } - }) - .start(); - } - - public void stop() { - server.close(); - } - - public void waitForMessage() { - while (messagesReceived.isEmpty()) { - try { - Thread.sleep(50L); - } catch (InterruptedException e) { - } - } - } - - public List messagesReceived() { - List out = new ArrayList<>(); - for (String msg : messagesReceived) { - String[] lines = msg.split("\n"); - out.addAll(Arrays.asList(lines)); - } - return out; - } - } } diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFnTest.java new file mode 100644 index 0000000000..6e3caff56b --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFnTest.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import static feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFnTest.readTestInput; +import static feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFnTest.readTestOutput; +import static org.junit.Assert.fail; + +import feast.test.TestUtil.DummyStatsDServer; +import feast.types.FeatureRowProto.FeatureRow; +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.junit.Rule; +import org.junit.Test; + +public class WriteRowMetricsDoFnTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static final int STATSD_SERVER_PORT = 17255; + private final DummyStatsDServer statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT); + + @Test + public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException { + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + pipelineOptions.setJobName("job"); + Map> input = + readTestInput("feast/ingestion/transform/WriteRowMetricsDoFnTest.input"); + List expectedLines = + readTestOutput("feast/ingestion/transform/WriteRowMetricsDoFnTest.output"); + + pipeline + .apply(Create.of(input)) + .apply( + ParDo.of( + WriteRowMetricsDoFn.newBuilder() + .setStatsdHost("localhost") + .setStatsdPort(STATSD_SERVER_PORT) + .setStoreName("store") + .setClock(Clock.fixed(Instant.ofEpochSecond(1585548645), ZoneId.of("UTC"))) + .build())); + pipeline.run(pipelineOptions).waitUntilFinish(); + // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration + // based on empirical testing. + Thread.sleep(3000); + + List actualLines = statsDServer.messagesReceived(); + for (String expected : expectedLines) { + boolean matched = false; + for (String actual : actualLines) { + if (actual.equals(expected)) { + matched = true; + break; + } + } + if (!matched) { + System.out.println("Print actual metrics output for debugging:"); + for (String line : actualLines) { + System.out.println(line); + } + fail(String.format("Expected StatsD metric not found:\n%s", expected)); + } + } + statsDServer.stop(); + } +} diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 5c16d7e9e3..3cad39e3ec 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -36,6 +36,12 @@ import feast.types.ValueProto.Value; import feast.types.ValueProto.ValueType; import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -332,6 +338,63 @@ static void start(int zookeeperPort, String zookeeperDataDir) { } } + // Modified version of + // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java + @SuppressWarnings("CatchMayIgnoreException") + public static class DummyStatsDServer { + + private final List messagesReceived = new ArrayList(); + private final DatagramSocket server; + + public DummyStatsDServer(int port) { + try { + server = new DatagramSocket(port); + } catch (SocketException e) { + throw new IllegalStateException(e); + } + new Thread( + () -> { + try { + while (true) { + final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535); + server.receive(packet); + messagesReceived.add( + new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n"); + // The sleep duration here is shorter than that used in waitForMessage() at + // 50ms. + // Otherwise sometimes some messages seem to be lost, leading to flaky tests. + Thread.sleep(15L); + } + + } catch (Exception e) { + } + }) + .start(); + } + + public void stop() { + server.close(); + } + + public void waitForMessage() { + while (messagesReceived.isEmpty()) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } + } + } + + public List messagesReceived() { + List out = new ArrayList<>(); + for (String msg : messagesReceived) { + String[] lines = msg.split("\n"); + out.addAll(Arrays.asList(lines)); + } + return out; + } + } + /** * Create a field object with given name and type. * diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.input new file mode 100644 index 0000000000..4d42f5bc4c --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.input @@ -0,0 +1,4 @@ +featuresetref,int32,int64,timestamp +project/featureset:1,1,5,2020-03-30T06:10:38Z +project/featureset:1,5,8,2020-03-30T06:10:43Z +project/featureset:1,6,,2020-03-30T06:10:42Z \ No newline at end of file diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.output new file mode 100644 index 0000000000..318ce8eb08 --- /dev/null +++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteRowMetricsDoFnTest.output @@ -0,0 +1,23 @@ +feast_ingestion.feature_row_ingested_count:3|c|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_min:2000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_max:7000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_mean:4000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_percentile_90:7000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_percentile_95:7000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_row_lag_ms_percentile_99:7000|g|#ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_lag_ms_min:2000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_max:7000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_mean:4000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_90:7000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_95:7000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_99:7000|g|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_missing_count:0|c|#feast_feature_name:int32,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store + +feast_ingestion.feature_value_lag_ms_min:2000|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_max:7000|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_mean:4500|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_90:7000|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_95:7000|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_lag_ms_percentile_99:7000|g|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store +feast_ingestion.feature_value_missing_count:1|c|#feast_feature_name:int64,ingestion_job_name:job,feast_featureSet_version:1,feast_featureSet_name:featureset,feast_project_name:project,feast_store:store \ No newline at end of file