diff --git a/core/src/main/java/feast/core/validators/SpecValidator.java b/core/src/main/java/feast/core/validators/SpecValidator.java index 6433746263..743f21e1b0 100644 --- a/core/src/main/java/feast/core/validators/SpecValidator.java +++ b/core/src/main/java/feast/core/validators/SpecValidator.java @@ -224,7 +224,8 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException case "pubsub": checkPubSubImportSpecOption(spec); break; - case "file": + case "file.csv": + case "file.json": checkFileImportSpecOption(spec); checkArgument( !spec.getSchema().getEntityIdColumn().equals(""), @@ -263,9 +264,6 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException { try { - checkArgument( - Lists.newArrayList("json", "csv").contains(spec.getOptionsOrThrow("format")), - "File format must be of type 'json' or 'csv'"); checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty"); } catch (NullPointerException | IllegalArgumentException e) { throw new IllegalArgumentException( diff --git a/core/src/test/java/feast/core/validators/SpecValidatorTest.java b/core/src/test/java/feast/core/validators/SpecValidatorTest.java index d82f878499..e09e31c6d0 100644 --- a/core/src/test/java/feast/core/validators/SpecValidatorTest.java +++ b/core/src/test/java/feast/core/validators/SpecValidatorTest.java @@ -670,10 +670,10 @@ public void fileImportSpecWithoutSupportedFileFormatShouldThrowIllegalArgumentEx featureGroupInfoRepository, featureInfoRepository); ImportSpec input = - ImportSpec.newBuilder().setType("file").putOptions("format", "notSupported").build(); + ImportSpec.newBuilder().setType("file.wat?").build(); exception.expect(IllegalArgumentException.class); exception.expectMessage( - "Validation for import spec failed: Invalid options: File format must be of type 'json' or 'csv'"); + "Validation for import spec failed: Type file.wat? not supported"); validator.validateImportSpec(input); } @@ -685,7 +685,7 @@ public void fileImportSpecWithoutValidPathShouldThrowIllegalArgumentException() entityInfoRepository, featureGroupInfoRepository, featureInfoRepository); - ImportSpec input = ImportSpec.newBuilder().setType("file").putOptions("format", "csv").build(); + ImportSpec input = ImportSpec.newBuilder().setType("file.csv").build(); exception.expect(IllegalArgumentException.class); exception.expectMessage( "Validation for import spec failed: Invalid options: File path cannot be empty"); @@ -702,8 +702,7 @@ public void fileImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArgumen featureInfoRepository); ImportSpec input = ImportSpec.newBuilder() - .setType("file") - .putOptions("format", "csv") + .setType("file.csv") .putOptions("path", "gs://asdasd") .build(); exception.expect(IllegalArgumentException.class); diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowCsvIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowCsvIO.java deleted file mode 100644 index fb1bfddb30..0000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowCsvIO.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.transform; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import feast.ingestion.metrics.FeastMetrics; -import feast.ingestion.model.Values; -import feast.ingestion.transform.CsvIO.StringMap; -import feast.ingestion.util.DateUtil; -import feast.specs.ImportSpecProto.Field; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.types.FeatureProto.Feature; -import feast.types.FeatureRowProto.FeatureRow; -import lombok.Builder; -import lombok.NonNull; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -public class FeatureRowCsvIO { - - public static Read read(ImportSpec importSpec) { - return Read.builder().importSpec(importSpec).build(); - } - - /** - * Transform for processing CSV text jsonfiles and producing FeatureRow messages. CSV jsonfiles - * with headers are not supported. - * - *

This transform asserts that the import spec is for only one entity, as all columns must have - * the same entity. There must be the same number of columns in the CSV jsonfiles as in the import - * spec. - * - *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows}, - * where every feature and entity {@link feast.types.ValueProto.Value Value} in the FeatureRow is - * a string (set via Value.stringVal). - */ - @Builder - public static class Read extends FeatureIO.Read { - - @NonNull private final ImportSpec importSpec; - - @Override - public PCollection expand(PInput input) { - final List fieldNames = Lists.newArrayList(); - final Map fields = Maps.newHashMap(); - for (Field field : importSpec.getSchema().getFieldsList()) { - String displayName = !field.getName().isEmpty() ? field.getName() : field.getFeatureId(); - fieldNames.add(displayName); - fields.put(displayName, field); - } - - String path = importSpec.getOptionsMap().get("path"); - Preconditions.checkArgument( - !Strings.isNullOrEmpty(path), "path must be set as option in ImportSpec for CSV import"); - Preconditions.checkArgument( - importSpec.getEntitiesList().size() == 1, "exactly 1 entity must be set for CSV import"); - String entity = importSpec.getEntities(0); - String entityIdColumn = importSpec.getSchema().getEntityIdColumn(); - Preconditions.checkArgument( - !Strings.isNullOrEmpty(entityIdColumn), "entity id column must be set"); - String timestampColumn = importSpec.getSchema().getTimestampColumn(); - Preconditions.checkArgument( - importSpec.getSchema().getFieldsList().size() > 0, - "CSV import needs schema with a least one field specified"); - - PCollection stringMaps = input.getPipeline().apply(CsvIO.read(path, fieldNames)); - - return stringMaps.apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext context) { - FeatureRow.Builder builder = FeatureRow.newBuilder(); - try { - StringMap stringMap = context.element(); - builder.setEntityName(entity); - - - for (Entry entry : stringMap.entrySet()) { - String name = entry.getKey(); - String value = entry.getValue(); - Field field = fields.get(name); - - // A feature can only be one of these things - if (entityIdColumn.equals(name)) { - builder.setEntityKey(value); - } else if (timestampColumn.equals(name)) { - builder.setEventTimestamp(DateUtil.toTimestamp(value)); - } else if (!Strings.isNullOrEmpty(field.getFeatureId())) { - String featureId = field.getFeatureId(); - builder.addFeatures( - Feature.newBuilder().setId(featureId).setValue(Values.ofString(value))); - } - // else silently ignore this column - } - if (!importSpec - .getSchema() - .getTimestampValue() - .equals(com.google.protobuf.Timestamp.getDefaultInstance())) { - // This overrides any column event timestamp. - builder.setEventTimestamp(importSpec.getSchema().getTimestampValue()); - } - context.output(builder.build()); - } catch (Exception e) { - FeastMetrics.inc(builder.build(), "input_errors"); - } - } - })); - } - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowJsonTextIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowJsonTextIO.java deleted file mode 100644 index 5814334d86..0000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowJsonTextIO.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.transform; - -import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.types.FeatureRowProto.FeatureRow; -import lombok.Builder; -import lombok.NonNull; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; - -public class FeatureRowJsonTextIO { - - public static Read read(ImportSpec importSpec) { - return Read.builder().importSpec(importSpec).build(); - } - - /** - * Transform for processing JSON text jsonfiles and that contain JSON serialised FeatureRow messages, - * one per line. - * - *

This transform asserts that the import spec is for only one entity, as all columns must have - * the same entity. - * - *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows}. - */ - @Builder - public static class Read extends FeatureIO.Read { - - @NonNull private final ImportSpec importSpec; - - @Override - public PCollection expand(PInput input) { - String path = importSpec.getOptionsOrDefault("path", null); - Preconditions.checkNotNull(path, "Path must be set in for file import"); - PCollection jsonLines = input.getPipeline().apply(TextIO.read().from(path)); - - return jsonLines.apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext context) { - String line = context.element(); - FeatureRow.Builder builder = FeatureRow.newBuilder(); - try { - // TODO use proto registry so that it can read Any feature values, not just - // primitives. - JsonFormat.parser().merge(line, builder); - context.output(builder.build()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - })); - } - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java deleted file mode 100644 index eeb282759d..0000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.transform; - -import static com.google.common.base.Preconditions.checkArgument; - -import feast.ingestion.deserializer.FeatureRowDeserializer; -import feast.ingestion.deserializer.FeatureRowKeyDeserializer; -import feast.options.Options; -import feast.options.OptionsParser; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.types.FeatureRowProto.FeatureRow; -import feast.types.FeatureRowProto.FeatureRowKey; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import javax.validation.constraints.NotEmpty; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.io.kafka.KafkaRecord; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; - -public class FeatureRowKafkaIO { - static final String KAFKA_TYPE = "kafka"; - - /** - * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages - * from kafka one or more kafka topics. 
- */ - public static Read read(ImportSpec importSpec) { - return new Read(importSpec); - } - - public static class KafkaReadOptions implements Options { - @NotEmpty public String server; - @NotEmpty public String topics; - } - - public static class Read extends FeatureIO.Read { - - private ImportSpec importSpec; - - private Read(ImportSpec importSpec) { - this.importSpec = importSpec; - } - - @Override - public PCollection expand(PInput input) { - - checkArgument(importSpec.getType().equals(KAFKA_TYPE)); - - KafkaReadOptions options = - OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class); - - List topicsList = new ArrayList<>(Arrays.asList(options.topics.split(","))); - - KafkaIO.Read kafkaIOReader = - KafkaIO.read() - .withBootstrapServers(options.server) - .withTopics(topicsList) - .withKeyDeserializer(FeatureRowKeyDeserializer.class) - .withValueDeserializer(FeatureRowDeserializer.class); - - PCollection> featureRowRecord = - input.getPipeline().apply(kafkaIOReader); - - PCollection featureRow = - featureRowRecord.apply( - ParDo.of( - new DoFn, FeatureRow>() { - @ProcessElement - public void processElement(ProcessContext processContext) { - KafkaRecord record = processContext.element(); - processContext.output(record.getKV().getValue()); - } - })); - return featureRow; - } - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowPubSubIO.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowPubSubIO.java deleted file mode 100644 index bb0d2c335b..0000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowPubSubIO.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.ingestion.transform; - -import static com.google.common.base.Preconditions.checkArgument; - -import javax.validation.constraints.AssertTrue; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import feast.options.Options; -import feast.options.OptionsParser; -import feast.specs.ImportSpecProto.ImportSpec; -import feast.types.FeatureRowProto.FeatureRow; - -public class FeatureRowPubSubIO { - - public static final String PUB_SUB_TYPE = "pubsub"; - - public static Read read(ImportSpec importSpec) { - return new Read(importSpec); - } - - /** - * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} - * proto messages from Cloud PubSub. - * - *

This transform accepts multiple entities in the import spec and expects no columns to be - * specified as it does not need to construct FeatureRows, merely pass them on. - * - *

Because Feast ingestion is stateless, the message event time is simply the processing time,
- * there is no need to override it based on any property of the message.
- */
-  public static class Read extends FeatureIO.Read {
-
-    private ImportSpec importSpec;
-
-    private Read(ImportSpec importSpec) {
-      this.importSpec = importSpec;
-    }
-
-    @Override
-    public PCollection<FeatureRow> expand(PInput input) {
-      checkArgument(importSpec.getType().equals(PUB_SUB_TYPE));
-      PubSubReadOptions options =
-          OptionsParser.parse(importSpec.getOptionsMap(), PubSubReadOptions.class);
-
-      PubsubIO.Read<FeatureRow> read = readProtos();
-
-      if (options.subscription != null) {
-        read = read.fromSubscription(options.subscription);
-      } else if (options.topic != null) {
-        read = read.fromTopic(options.topic);
-      }
-      return input.getPipeline().apply(read);
-    }
-
-    PubsubIO.Read<FeatureRow> readProtos() {
-      return PubsubIO.readProtos(FeatureRow.class);
-    }
-
-    public static class PubSubReadOptions implements Options {
-      public String subscription;
-      public String topic;
-
-      @AssertTrue(message = "subscription or topic must be set")
-      boolean isValid() {
-        return subscription != null || topic != null;
-      }
-    }
-  }
-}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
index af7945bc77..5da8f6e753 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFeaturesTransform.java
@@ -17,10 +17,10 @@
 
 package feast.ingestion.transform;
 
-import com.google.common.base.Enums;
 import com.google.common.base.Preconditions;
-import feast.storage.bigquery.FeatureRowBigQueryIO;
 import com.google.inject.Inject;
+import feast.source.FeatureSourceFactory;
+import feast.source.FeatureSourceFactoryService;
 import feast.specs.ImportSpecProto.ImportSpec;
 import feast.types.FeatureRowProto.FeatureRow;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -29,43 +29,31 @@
 public class ReadFeaturesTransform extends PTransform<PInput, PCollection<FeatureRow>> {
 
-    private ImportSpec importSpec;
+  private ImportSpec importSpec;
 
-    @Inject
-    public ReadFeaturesTransform(ImportSpec importSpec) {
-        this.importSpec = importSpec;
-    }
+  @Inject
+  public ReadFeaturesTransform(ImportSpec importSpec) {
+    this.importSpec = importSpec;
+  }
 
-    @Override
-    public PCollection<FeatureRow> expand(PInput input) {
-        return input.getPipeline().apply("Read " + importSpec.getType(), getTransform());
-    }
+  @Override
+  public PCollection<FeatureRow> expand(PInput input) {
+    return input.getPipeline().apply("Read " + importSpec.getType(), getTransform());
+  }
+
+  public PTransform<PInput, PCollection<FeatureRow>> getTransform() {
+    String type = importSpec.getType();
+    Preconditions.checkArgument(!type.isEmpty(), "type missing in import spec");
 
-    public PTransform<PInput, PCollection<FeatureRow>> getTransform() {
-        String type = importSpec.getType();
-        Preconditions.checkArgument(!type.isEmpty(), "type missing in import spec");
-        Preconditions.checkArgument(Enums.getIfPresent(FeatureEnums.InputSource.class, type.toUpperCase()).isPresent(), "The type defined is invalid or not supported");
-        switch (FeatureEnums.InputSource.valueOf(type.toUpperCase())) {
-            case FILE:
-                String format = importSpec.getOptionsOrDefault("format", null);
-                Preconditions.checkNotNull(format, "format option missing from import spec of type file");
-                switch (FeatureEnums.FileFormat.valueOf(format.toUpperCase())) {
-                    case CSV:
-                        return FeatureRowCsvIO.read(importSpec);
-                    case JSON:
-                        return FeatureRowJsonTextIO.read(importSpec);
-                    default:
-                        throw
new IllegalArgumentException("Unknown format in import spec" + type);
-                }
-            case BIGQUERY:
-                return FeatureRowBigQueryIO.read(importSpec);
-            case PUBSUB:
-                return FeatureRowPubSubIO.read(importSpec);
-            case KAFKA:
-                return FeatureRowKafkaIO.read(importSpec);
-            default:
-                throw new IllegalArgumentException("Unknown type in import spec" + type);
-        }
+    FeatureSourceFactory featureSourceFactory = null;
+    for (FeatureSourceFactory factory : FeatureSourceFactoryService.getAll()) {
+      if (type.equals(factory.getType())) {
+        featureSourceFactory = factory;
+      }
     }
+    Preconditions
+        .checkNotNull(featureSourceFactory, "No FeatureSourceFactory found for type " + type);
+    return featureSourceFactory.create(importSpec);
+  }
 }
diff --git a/ingestion/src/main/java/feast/options/Options.java b/ingestion/src/main/java/feast/options/Options.java
index 920674bb28..47077810c3 100644
--- a/ingestion/src/main/java/feast/options/Options.java
+++ b/ingestion/src/main/java/feast/options/Options.java
@@ -19,5 +19,9 @@
 
 import java.io.Serializable;
 
-/** iterface for identifying classes that can use the OptionsParser for extra type safety */
-public interface Options extends Serializable {}
+/**
+ * interface for identifying classes that can use the OptionsParser for extra type safety
+ */
+public interface Options extends Serializable {
+
+}
diff --git a/ingestion/src/main/java/feast/options/OptionsParser.java b/ingestion/src/main/java/feast/options/OptionsParser.java
index 33001ca063..7eab269637 100644
--- a/ingestion/src/main/java/feast/options/OptionsParser.java
+++ b/ingestion/src/main/java/feast/options/OptionsParser.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import com.google.common.collect.Lists;
+import feast.source.csv.CsvFileFeatureSource.CsvFileFeatureSourceOptions;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@
 import javax.validation.ValidatorFactory;
 
 public class OptionsParser {
+
   private static final ObjectMapper mapper = new ObjectMapper();
   private static final Validator validator;
 
@@ -41,7 +43,9 @@ public class OptionsParser {
     }
   }
 
-  /** Return a json schema string representing an options class for error messages */
+  /**
+   * Return a json schema string representing an options class for error messages
+   */
   static String getJsonSchema(Class<? extends Options> optionsClass) {
     JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
     JsonSchema schema = null;
@@ -54,7 +58,9 @@ static String getJsonSchema(Class<? extends Options> optionsClass) {
     }
   }
 
-  /** Construct a class from string options and validate with any javax validation annotations */
+  /**
+   * Construct a class from string options and validate with any javax validation annotations
+   */
   public static <T extends Options> T parse(Map<String, String> optionsMap, Class<T> clazz) {
     List<String> messages = Lists.newArrayList();
     T options;
diff --git a/ingestion/src/main/java/feast/source/FeatureSource.java b/ingestion/src/main/java/feast/source/FeatureSource.java
new file mode 100644
index 0000000000..4514b2de37
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/FeatureSource.java
@@ -0,0 +1,10 @@
+package feast.source;
+
+import feast.types.FeatureRowProto.FeatureRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+public abstract class FeatureSource extends PTransform<PInput, PCollection<FeatureRow>> {
+
+}
diff --git a/ingestion/src/main/java/feast/source/FeatureSourceFactory.java
b/ingestion/src/main/java/feast/source/FeatureSourceFactory.java
new file mode 100644
index 0000000000..1de49cf342
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/FeatureSourceFactory.java
@@ -0,0 +1,14 @@
+package feast.source;
+
+import feast.specs.ImportSpecProto.ImportSpec;
+
+/**
+ * A FeatureSourceFactory creates FeatureSource instances, which can read FeatureRow messages from a
+ * source location.
+ */
+public interface FeatureSourceFactory {
+
+  String getType();
+
+  FeatureSource create(ImportSpec importSpec);
+}
diff --git a/ingestion/src/main/java/feast/source/FeatureSourceFactoryService.java b/ingestion/src/main/java/feast/source/FeatureSourceFactoryService.java
new file mode 100644
index 0000000000..e4dd994dcc
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/FeatureSourceFactoryService.java
@@ -0,0 +1,44 @@
+package feast.source;
+
+import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class FeatureSourceFactoryService {
+
+  private static ServiceLoader<FeatureSourceFactory> serviceLoader = ServiceLoader
+      .load(FeatureSourceFactory.class);
+  private static List<FeatureSourceFactory> manuallyRegistered = new ArrayList<>();
+
+  static {
+    for (FeatureSourceFactory source : getAll()) {
+      log.info("FeatureSourceFactory type found: " + source.getType());
+    }
+  }
+
+  public static List<FeatureSourceFactory> getAll() {
+    return Lists.newArrayList(
+        Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator()));
+  }
+
+  /**
+   * Get the factory of the given subclass.
+   */
+  public static <T extends FeatureSourceFactory> T get(Class<T> clazz) {
+    for (FeatureSourceFactory factory : getAll()) {
+      if (clazz.isInstance(factory)) {
+        //noinspection unchecked
+        return (T) factory;
+      }
+    }
+    return null;
+  }
+
+  public static void register(FeatureSourceFactory factory) {
+    manuallyRegistered.add(factory);
+  }
+}
diff --git a/ingestion/src/main/java/feast/source/bigquery/BigQueryFeatureSource.java b/ingestion/src/main/java/feast/source/bigquery/BigQueryFeatureSource.java
new file mode 100644
index 0000000000..b4b3973b5c
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/bigquery/BigQueryFeatureSource.java
@@ -0,0 +1,91 @@
+package feast.source.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import feast.options.Options;
+import feast.options.OptionsParser;
+import feast.source.FeatureSource;
+import feast.source.FeatureSourceFactory;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.types.FeatureRowProto.FeatureRow;
+import java.util.List;
+import javax.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+
+/**
+ * Transform for processing BigQuery tables and producing FeatureRow messages.
+ *
+ *

This transform asserts that the import spec is for only one entity, as all columns must have
+ * the same entity. The column names in the import spec will be used for selecting columns from the
+ * BigQuery table, but it still scans the whole row.
+ *
+ *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows}, + * where each feature and the entity key {@link feast.types.ValueProto.Value Value} in the + * FeatureRow is taken from a column in the BigQuery table and set with the closest type to the + * BigQuery column schema that is available in {@link feast.types.ValueProto.ValueType ValueType}. + * + *

Note a small gotcha is that because Integers and Numerics in BigQuery are 64 bits, these are + * always cast to INT64 and DOUBLE respectively. + * + *

Downstream these will fail validation if the corresponding {@link
+ * feast.specs.FeatureSpecProto.FeatureSpec FeatureSpec} has a 32 bit type.
+ */
+@AllArgsConstructor
+public class BigQueryFeatureSource extends FeatureSource {
+
+  private static final String BIGQUERY_FEATURE_SOURCE_TYPE = "bigquery";
+
+  @NonNull
+  private final ImportSpec importSpec;
+
+  @Override
+  public PCollection<FeatureRow> expand(PInput input) {
+    BigQuerySourceOptions options = OptionsParser
+        .parse(importSpec.getOptionsMap(), BigQuerySourceOptions.class);
+
+    List<String> entities = importSpec.getEntitiesList();
+    Preconditions.checkArgument(
+        entities.size() == 1, "exactly 1 entity must be set for BigQuery import");
+
+    String url = String.format("%s:%s.%s", options.project, options.dataset, options.table);
+
+    return input
+        .getPipeline()
+        .apply(
+            BigQueryIO.read(new BigQueryToFeatureRowFn(importSpec)).from(url));
+  }
+
+
+  public static class BigQuerySourceOptions implements Options {
+
+    @NotEmpty
+    public String project;
+    @NotEmpty
+    public String dataset;
+    @NotEmpty
+    public String table;
+  }
+
+
+  @AutoService(FeatureSourceFactory.class)
+  public static class Factory implements FeatureSourceFactory {
+
+    @Override
+    public String getType() {
+      return BIGQUERY_FEATURE_SOURCE_TYPE;
+    }
+
+    @Override
+    public FeatureSource create(ImportSpec importSpec) {
+      checkArgument(importSpec.getType().equals(getType()));
+      return new BigQueryFeatureSource(importSpec);
+    }
+  }
+}
diff --git a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFn.java b/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java
similarity index 92%
rename from ingestion/src/main/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFn.java
rename to ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java
index 243c496cc5..795735410d 100644
--- a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFn.java
+++ b/ingestion/src/main/java/feast/source/bigquery/BigQueryToFeatureRowFn.java
@@ -15,7 +15,7 @@
  *
  */
 
-package feast.storage.bigquery;
+package feast.source.bigquery;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -23,6 +23,7 @@
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.common.collect.Maps;
 import com.google.protobuf.Timestamp;
+import feast.storage.bigquery.ValueBigQueryBuilder;
 import java.util.Map;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
@@ -40,16 +41,13 @@
  * This is a serializable function used with the BigQueryIO for fetching feature rows directly from
 * BigQuery
 */
-public class FeatureRowFromBigQuerySchemaAndRecordFn
+public class BigQueryToFeatureRowFn
     implements SerializableFunction<SchemaAndRecord, FeatureRow> {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(FeatureRowFromBigQuerySchemaAndRecordFn.class);
-
   private final ImportSpec importSpec;
   private final Map<String, Field> fields;
 
-  public FeatureRowFromBigQuerySchemaAndRecordFn(ImportSpec importSpec) {
+  public BigQueryToFeatureRowFn(ImportSpec importSpec) {
     this.importSpec = importSpec;
     fields = Maps.newHashMap();
     for (Field field : importSpec.getSchema().getFieldsList()) {
diff --git a/ingestion/src/main/java/feast/source/csv/CsvFileFeatureSource.java b/ingestion/src/main/java/feast/source/csv/CsvFileFeatureSource.java
new file mode 100644
index 0000000000..837050fc68
--- /dev/null
+++
b/ingestion/src/main/java/feast/source/csv/CsvFileFeatureSource.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2018 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package feast.source.csv;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import feast.ingestion.metrics.FeastMetrics;
+import feast.ingestion.model.Values;
+import feast.ingestion.util.DateUtil;
+import feast.options.Options;
+import feast.options.OptionsParser;
+import feast.source.FeatureSource;
+import feast.source.FeatureSourceFactory;
+import feast.source.csv.CsvIO.StringMap;
+import feast.specs.ImportSpecProto.Field;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.specs.ImportSpecProto.Schema;
+import feast.types.FeatureProto.Feature;
+import feast.types.FeatureRowProto.FeatureRow;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+
+/**
+ * Transform for processing CSV text files and producing FeatureRow messages. CSV files with
+ * headers are not supported.
+ *
+ *

This transform asserts that the import spec is for only one entity, as all columns must have
+ * the same entity. There must be the same number of columns in the CSV files as in the import
+ * spec.
+ *
+ *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows},
+ * where every feature and entity {@link feast.types.ValueProto.Value Value} in the FeatureRow is a
+ * string (set via Value.stringVal).
+ */
+@AllArgsConstructor
+public class CsvFileFeatureSource extends FeatureSource {
+
+  public static final String CSV_FILE_FEATURE_SOURCE_TYPE = "file.csv";
+
+  private final ImportSpec importSpec;
+
+  @Override
+  public PCollection<FeatureRow> expand(PInput input) {
+    CsvFileFeatureSourceOptions options = OptionsParser
+        .parse(importSpec.getOptionsMap(), CsvFileFeatureSourceOptions.class);
+    List<String> entities = importSpec.getEntitiesList();
+    Preconditions.checkArgument(
+        entities.size() == 1, "exactly 1 entity must be set for CSV import");
+    Schema schema = importSpec.getSchema();
+    String entity = entities.get(0);
+
+    final List<String> fieldNames = Lists.newArrayList();
+    final Map<String, Field> fields = Maps.newHashMap();
+    for (Field field : schema.getFieldsList()) {
+      String displayName = !field.getName().isEmpty() ? field.getName() : field.getFeatureId();
+      fieldNames.add(displayName);
+      fields.put(displayName, field);
+    }
+
+    String path = options.path;
+    String entityIdColumn = schema.getEntityIdColumn();
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(entityIdColumn), "entity id column must be set");
+    String timestampColumn = schema.getTimestampColumn();
+    Preconditions.checkArgument(
+        schema.getFieldsList().size() > 0,
+        "CSV import needs a schema with at least one field specified");
+
+    PCollection<StringMap> stringMaps = input.getPipeline().apply(CsvIO.read(path, fieldNames));
+
+    return stringMaps.apply(
+        ParDo.of(
+            new DoFn<StringMap, FeatureRow>() {
+              @ProcessElement
+              public void processElement(ProcessContext context) {
+                FeatureRow.Builder builder = FeatureRow.newBuilder();
+                try {
+                  StringMap stringMap = context.element();
+                  builder.setEntityName(entity);
+
+                  for (Entry<String, String> entry : stringMap.entrySet()) {
+                    String name = entry.getKey();
+                    String value = entry.getValue();
+                    Field field = fields.get(name);
+
+                    // A feature can only be one of these things
+                    if (entityIdColumn.equals(name)) {
+                      builder.setEntityKey(value);
+                    } else if (timestampColumn.equals(name)) {
+                      builder.setEventTimestamp(DateUtil.toTimestamp(value));
+                    } else if (!Strings.isNullOrEmpty(field.getFeatureId())) {
+                      String featureId = field.getFeatureId();
+                      builder.addFeatures(
+                          Feature.newBuilder().setId(featureId).setValue(Values.ofString(value)));
+                    }
+                    // else silently ignore this column
+                  }
+                  if (!schema.getTimestampValue()
+                      .equals(com.google.protobuf.Timestamp.getDefaultInstance())) {
+                    // This overrides any column event timestamp.
+                    builder.setEventTimestamp(schema.getTimestampValue());
+                  }
+                  context.output(builder.build());
+                } catch (Exception e) {
+                  FeastMetrics.inc(builder.build(), "input_errors");
+                }
+              }
+            }));
+
+  }
+
+
+  public static class CsvFileFeatureSourceOptions implements Options {
+
+    @NotEmpty
+    public String path;
+  }
+
+  @AutoService(FeatureSourceFactory.class)
+  public static class Factory implements FeatureSourceFactory {
+
+    @Override
+    public String getType() {
+      return CSV_FILE_FEATURE_SOURCE_TYPE;
+    }
+
+    @Override
+    public FeatureSource create(ImportSpec importSpec) {
+      checkArgument(importSpec.getType().equals(getType()));
+      return new CsvFileFeatureSource(importSpec);
+    }
+  }
+}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/CsvIO.java b/ingestion/src/main/java/feast/source/csv/CsvIO.java
similarity index 99%
rename from ingestion/src/main/java/feast/ingestion/transform/CsvIO.java
rename to ingestion/src/main/java/feast/source/csv/CsvIO.java
index 4e8cec196a..e6cf0f9dc9 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/CsvIO.java
+++ b/ingestion/src/main/java/feast/source/csv/CsvIO.java
@@ -15,7 +15,7 @@
  *
  */
 
-package feast.ingestion.transform;
+package feast.source.csv;
 
 import java.io.IOException;
 import java.io.Serializable;
diff --git a/ingestion/src/main/java/feast/source/json/JsonFileFeatureSource.java b/ingestion/src/main/java/feast/source/json/JsonFileFeatureSource.java
new file mode 100644
index 0000000000..a38a2e0b8c
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/json/JsonFileFeatureSource.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package feast.source.json;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import feast.options.Options;
+import feast.options.OptionsParser;
+import feast.source.FeatureSource;
+import feast.source.FeatureSourceFactory;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.types.FeatureRowProto.FeatureRow;
+import javax.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+
+/**
+ * Transform for processing JSON text files that contain JSON serialised FeatureRow
+ * messages, one per line.
+ *
+ *

This transform asserts that the import spec is for only one entity, as all columns must + * have the same entity. + * + *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows}.
+ */
+@AllArgsConstructor
+public class JsonFileFeatureSource extends FeatureSource {
+
+  public static final String JSON_FILE_FEATURE_SOURCE_TYPE = "file.json";
+
+  private final ImportSpec importSpec;
+
+  @Override
+  public PCollection<FeatureRow> expand(PInput input) {
+    JsonFileFeatureSourceOptions options = OptionsParser
+        .parse(importSpec.getOptionsMap(), JsonFileFeatureSourceOptions.class);
+    PCollection<String> jsonLines = input.getPipeline().apply(TextIO.read().from(options.path));
+    return jsonLines.apply(
+        ParDo.of(
+            new DoFn<String, FeatureRow>() {
+              @ProcessElement
+              public void processElement(ProcessContext context) {
+                String line = context.element();
+                FeatureRow.Builder builder = FeatureRow.newBuilder();
+                try {
+                  JsonFormat.parser().merge(line, builder);
+                  context.output(builder.build());
+                } catch (InvalidProtocolBufferException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
+  }
+
+  public static class JsonFileFeatureSourceOptions implements Options {
+
+    @NotEmpty
+    public String path;
+  }
+
+  @AutoService(FeatureSourceFactory.class)
+  public static class Factory implements FeatureSourceFactory {
+
+    @Override
+    public String getType() {
+      return JSON_FILE_FEATURE_SOURCE_TYPE;
+    }
+
+    @Override
+    public FeatureSource create(ImportSpec importSpec) {
+      checkArgument(importSpec.getType().equals(getType()));
+
+      return new JsonFileFeatureSource(importSpec);
+    }
+  }
+
+}
+
diff --git a/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java b/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java
new file mode 100644
index 0000000000..c163088716
--- /dev/null
+++ b/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package feast.source.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import feast.options.Options;
+import feast.options.OptionsParser;
+import feast.source.FeatureSource;
+import feast.source.FeatureSourceFactory;
+import feast.source.kafka.deserializer.FeatureRowDeserializer;
+import feast.source.kafka.deserializer.FeatureRowKeyDeserializer;
+import feast.specs.ImportSpecProto.ImportSpec;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FeatureRowProto.FeatureRowKey;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+/**
+ * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
+ * from one or more Kafka topics.
+ */ +@AllArgsConstructor +public class KafkaFeatureSource extends FeatureSource { + + public static final String KAFKA_FEATURE_SOURCE_TYPE = "kafka"; + private ImportSpec importSpec; + + + @Override + public PCollection expand(PInput input) { + checkArgument(importSpec.getType().equals(KAFKA_FEATURE_SOURCE_TYPE)); + + KafkaReadOptions options = + OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class); + + List topicsList = new ArrayList<>(Arrays.asList(options.topics.split(","))); + + KafkaIO.Read kafkaIOReader = + KafkaIO.read() + .withBootstrapServers(options.server) + .withTopics(topicsList) + .withKeyDeserializer(FeatureRowKeyDeserializer.class) + .withValueDeserializer(FeatureRowDeserializer.class); + + PCollection> featureRowRecord = + input.getPipeline().apply(kafkaIOReader); + + return + featureRowRecord.apply( + ParDo.of( + new DoFn, FeatureRow>() { + @ProcessElement + public void processElement(ProcessContext processContext) { + KafkaRecord record = processContext.element(); + processContext.output(record.getKV().getValue()); + } + })); + } + + public static class KafkaReadOptions implements Options { + + @NotEmpty + public String server; + @NotEmpty + public String topics; + } + + @AutoService(FeatureSourceFactory.class) + public static class Factory implements FeatureSourceFactory { + + @Override + public String getType() { + return KAFKA_FEATURE_SOURCE_TYPE; + } + + @Override + public FeatureSource create(ImportSpec importSpec) { + checkArgument(importSpec.getType().equals(getType())); + return new KafkaFeatureSource(importSpec); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java b/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java similarity index 97% rename from ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java rename to ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java index c6ab40831f..e75a17e64f 100644 --- a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java +++ b/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java @@ -15,7 +15,7 @@ * */ -package feast.ingestion.deserializer; +package feast.source.kafka.deserializer; import com.google.protobuf.InvalidProtocolBufferException; import feast.types.FeatureRowProto.FeatureRow; diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java b/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java similarity index 97% rename from ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java rename to ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java index 57574d6982..4e77d7c99d 100644 --- a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java +++ b/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java @@ -15,7 +15,7 @@ * */ -package feast.ingestion.deserializer; +package feast.source.kafka.deserializer; import com.google.protobuf.InvalidProtocolBufferException; import feast.types.FeatureRowProto.FeatureRowKey; diff --git a/ingestion/src/main/java/feast/source/pubsub/PubSubFeatureSource.java b/ingestion/src/main/java/feast/source/pubsub/PubSubFeatureSource.java new file mode 100644 index 0000000000..209ba2b5ea --- /dev/null +++ b/ingestion/src/main/java/feast/source/pubsub/PubSubFeatureSource.java @@ -0,0 +1,101 @@ +/* 
+ * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.source.pubsub; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import feast.options.Options; +import feast.options.OptionsParser; +import feast.source.FeatureSource; +import feast.source.FeatureSourceFactory; +import feast.specs.ImportSpecProto.ImportSpec; +import feast.types.FeatureRowProto.FeatureRow; +import javax.validation.constraints.AssertTrue; +import lombok.Builder; +import lombok.NonNull; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.hadoop.hbase.util.Strings; + + +/** + * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages + * from Cloud PubSub. + * + *

This transform accepts multiple entities in the import spec and expects no columns to be
+ * specified, as it does not need to construct FeatureRows but merely passes them on.
+ *
+ *

Because Feast ingestion is stateless, the message event time is simply the processing time;
+ * there is no need to override it based on any property of the message.
+ */
+@Builder
+public class PubSubFeatureSource extends FeatureSource {
+
+  public static final String PUBSUB_FEATURE_SOURCE_TYPE = "pubsub";
+
+  @NonNull
+  private ImportSpec importSpec;
+
+  @Override
+  public PCollection<FeatureRow> expand(PInput input) {
+    checkArgument(importSpec.getType().equals(PUBSUB_FEATURE_SOURCE_TYPE));
+    PubSubReadOptions options =
+        OptionsParser.parse(importSpec.getOptionsMap(), PubSubReadOptions.class);
+
+    PubsubIO.Read<FeatureRow> read = readProtos();
+
+    if (!Strings.isEmpty(options.subscription)) {
+      read = read.fromSubscription(options.subscription);
+    } else if (!Strings.isEmpty(options.topic)) {
+      read = read.fromTopic(options.topic);
+    }
+    return input.getPipeline().apply(read);
+  }
+
+  PubsubIO.Read<FeatureRow> readProtos() {
+    return PubsubIO.readProtos(FeatureRow.class);
+  }
+
+  public static class PubSubReadOptions implements Options {
+
+    public String subscription;
+    public String topic;
+
+    @AssertTrue(message = "subscription or topic must be set")
+    boolean isValid() {
+      return !Strings.isEmpty(subscription) || !Strings.isEmpty(topic);
+    }
+  }
+
+  @AutoService(FeatureSourceFactory.class)
+  public static class Factory implements FeatureSourceFactory {
+
+    @Override
+    public String getType() {
+      return PUBSUB_FEATURE_SOURCE_TYPE;
+    }
+
+    @Override
+    public FeatureSource create(ImportSpec importSpec) {
+      checkArgument(importSpec.getType().equals(getType()));
+      return new PubSubFeatureSource(importSpec);
+    }
+  }
+}
diff --git a/ingestion/src/main/java/feast/storage/FeatureSource.java b/ingestion/src/main/java/feast/storage/FeatureSource.java
deleted file mode 100644
index 78f5db5580..0000000000
--- a/ingestion/src/main/java/feast/storage/FeatureSource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package feast.storage;
-
-import feast.ingestion.transform.FeatureIO;
-import feast.specs.ImportSpecProto.ImportSpec;
-
-public interface FeatureSource {
-  FeatureIO.Read create(ImportSpec importSpec);
-
-  String getType();
-}
diff --git a/ingestion/src/main/java/feast/storage/bigquery/BigQuerySourceOptions.java b/ingestion/src/main/java/feast/storage/bigquery/BigQuerySourceOptions.java
deleted file mode 100644
index aff8cded6d..0000000000
--- a/ingestion/src/main/java/feast/storage/bigquery/BigQuerySourceOptions.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.storage.bigquery; - -import javax.validation.constraints.NotEmpty; -import feast.options.Options; - -public class BigQuerySourceOptions implements Options { - @NotEmpty public String project; - @NotEmpty public String dataset; - @NotEmpty public String table; -} diff --git a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java b/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java index 5384a0744b..b5e039cc9e 100644 --- a/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java +++ b/ingestion/src/main/java/feast/storage/bigquery/FeatureRowBigQueryIO.java @@ -18,20 +18,16 @@ package feast.storage.bigquery; import com.google.api.services.bigquery.model.TableSchema; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.inject.Inject; import feast.ingestion.model.Specs; import feast.ingestion.transform.FeatureIO; import feast.ingestion.transform.SplitFeatures.SingleOutputSplit; -import feast.options.OptionsParser; import feast.specs.EntitySpecProto.EntitySpec; import feast.specs.FeatureSpecProto.FeatureSpec; -import feast.specs.ImportSpecProto.ImportSpec; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -43,59 +39,12 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; @Slf4j public class FeatureRowBigQueryIO { - public static Read read(ImportSpec importSpec) { - // TODO: Allow and convert from /projects/{project}/datasets/{dataset}/tables/{table}, to - // {project}:{dataset}.{table} - return new Read(importSpec); - } - - /** - * Transform for processing BigQuery tables and producing FeatureRow messages. - * - *

This transform asserts that the import spec is for only one entity, as all columns must have - * the same entity. The columns names in the import spec will be used for selecting columns from - * the BigQuery table, but it still scans the whole row. - * - *

The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow FeatureRows}, - * where each feature and the entity key {@link feast.types.ValueProto.Value Value} in the - * FeatureRow is taken from a column in the BigQuery table and set with the closest type to the - * BigQuery column schema that is available in {@link feast.types.ValueProto.ValueType ValueType}. - * - *

Note a small gotcha is that because Integers and Numerics in BigQuery are 64 bits, these are - * always cast to INT64 and DOUBLE respectively. - * - *

Downstream these will fail validation if the corresponding {@link - * feast.specs.FeatureSpecProto.FeatureSpec FeatureSpec} has a 32 bit type. - */ - @AllArgsConstructor - public static class Read extends FeatureIO.Read { - - private final ImportSpec importSpec; - - @Override - public PCollection expand(PInput input) { - Preconditions.checkArgument(importSpec.getType().equals("bigquery")); - BigQuerySourceOptions options = - OptionsParser.parse(importSpec.getOptionsMap(), BigQuerySourceOptions.class); - - String url = String.format("%s:%s.%s", options.project, options.dataset, options.table); - - Preconditions.checkArgument( - importSpec.getEntitiesCount() == 1, "BigQuery read must have only one entity"); - return input - .getPipeline() - .apply( - BigQueryIO.read(new FeatureRowFromBigQuerySchemaAndRecordFn(importSpec)).from(url)); - } - } - public static class Write extends FeatureIO.Write { private final BigQueryStoreOptions bigQueryOptions; diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java index 0a2f0d32b2..0ebbbcb540 100644 --- a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java +++ b/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java @@ -20,21 +20,14 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.NonNull; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; import feast.ingestion.transform.FeatureIO; import feast.storage.file.FileStoreOptions; import feast.storage.file.TextFileFeatureIO; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; -import feast.types.FeatureRowProto.FeatureRow; +import lombok.AllArgsConstructor; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; public class JsonFileFeatureIO { @@ -46,43 +39,6 @@ public static Write writeRow(FileStoreOptions options) { return new Write(options, (rowExtended) -> rowExtended.getRow()); } - /** - * Transform for processing JSON text jsonfiles and that contain JSON serialised FeatureRow - * messages, one per line. - * - *
diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java
index 0a2f0d32b2..0ebbbcb540 100644
--- a/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java
+++ b/ingestion/src/main/java/feast/storage/file/json/JsonFileFeatureIO.java
@@ -20,21 +20,14 @@
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.google.protobuf.util.JsonFormat;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.NonNull;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import feast.ingestion.transform.FeatureIO;
 import feast.storage.file.FileStoreOptions;
 import feast.storage.file.TextFileFeatureIO;
 import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
-import feast.types.FeatureRowProto.FeatureRow;
+import lombok.AllArgsConstructor;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
 
 public class JsonFileFeatureIO {
 
@@ -46,43 +39,6 @@ public static Write writeRow(FileStoreOptions options) {
     return new Write(options, (rowExtended) -> rowExtended.getRow());
   }
 
-  /**
-   * Transform for processing JSON text jsonfiles and that contain JSON serialised FeatureRow
-   * messages, one per line.
-   *
-   * <p>This transform asserts that the import spec is for only one entity, as all columns must have
-   * the same entity.
-   *
-   * <p>The output is a PCollection of {@link feast.types.FeatureRowProto.FeatureRow
-   * FeatureRows}.
-   */
-  @Builder
-  public static class Read extends FeatureIO.Read {
-
-    @NonNull private final JsonFileSourceOptions options;
-
-    @Override
-    public PCollection<FeatureRow> expand(PInput input) {
-      PCollection<String> jsonLines = input.getPipeline().apply(TextIO.read().from(options.path));
-
-      return jsonLines.apply(
-          ParDo.of(
-              new DoFn<String, FeatureRow>() {
-                @ProcessElement
-                public void processElement(ProcessContext context) {
-                  String line = context.element();
-                  FeatureRow.Builder builder = FeatureRow.newBuilder();
-                  try {
-                    JsonFormat.parser().merge(line, builder);
-                    context.output(builder.build());
-                  } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-              }));
-    }
-  }
-
   @AllArgsConstructor
   public static class Write extends FeatureIO.Write {
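For reference, the removed `Read` above boils down to one operation per input line: parse a JSON-serialised FeatureRow. A minimal sketch of that step, using the same `JsonFormat` API the transform used (assumes the `feast.types` protos are on the classpath):

```java
// Minimal sketch of the per-line parsing the removed Read transform performed,
// extracted from the deleted DoFn above.
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.types.FeatureRowProto.FeatureRow;

public class JsonLineToFeatureRow {

  // Each line of the source file holds one JSON-serialised FeatureRow message.
  public static FeatureRow parse(String jsonLine) {
    FeatureRow.Builder builder = FeatureRow.newBuilder();
    try {
      JsonFormat.parser().merge(jsonLine, builder);
    } catch (InvalidProtocolBufferException e) {
      // The removed transform likewise rethrew as an unchecked exception.
      throw new RuntimeException(e);
    }
    return builder.build();
  }
}
```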
diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileSourceOptions.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileSourceOptions.java
deleted file mode 100644
index 5f805e22bf..0000000000
--- a/ingestion/src/main/java/feast/storage/file/json/JsonFileSourceOptions.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package feast.storage.file.json;
-
-import javax.validation.constraints.NotEmpty;
-import feast.options.Options;
-
-class JsonFileSourceOptions implements Options {
-  @NotEmpty String path;
-}
diff --git a/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java b/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java
index c6e70d9703..11dda1e3b7 100644
--- a/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java
+++ b/ingestion/src/main/java/feast/storage/file/json/JsonFileStores.java
@@ -17,40 +17,20 @@
 
 package feast.storage.file.json;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.auto.service.AutoService;
-import lombok.AllArgsConstructor;
 import feast.ingestion.model.Specs;
-import feast.ingestion.transform.FeatureIO.Read;
 import feast.ingestion.transform.FeatureIO.Write;
 import feast.options.OptionsParser;
-import feast.specs.ImportSpecProto.ImportSpec;
 import feast.specs.StorageSpecProto.StorageSpec;
 import feast.storage.ErrorsStore;
-import feast.storage.FeatureSource;
 import feast.storage.ServingStore;
 import feast.storage.WarehouseStore;
 import feast.storage.file.FileStoreOptions;
+import lombok.AllArgsConstructor;
 
 public class JsonFileStores {
-  private static final String JSON_FILES_TYPE = "file.json";
-
-  public static class JsonFileFeatureSource implements FeatureSource {
-
-    @Override
-    public Read create(ImportSpec importSpec) {
-      checkArgument(importSpec.getType().equals(getType()));
-      JsonFileSourceOptions options =
-          OptionsParser.parse(importSpec.getOptionsMap(), JsonFileSourceOptions.class);
-      return new JsonFileFeatureIO.Read(options);
-    }
-    @Override
-    public String getType() {
-      return JSON_FILES_TYPE;
-    }
-  }
+  private static final String JSON_FILES_TYPE = "file.json";
 
   @AutoService(WarehouseStore.class)
   @AllArgsConstructor
diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
index 61f44f4d44..4a2fab2c6d 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
@@ -89,9 +89,8 @@ public void testImportCSV() throws IOException {
     ImportSpec importSpec =
         ProtoUtil.decodeProtoYaml(
             "---\n"
-                + "type: file\n"
+                + "type: file.csv\n"
                 + "options:\n"
-                + "  format: csv\n"
                 + "  # path: to be overwritten in tests\n"
                 + "entities:\n"
                 + "  - testEntity\n"
@@ -179,7 +178,7 @@ public void testImportCSVUnknownServingStoreError() throws IOException {
     ImportSpec importSpec =
         ProtoUtil.decodeProtoYaml(
             "---\n"
-                + "type: file\n"
+                + "type: file.csv\n"
                 + "options:\n"
                 + "  format: csv\n"
                 + "  # path: to be overwritten in tests\n"
@@ -190,7 +189,8 @@
                 + "  timestampValue: 2018-09-25T00:00:00.000Z\n"
                 + "  fields:\n"
                 + "    - name: id\n"
-                + "    - featureId: testEntity.none.unknownInt32\n" // Unknown store is not available
+                + "    - featureId: testEntity.none.unknownInt32\n"
+                // Unknown store is not available
                 + "    - featureId: testEntity.none.testString\n"
                 + "\n",
             ImportSpec.getDefaultInstance());
@@ -217,9 +217,8 @@ public void testImportWithErrors() throws IOException {
     ImportSpec importSpec =
         ProtoUtil.decodeProtoYaml(
             "---\n"
-                + "type: file\n"
+                + "type: file.csv\n"
                 + "options:\n"
-                + "  format: csv\n"
                 + "  # path: to be overwritten in tests\n"
                 + "entities:\n"
                 + "  - testEntity\n"
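The test updates above capture the spec change in one line: the file format moves out of the options map and into the type string itself. A hedged sketch of a new-style spec, mirroring the tests' own helper (the path value is illustrative, and `ProtoUtil`'s package is assumed here):

```java
// Hedged sketch of a new-style file import spec; gs://bucket/data.csv is an
// illustrative path, and the ProtoUtil import location is an assumption.
import feast.ingestion.util.ProtoUtil;
import feast.specs.ImportSpecProto.ImportSpec;
import java.io.IOException;

public class NewStyleImportSpecExample {

  public static ImportSpec csvSpec() throws IOException {
    return ProtoUtil.decodeProtoYaml(
        "---\n"
            + "type: file.csv\n"               // was: type: file  +  options.format: csv
            + "options:\n"
            + "  path: gs://bucket/data.csv\n" // only the path remains in options
            + "entities:\n"
            + "  - testEntity\n",
        ImportSpec.getDefaultInstance());
  }
}
```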
a/ingestion/src/test/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFnTest.java b/ingestion/src/test/java/feast/source/bigquery/BQToFeatureRowFnTest.java
similarity index 94%
rename from ingestion/src/test/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFnTest.java
rename to ingestion/src/test/java/feast/source/bigquery/BQToFeatureRowFnTest.java
index 433cdecf56..4c809fb5cc 100644
--- a/ingestion/src/test/java/feast/storage/bigquery/FeatureRowFromBigQuerySchemaAndRecordFnTest.java
+++ b/ingestion/src/test/java/feast/source/bigquery/BQToFeatureRowFnTest.java
@@ -15,7 +15,7 @@
  *
  */
 
-package feast.storage.bigquery;
+package feast.source.bigquery;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
@@ -26,6 +26,7 @@
 import com.google.cloud.bigquery.LegacySQLTypeName;
 import com.google.common.collect.Lists;
 import com.google.protobuf.Timestamp;
+import feast.source.bigquery.BigQueryToFeatureRowFn;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.joda.time.DateTime;
@@ -39,7 +40,7 @@
 import feast.specs.ImportSpecProto.Schema;
 import feast.types.FeatureRowProto.FeatureRow;
 
-public class FeatureRowFromBigQuerySchemaAndRecordFnTest {
+public class BQToFeatureRowFnTest {
   @Test
   public void testImportSpecFieldsMissingFromBQTable() {
     // TODO what if a field in the import spec is not in the bq schema
@@ -77,7 +78,7 @@ public void testStringEntityKey() {
             new TableFieldSchema().setName("bq_timestamp").setType(LegacySQLTypeName.TIMESTAMP.name()),
             new TableFieldSchema().setName("bq_value").setType(LegacySQLTypeName.INTEGER.name())));
     SchemaAndRecord schemaAndRecord = new SchemaAndRecord(record, tableSchema);
-    FeatureRow row = new FeatureRowFromBigQuerySchemaAndRecordFn(importSpec).apply(schemaAndRecord);
+    FeatureRow row = new BigQueryToFeatureRowFn(importSpec).apply(schemaAndRecord);
     Assert.assertEquals(now, row.getEventTimestamp());
     Assert.assertEquals("abcd", row.getEntityKey());
     Assert.assertEquals("testEntity", row.getEntityName());
@@ -120,7 +121,7 @@ public void testInt64EntityKey() {
             new TableFieldSchema().setName("bq_timestamp").setType(LegacySQLTypeName.TIMESTAMP.name()),
             new TableFieldSchema().setName("bq_value").setType(LegacySQLTypeName.INTEGER.name())));
     SchemaAndRecord schemaAndRecord = new SchemaAndRecord(record, tableSchema);
-    FeatureRow row = new FeatureRowFromBigQuerySchemaAndRecordFn(importSpec).apply(schemaAndRecord);
+    FeatureRow row = new BigQueryToFeatureRowFn(importSpec).apply(schemaAndRecord);
     Assert.assertEquals(now, row.getEventTimestamp());
     Assert.assertEquals("1234", row.getEntityKey());
     Assert.assertEquals("testEntity", row.getEntityName());
diff --git a/ingestion/src/test/java/feast/storage/bigquery/BigQuerySourceOptionsTest.java b/ingestion/src/test/java/feast/source/bigquery/BigQuerySourceOptionsTest.java
similarity index 95%
rename from ingestion/src/test/java/feast/storage/bigquery/BigQuerySourceOptionsTest.java
rename to ingestion/src/test/java/feast/source/bigquery/BigQuerySourceOptionsTest.java
index d2e87dc3ab..27b9842e0d 100644
--- a/ingestion/src/test/java/feast/storage/bigquery/BigQuerySourceOptionsTest.java
+++ b/ingestion/src/test/java/feast/source/bigquery/BigQuerySourceOptionsTest.java
@@ -15,11 +15,12 @@
  *
  */
 
-package feast.storage.bigquery;
+package feast.source.bigquery;
 
+import feast.options.OptionsParser;
+import feast.source.bigquery.BigQueryFeatureSource.BigQuerySourceOptions;
 import org.junit.Assert;
 import org.junit.Test;
-import feast.options.OptionsParser;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 public class BigQuerySourceOptionsTest {
diff --git a/ingestion/src/test/java/feast/ingestion/transform/CsvIOTest.java b/ingestion/src/test/java/feast/source/csv/CsvIOTest.java
similarity index 87%
rename from ingestion/src/test/java/feast/ingestion/transform/CsvIOTest.java
rename to ingestion/src/test/java/feast/source/csv/CsvIOTest.java
index aefe5f00e3..5e5217af96 100644
--- a/ingestion/src/test/java/feast/ingestion/transform/CsvIOTest.java
+++ b/ingestion/src/test/java/feast/source/csv/CsvIOTest.java
@@ -15,18 +15,16 @@
  *
  */
 
-package feast.ingestion.transform;
+package feast.source.csv;
 
 import static org.junit.Assert.*;
 import static org.hamcrest.CoreMatchers.*;
 
 import com.google.common.collect.Lists;
-import feast.ingestion.transform.CsvIO.CSVLineParser;
-import feast.ingestion.transform.CsvIO.StringMap;
+import feast.source.csv.CsvIO.CSVLineParser;
+import feast.source.csv.CsvIO.StringMap;
 import java.util.List;
 import junit.framework.TestCase;
-import feast.ingestion.transform.CsvIO.CSVLineParser;
-import feast.ingestion.transform.CsvIO.StringMap;
 
 public class CsvIOTest extends TestCase {
 
diff --git a/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java b/ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java
similarity index 99%
rename from ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java
rename to ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java
index 4b9ece362d..3436b8d8b7 100644
--- a/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java
+++ b/ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java
@@ -15,7 +15,7 @@
  *
  */
 
-package feast.ingestion.deserializer;
+package feast.source.kafka.deserializer;
 
 import com.google.protobuf.MessageLite;
 import feast.types.FeatureRowProto.FeatureRow;