diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index ccaf3faade..28fee83132 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -155,18 +155,16 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti FeatureSink featureSink = getFeatureSink(store); sinkReadiness = sinkReadiness.and(featureSink.prepareWrite(featureSetSpecs)); + PCollection rowsForStore = + storeAllocatedRows.get(storeTags.get(store)).setCoder(ProtoCoder.of(FeatureRow.class)); // Step 5. Write metrics of successfully validated rows - validatedRows - .get(FEATURE_ROW_OUT) - .apply("WriteInflightMetrics", WriteInflightMetricsTransform.create(store.getName())); + rowsForStore.apply( + "WriteInflightMetrics", WriteInflightMetricsTransform.create(store.getName())); // Step 6. Write FeatureRow to the corresponding Store. WriteResult writeFeatureRows = - storeAllocatedRows - .get(storeTags.get(store)) - .setCoder(ProtoCoder.of(FeatureRow.class)) - .apply("WriteFeatureRowToStore", featureSink.writer()); + rowsForStore.apply("WriteFeatureRowToStore", featureSink.writer()); // Step 7. Write FailedElements to a dead letter table in BigQuery. if (options.getDeadLetterTableSpec() != null) {