Skip to content

Commit

Permalink
Filter out extra fields, deduplicate fields in ingestion (#404)
Browse files Browse the repository at this point in the history
* Filter out duplicate fields, ignore extra fields

* Add test for filtering out extra fields
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Jan 6, 2020
1 parent 8a0f53b commit 3e25841
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

Expand Down Expand Up @@ -57,15 +61,12 @@ public void processElement(ProcessContext context) {
String error = null;
FeatureRow featureRow = context.element();
FeatureSet featureSet = getFeatureSets().getOrDefault(featureRow.getFeatureSet(), null);
List<FieldProto.Field> fields = new ArrayList<>();
if (featureSet != null) {

for (FieldProto.Field field : featureRow.getFieldsList()) {
Field fieldSpec = featureSet.getField(field.getName());
if (fieldSpec == null) {
error =
String.format(
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
field.getName(), featureSet.getReference());
// skip
break;
}
// If value is set in the FeatureRow, make sure the value type matches
Expand All @@ -81,6 +82,9 @@ public void processElement(ProcessContext context) {
break;
}
}
if (!fields.contains(field)) {
fields.add(field);
}
}
} else {
error =
Expand All @@ -107,6 +111,10 @@ public void processElement(ProcessContext context) {
}
context.output(getFailureTag(), failedElement.build());
} else {
featureRow = featureRow.toBuilder()
.clearFields()
.addAllFields(fields)
.build();
context.output(getSuccessTag(), featureRow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import feast.ingestion.values.FailedElement;
import feast.test.TestUtil;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
import feast.types.ValueProto.Value;
import feast.types.ValueProto.ValueType.Enum;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -138,4 +140,65 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() {

p.run();
}

/**
 * Feeds the validation transform a row that carries one field ("extra") which is not declared
 * in the feature set, and asserts that the success output holds the same row without that
 * field.
 */
@Test
public void shouldExcludeUnregisteredFields() {
  // Feature set declaring two entities and two features; "extra" is deliberately absent.
  EntitySpec primaryEntity =
      EntitySpec.newBuilder().setName("entity_id_primary").setValueType(Enum.INT32).build();
  EntitySpec secondaryEntity =
      EntitySpec.newBuilder().setName("entity_id_secondary").setValueType(Enum.STRING).build();
  FeatureSpec stringFeature =
      FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build();
  FeatureSpec longFeature =
      FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build();

  FeatureSetSpec registeredSpec =
      FeatureSetSpec.newBuilder()
          .setName("feature_set")
          .setVersion(1)
          .setProject("myproject")
          .addEntities(primaryEntity)
          .addEntities(secondaryEntity)
          .addFeatures(stringFeature)
          .addFeatures(longFeature)
          .build();
  FeatureSet fs1 = FeatureSet.newBuilder().setSpec(registeredSpec).build();

  Map<String, FeatureSet> featureSetsByReference = new HashMap<>();
  featureSetsByReference.put("myproject/feature_set:1", fs1);

  // The expected row is the clean one; the input row is the same row plus the extra field.
  FeatureRow cleanRow = TestUtil.createRandomFeatureRow(fs1);
  Field.Builder unregisteredField =
      Field.newBuilder().setName("extra").setValue(Value.newBuilder().setStringVal("hello"));
  FeatureRow rowWithExtraField = cleanRow.toBuilder().addFields(unregisteredField).build();

  List<FeatureRow> expected = new ArrayList<>();
  expected.add(cleanRow);
  List<FeatureRow> input = new ArrayList<>();
  input.add(rowWithExtraField);

  ValidateFeatureRows validation =
      ValidateFeatureRows.newBuilder()
          .setFailureTag(FAILURE_TAG)
          .setSuccessTag(SUCCESS_TAG)
          .setFeatureSets(featureSetsByReference)
          .build();

  PCollectionTuple output =
      p.apply(Create.of(input)).setCoder(ProtoCoder.of(FeatureRow.class)).apply(validation);

  PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected);

  p.run();
}
}

0 comments on commit 3e25841

Please sign in to comment.