diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java
index eb50728baf..6dad278242 100644
--- a/core/src/main/java/feast/core/config/FeastProperties.java
+++ b/core/src/main/java/feast/core/config/FeastProperties.java
@@ -103,7 +103,7 @@ public static class Runner {
        * Job runner configuration options. See the following for options
        * https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
        */
-      Map<String, String> options = new HashMap<>();
+      Map<String, Object> options = new HashMap<>();
 
       /**
        * Gets the job runner type as an enum.
diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java
index 69636963be..30023de064 100644
--- a/core/src/main/java/feast/core/config/JobConfig.java
+++ b/core/src/main/java/feast/core/config/JobConfig.java
@@ -16,11 +16,16 @@
  */
 package feast.core.config;
 
+import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
 import feast.core.config.FeastProperties.JobProperties;
 import feast.core.job.JobManager;
 import feast.core.job.dataflow.DataflowJobManager;
 import feast.core.job.direct.DirectJobRegistry;
 import feast.core.job.direct.DirectRunnerJobManager;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -31,6 +36,7 @@
 @Slf4j
 @Configuration
 public class JobConfig {
+  private final Gson gson = new Gson();
 
   /**
    * Get a JobManager according to the runner type and Dataflow configuration.
@@ -39,18 +45,28 @@ public class JobConfig {
    */
   @Bean
   @Autowired
-  public JobManager getJobManager(FeastProperties feastProperties) {
+  public JobManager getJobManager(FeastProperties feastProperties)
+      throws InvalidProtocolBufferException {
 
     JobProperties jobProperties = feastProperties.getJobs();
     FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
-    Map<String, String> runnerConfigOptions = runner.getOptions();
+    Map<String, Object> runnerConfigOptions = runner.getOptions();
+    String configJson = gson.toJson(runnerConfigOptions);
+
     FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();
 
     switch (runner.getType()) {
       case DATAFLOW:
-        return new DataflowJobManager(runnerConfigOptions, metrics);
+        DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
+            DataflowRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
+        return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics);
       case DIRECT:
-        return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
+        DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
+            DirectRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, directRunnerConfigOptions);
+        return new DirectRunnerJobManager(
+            directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics);
       default:
         throw new IllegalArgumentException("Unsupported runner: " + runner);
     }
diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
index 7b9df0abd5..2c3da255f5 100644
--- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
+++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -33,12 +33,12 @@
 import feast.core.job.Runner;
 import feast.core.job.option.FeatureSetJsonByteConverter;
 import feast.core.model.*;
-import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.BZip2Compressor;
 import feast.ingestion.options.ImportOptions;
 import feast.ingestion.options.OptionCompressor;
 import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
 import feast.proto.core.SourceProto;
 import feast.proto.core.StoreProto;
 import java.io.IOException;
@@ -46,7 +46,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -61,21 +60,20 @@ public class DataflowJobManager implements JobManager {
   private final String projectId;
   private final String location;
   private final Dataflow dataflow;
-  private final Map<String, String> defaultOptions;
+  private final DataflowRunnerConfig defaultOptions;
   private final MetricsProperties metrics;
 
   public DataflowJobManager(
-      Map<String, String> runnerConfigOptions, MetricsProperties metricsProperties) {
+      DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties) {
     this(runnerConfigOptions, metricsProperties, getGoogleCredential());
   }
 
   public DataflowJobManager(
-      Map<String, String> runnerConfigOptions,
+      DataflowRunnerConfigOptions runnerConfigOptions,
       MetricsProperties metricsProperties,
       Credential credential) {
 
-    DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions);
-
+    defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
     Dataflow dataflow = null;
     try {
       dataflow =
@@ -89,11 +87,10 @@ public DataflowJobManager(
       throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
     }
 
-    this.defaultOptions = runnerConfigOptions;
     this.dataflow = dataflow;
     this.metrics = metricsProperties;
-    this.projectId = config.getProject();
-    this.location = config.getRegion();
+    this.projectId = defaultOptions.getProject();
+    this.location = defaultOptions.getRegion();
   }
 
   private static Credential getGoogleCredential() {
@@ -270,9 +267,9 @@ private ImportOptions getPipelineOptions(
       List<FeatureSetProto.FeatureSet> featureSets,
       StoreProto.Store sink,
       boolean update)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);
 
     OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
         new BZip2Compressor<>(new FeatureSetJsonByteConverter());
diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java
index 6fe93ca80c..85628d2cd0 100644
--- a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java
+++ b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java
@@ -16,9 +16,9 @@
  */
 package feast.core.job.dataflow;
 
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.Set;
+import feast.core.job.option.RunnerConfig;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import java.util.*;
 import javax.validation.*;
 import javax.validation.constraints.NotBlank;
 import lombok.Getter;
@@ -27,77 +27,65 @@
 /** DataflowRunnerConfig contains configuration fields for the Dataflow job runner. */
 @Getter
 @Setter
-public class DataflowRunnerConfig {
-
-  public DataflowRunnerConfig(Map<String, String> runnerConfigOptions) {
-
-    // Try to find all fields in DataflowRunnerConfig inside the runnerConfigOptions and map it into
-    // this object
-    for (Field field : DataflowRunnerConfig.class.getFields()) {
-      String fieldName = field.getName();
-      try {
-        if (!runnerConfigOptions.containsKey(fieldName)) {
-          continue;
-        }
-        String value = runnerConfigOptions.get(fieldName);
-
-        if (Boolean.class.equals(field.getType())) {
-          field.set(this, Boolean.valueOf(value));
-          continue;
-        }
-        if (field.getType() == Integer.class) {
-          field.set(this, Integer.valueOf(value));
-          continue;
-        }
-        field.set(this, value);
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(
-            String.format(
-                "Could not successfully convert DataflowRunnerConfig for key: %s", fieldName),
-            e);
-      }
-    }
+public class DataflowRunnerConfig extends RunnerConfig {
+
+  public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
+    this.project = runnerConfigOptions.getProject();
+    this.region = runnerConfigOptions.getRegion();
+    this.zone = runnerConfigOptions.getZone();
+    this.serviceAccount = runnerConfigOptions.getServiceAccount();
+    this.network = runnerConfigOptions.getNetwork();
+    this.subnetwork = runnerConfigOptions.getSubnetwork();
+    this.workerMachineType = runnerConfigOptions.getWorkerMachineType();
+    this.autoscalingAlgorithm = runnerConfigOptions.getAutoscalingAlgorithm();
+    this.usePublicIps = runnerConfigOptions.getUsePublicIps();
+    this.tempLocation = runnerConfigOptions.getTempLocation();
+    this.maxNumWorkers = runnerConfigOptions.getMaxNumWorkers();
+    this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
+    this.labels = runnerConfigOptions.getLabelsMap();
     validate();
   }
 
   /* Project id to use when launching jobs. */
-  @NotBlank public String project;
+  @NotBlank public String project;
 
   /* The Google Compute Engine region for creating Dataflow jobs. */
-  @NotBlank public String region;
+  @NotBlank public String region;
 
   /* GCP availability zone for operations. */
-  @NotBlank public String zone;
+  @NotBlank public String zone;
 
   /* Run the job as a specific service account, instead of the default GCE robot. */
-  public String serviceAccount;
+  public String serviceAccount;
 
   /* GCE network for launching workers. */
-  @NotBlank public String network;
+  @NotBlank public String network;
 
   /* GCE subnetwork for launching workers. */
-  @NotBlank public String subnetwork;
+  @NotBlank public String subnetwork;
 
   /* Machine type to create Dataflow worker VMs as. */
-  public String workerMachineType;
+  public String workerMachineType;
 
   /* The autoscaling algorithm to use for the workerpool. */
-  public String autoscalingAlgorithm;
+  public String autoscalingAlgorithm;
 
   /* Specifies whether worker pools should be started with public IP addresses. */
-  public Boolean usePublicIps;
+  public Boolean usePublicIps;
 
   /**
    * A pipeline level default location for storing temporary files. Support Google Cloud Storage
    * locations, e.g. gs://bucket/object
    */
-  @NotBlank public String tempLocation;
+  @NotBlank public String tempLocation;
 
   /* The maximum number of workers to use for the workerpool. */
-  public Integer maxNumWorkers;
+  public Integer maxNumWorkers;
 
   /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
-  public String deadLetterTableSpec;
+  public String deadLetterTableSpec;
+
+  public Map<String, String> labels;
 
   /** Validates Dataflow runner configuration options */
   public void validate() {
diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java b/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java
new file mode 100644
index 0000000000..ebd327f2f7
--- /dev/null
+++ b/core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java
@@ -0,0 +1,36 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job.direct;
+
+import feast.core.job.option.RunnerConfig;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
+
+public class DirectRunnerConfig extends RunnerConfig {
+  /**
+   * Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater of
+   * the number of available processors and 3. Must be a value greater than zero.
+   */
+  public Integer targetParallelism;
+
+  /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
+  public String deadletterTableSpec;
+
+  public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
+    this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
+    this.targetParallelism = runnerConfigOptions.getTargetParallelism();
+  }
+}
diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
index 2e2b43047e..715adbdd43 100644
--- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
+++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -27,18 +27,17 @@
 import feast.core.model.Job;
 import feast.core.model.JobStatus;
 import feast.core.model.Project;
-import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.BZip2Compressor;
 import feast.ingestion.options.ImportOptions;
 import feast.ingestion.options.OptionCompressor;
 import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
 import feast.proto.core.StoreProto;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.sdk.PipelineResult;
@@ -49,15 +48,15 @@ public class DirectRunnerJobManager implements JobManager {
 
   private final Runner RUNNER_TYPE = Runner.DIRECT;
 
-  protected Map<String, String> defaultOptions;
+  private DirectRunnerConfig defaultOptions;
   private final DirectJobRegistry jobs;
   private MetricsProperties metrics;
 
   public DirectRunnerJobManager(
-      Map<String, String> defaultOptions,
+      DirectRunnerConfigOptions directRunnerConfigOptions,
       DirectJobRegistry jobs,
       MetricsProperties metricsProperties) {
-    this.defaultOptions = defaultOptions;
+    this.defaultOptions = new DirectRunnerConfig(directRunnerConfigOptions);
     this.jobs = jobs;
     this.metrics = metricsProperties;
   }
@@ -95,9 +94,9 @@ public Job startJob(Job job) {
 
   private ImportOptions getPipelineOptions(
       String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);
 
     OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
         new BZip2Compressor<>(new FeatureSetJsonByteConverter());
diff --git a/core/src/main/java/feast/core/job/option/RunnerConfig.java b/core/src/main/java/feast/core/job/option/RunnerConfig.java
new file mode 100644
index 0000000000..4b937074a3
--- /dev/null
+++ b/core/src/main/java/feast/core/job/option/RunnerConfig.java
@@ -0,0 +1,75 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job.option;
+
+import feast.core.util.TypeConversion;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Value class containing the application-default configuration for a runner. When a job is started
+ * by core, all fields in the object will be converted into --key=value args to seed the beam
+ * pipeline options.
+ */
+public abstract class RunnerConfig {
+
+  /**
+   * Converts the fields in this class to a list of --key=value args to be passed to a {@link
+   * org.apache.beam.sdk.options.PipelineOptionsFactory}.
+   *
+   * <p>Ignores values that are proto-default (e.g. empty string, 0).
+   *
+   * @return Array of string args in the format --key=value.
+   * @throws IllegalAccessException if a public config field cannot be read via reflection
+   */
+  public String[] toArgs() throws IllegalAccessException {
+    List<String> args = new ArrayList<>();
+    for (Field field : this.getClass().getFields()) {
+      if (field.get(this) == null) {
+        continue;
+      }
+      Class<?> type = field.getType();
+      if (Map.class.equals(type)) {
+        String jsonString =
+            TypeConversion.convertMapToJsonString((Map<String, String>) field.get(this));
+        args.add(String.format("--%s=%s", field.getName(), jsonString));
+        continue;
+      }
+
+      if (String.class.equals(type)) {
+        String val = (String) field.get(this);
+        if (!val.equals("")) {
+          args.add(String.format("--%s=%s", field.getName(), val));
+        }
+        continue;
+      }
+
+      if (Integer.class.equals(type)) {
+        Integer val = (Integer) field.get(this);
+        if (val != 0) {
+          args.add(String.format("--%s=%d", field.getName(), val));
+        }
+        continue;
+      }
+
+      args.add(String.format("--%s=%s", field.getName(), field.get(this)));
+    }
+    return args.toArray(String[]::new);
+  }
+}
diff --git a/core/src/main/java/feast/core/util/TypeConversion.java b/core/src/main/java/feast/core/util/TypeConversion.java
index 6ee990fc1c..e6b7ef33cb 100644
--- a/core/src/main/java/feast/core/util/TypeConversion.java
+++ b/core/src/main/java/feast/core/util/TypeConversion.java
@@ -16,12 +16,10 @@
  */
 package feast.core.util;
 
-import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import java.lang.reflect.Type;
 import java.util.*;
-import java.util.Map.Entry;
 
 public class TypeConversion {
   private static Gson gson = new Gson();
@@ -72,18 +70,4 @@ public static Map<String, String> convertJsonStringToMap(String jsonString) {
   public static String convertMapToJsonString(Map<String, String> map) {
     return gson.toJson(map);
   }
-
-  /**
-   * Convert a map of key value pairs to a array of java arguments in format --key=value
-   *
-   * @param map
-   * @return array of string arguments
-   */
-  public static String[] convertMapToArgs(Map<String, String> map) {
-    List<String> args = new ArrayList<>();
-    for (Entry<String, String> arg : map.entrySet()) {
-      args.add(Strings.lenientFormat("--%s=%s", arg.getKey(), arg.getValue()));
-    }
-    return args.toArray(new String[] {});
-  }
 }
diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java
index 632c9d22a2..ea9caa91ff 100644
--- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java
+++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java
@@ -40,6 +40,8 @@
 import feast.proto.core.FeatureSetProto;
 import feast.proto.core.FeatureSetProto.FeatureSetMeta;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions.Builder;
 import feast.proto.core.SourceProto;
 import feast.proto.core.SourceProto.KafkaSourceConfig;
 import feast.proto.core.SourceProto.SourceType;
@@ -49,9 +51,7 @@
 import feast.proto.core.StoreProto.Store.Subscription;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -70,19 +70,21 @@ public class DataflowJobManagerTest {
 
   @Mock private Dataflow dataflow;
 
-  private Map<String, String> defaults;
+  private DataflowRunnerConfigOptions defaults;
   private DataflowJobManager dfJobManager;
 
   @Before
   public void setUp() {
     initMocks(this);
-    defaults = new HashMap<>();
-    defaults.put("project", "project");
-    defaults.put("region", "region");
-    defaults.put("zone", "zone");
-    defaults.put("tempLocation", "tempLocation");
-    defaults.put("network", "network");
-    defaults.put("subnetwork", "subnetwork");
+    Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder();
+    optionsBuilder.setProject("project");
+    optionsBuilder.setRegion("region");
+    optionsBuilder.setZone("zone");
+    optionsBuilder.setTempLocation("tempLocation");
+    optionsBuilder.setNetwork("network");
+    optionsBuilder.setSubnetwork("subnetwork");
+    optionsBuilder.putLabels("orchestrator", "feast");
+    defaults = optionsBuilder.build();
     MetricsProperties metricsProperties = new MetricsProperties();
     metricsProperties.setEnabled(false);
     Credential credential = null;
@@ -137,6 +139,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
     expectedPipelineOptions.setRegion("region");
     expectedPipelineOptions.setUpdate(false);
     expectedPipelineOptions.setAppName("DataflowJobManager");
+    expectedPipelineOptions.setLabels(defaults.getLabelsMap());
     expectedPipelineOptions.setJobName(jobName);
     expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
 
diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
index 42d3189a73..0128f5aa0b 100644
--- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
+++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
@@ -42,6 +42,7 @@
 import feast.ingestion.options.OptionCompressor;
 import feast.proto.core.FeatureSetProto;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
 import feast.proto.core.SourceProto;
 import feast.proto.core.SourceProto.KafkaSourceConfig;
 import feast.proto.core.SourceProto.SourceType;
@@ -51,9 +52,7 @@
 import feast.proto.core.StoreProto.Store.Subscription;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -71,12 +70,12 @@ public class DirectRunnerJobManagerTest {
   @Mock private DirectJobRegistry directJobRegistry;
 
   private DirectRunnerJobManager drJobManager;
-  private Map<String, String> defaults;
+  private DirectRunnerConfigOptions defaults;
 
   @Before
   public void setUp() {
     initMocks(this);
-    defaults = new HashMap<>();
+    defaults = DirectRunnerConfigOptions.newBuilder().setTargetParallelism(1).build();
     MetricsProperties metricsProperties = new MetricsProperties();
     metricsProperties.setEnabled(false);
 
@@ -123,7 +122,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
     expectedPipelineOptions.setAppName("DirectRunnerJobManager");
     expectedPipelineOptions.setRunner(DirectRunner.class);
     expectedPipelineOptions.setBlockOnRun(false);
-    expectedPipelineOptions.setProject("");
+    expectedPipelineOptions.setTargetParallelism(1);
     expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
     expectedPipelineOptions.setProject("");
 
diff --git a/core/src/test/java/feast/core/util/TypeConversionTest.java b/core/src/test/java/feast/core/util/TypeConversionTest.java
index 02f0a7cee4..c44bf50129 100644
--- a/core/src/test/java/feast/core/util/TypeConversionTest.java
+++ b/core/src/test/java/feast/core/util/TypeConversionTest.java
@@ -74,16 +74,4 @@ public void convertMapToJsonStringShouldReturnEmptyJsonForAnEmptyMap() {
     Map<String, String> input = new HashMap<>();
     assertThat(TypeConversion.convertMapToJsonString(input), equalTo("{}"));
   }
-
-  @Test
-  public void convertJsonStringToArgsShouldReturnCorrectListOfArgs() {
-    Map<String, String> input = new HashMap<>();
-    input.put("key", "value");
-    input.put("key2", "value2");
-
-    String[] expected = new String[] {"--key=value", "--key2=value2"};
-    String[] actual = TypeConversion.convertMapToArgs(input);
-    assertThat(actual.length, equalTo(expected.length));
-    assertTrue(Arrays.asList(actual).containsAll(Arrays.asList(expected)));
-  }
 }
diff --git a/protos/feast/core/Runner.proto b/protos/feast/core/Runner.proto
index 544972286d..91c1e99485 100644
--- a/protos/feast/core/Runner.proto
+++ b/protos/feast/core/Runner.proto
@@ -70,4 +70,7 @@ message DataflowRunnerConfigOptions {
 
     /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
     string deadLetterTableSpec = 12;
+
+    /* Labels to apply to the dataflow job */
+    map<string, string> labels = 13;
 }
\ No newline at end of file