Refactor runner configuration, add labels to dataflow options #718

Merged · 4 commits · Jun 1, 2020
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/config/FeastProperties.java
@@ -103,7 +103,7 @@ public static class Runner {
* Job runner configuration options. See the following for options
* https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
*/
-  Map<String, String> options = new HashMap<>();
+  Map<String, Object> options = new HashMap<>();

/**
* Gets the job runner type as an enum.
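Note on the change above: widening the options map from Map<String, String> to Map<String, Object> is what lets nested YAML blocks, such as the Dataflow labels this PR introduces, bind as structured values instead of being dropped or flattened to strings. A minimal illustrative sketch (the option values here are hypothetical, not from the PR):

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;

public class OptionsBindingSketch {
  public static void main(String[] args) {
    Map<String, Object> options = new HashMap<>();
    options.put("project", "my-project"); // scalar options bind as before
    Map<String, String> labels = new HashMap<>();
    labels.put("env", "prod");
    options.put("labels", labels); // a nested map is now a legal value

    // Serialized with Gson, as JobConfig below does, the nested map becomes a
    // proper JSON object, e.g. {"project":"my-project","labels":{"env":"prod"}}
    System.out.println(new Gson().toJson(options));
  }
}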
24 changes: 20 additions & 4 deletions core/src/main/java/feast/core/config/JobConfig.java
@@ -16,11 +16,16 @@
*/
package feast.core.config;

+import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.job.JobManager;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,6 +36,7 @@
@Slf4j
@Configuration
public class JobConfig {
+  private final Gson gson = new Gson();

/**
* Get a JobManager according to the runner type and Dataflow configuration.
@@ -39,18 +45,28 @@ public class JobConfig {
*/
@Bean
@Autowired
-  public JobManager getJobManager(FeastProperties feastProperties) {
+  public JobManager getJobManager(FeastProperties feastProperties)
+      throws InvalidProtocolBufferException {

JobProperties jobProperties = feastProperties.getJobs();
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
-    Map<String, String> runnerConfigOptions = runner.getOptions();
+    Map<String, Object> runnerConfigOptions = runner.getOptions();
+    String configJson = gson.toJson(runnerConfigOptions);

FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();

switch (runner.getType()) {
case DATAFLOW:
-        return new DataflowJobManager(runnerConfigOptions, metrics);
+        DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
+            DataflowRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
+        return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics);
case DIRECT:
-        return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
+        DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
+            DirectRunnerConfigOptions.newBuilder();
+        JsonFormat.parser().merge(configJson, directRunnerConfigOptions);
+        return new DirectRunnerJobManager(
+            directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics);
default:
throw new IllegalArgumentException("Unsupported runner: " + runner);
}
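The new wiring in JobConfig goes map -> JSON -> typed proto: Gson serializes the loosely typed options map, and JsonFormat parses that JSON into the generated RunnerProto builder. A self-contained sketch of the round-trip, assuming the generated RunnerProto classes are on the classpath (the option values are hypothetical):

import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import java.util.HashMap;
import java.util.Map;

public class RunnerOptionsRoundTrip {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    Map<String, Object> options = new HashMap<>();
    options.put("project", "my-gcp-project"); // hypothetical value
    options.put("maxNumWorkers", 4); // stays a JSON number, not a string
    Map<String, String> labels = new HashMap<>();
    labels.put("team", "feature-eng"); // hypothetical label
    options.put("labels", labels);

    String configJson = new Gson().toJson(options);

    DataflowRunnerConfigOptions.Builder builder = DataflowRunnerConfigOptions.newBuilder();
    // JsonFormat rejects unknown fields by default, so a typo in
    // application.yml surfaces as an InvalidProtocolBufferException at startup.
    JsonFormat.parser().merge(configJson, builder);
    System.out.println(builder.build());
  }
}

A side effect worth noting: unlike the old free-form Map<String, String>, an option must now exist on the proto definition, so unknown keys fail fast instead of being silently passed through.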
23 changes: 10 additions & 13 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -33,20 +33,19 @@
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
-import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -61,21 +60,20 @@ public class DataflowJobManager implements JobManager {
private final String projectId;
private final String location;
private final Dataflow dataflow;
-  private final Map<String, String> defaultOptions;
+  private final DataflowRunnerConfig defaultOptions;
private final MetricsProperties metrics;

public DataflowJobManager(
-      Map<String, String> runnerConfigOptions, MetricsProperties metricsProperties) {
+      DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties) {
this(runnerConfigOptions, metricsProperties, getGoogleCredential());
}

public DataflowJobManager(
-      Map<String, String> runnerConfigOptions,
+      DataflowRunnerConfigOptions runnerConfigOptions,
MetricsProperties metricsProperties,
Credential credential) {

-    DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions);
-
+    defaultOptions = new DataflowRunnerConfig(runnerConfigOptions);
Dataflow dataflow = null;
try {
dataflow =
@@ -89,11 +87,10 @@ public DataflowJobManager(
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}

-    this.defaultOptions = runnerConfigOptions;
this.dataflow = dataflow;
this.metrics = metricsProperties;
-    this.projectId = config.getProject();
-    this.location = config.getRegion();
+    this.projectId = defaultOptions.getProject();
+    this.location = defaultOptions.getRegion();
}

private static Credential getGoogleCredential() {
@@ -270,9 +267,9 @@ private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets,
StoreProto.Store sink,
boolean update)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
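getPipelineOptions now calls defaultOptions.toArgs() instead of TypeConversion.convertMapToArgs. The RunnerConfig base class is not part of this diff, so the following is only a guess at the contract it presumably fulfils: reflectively rendering each non-null field as a --name=value argument for PipelineOptionsFactory. The declared IllegalAccessException fits such a reflection-based implementation; map-valued fields like labels would presumably need JSON encoding rather than the plain toString used here.

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public abstract class RunnerConfigSketch {
  // Hypothetical: renders every non-null declared field as "--name=value".
  public String[] toArgs() throws IllegalAccessException {
    List<String> args = new ArrayList<>();
    for (Field field : this.getClass().getDeclaredFields()) {
      field.setAccessible(true);
      Object value = field.get(this);
      if (value == null) {
        continue;
      }
      args.add(String.format("--%s=%s", field.getName(), value));
    }
    return args.toArray(new String[0]);
  }
}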
core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java
@@ -16,9 +16,9 @@
*/
package feast.core.job.dataflow;

-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.Set;
+import feast.core.job.option.RunnerConfig;
+import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
+import java.util.*;
import javax.validation.*;
import javax.validation.constraints.NotBlank;
import lombok.Getter;
@@ -27,77 +27,65 @@
/** DataflowRunnerConfig contains configuration fields for the Dataflow job runner. */
@Getter
@Setter
-public class DataflowRunnerConfig {
-
-  public DataflowRunnerConfig(Map<String, String> runnerConfigOptions) {
-
-    // Try to find all fields in DataflowRunnerConfig inside the runnerConfigOptions and map it into
-    // this object
-    for (Field field : DataflowRunnerConfig.class.getFields()) {
-      String fieldName = field.getName();
-      try {
-        if (!runnerConfigOptions.containsKey(fieldName)) {
-          continue;
-        }
-        String value = runnerConfigOptions.get(fieldName);
-
-        if (Boolean.class.equals(field.getType())) {
-          field.set(this, Boolean.valueOf(value));
-          continue;
-        }
-        if (field.getType() == Integer.class) {
-          field.set(this, Integer.valueOf(value));
-          continue;
-        }
-        field.set(this, value);
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(
-            String.format(
-                "Could not successfully convert DataflowRunnerConfig for key: %s", fieldName),
-            e);
-      }
-    }
+public class DataflowRunnerConfig extends RunnerConfig {
+
+  public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
+    this.project = runnerConfigOptions.getProject();
+    this.region = runnerConfigOptions.getRegion();
+    this.zone = runnerConfigOptions.getZone();
+    this.serviceAccount = runnerConfigOptions.getServiceAccount();
+    this.network = runnerConfigOptions.getNetwork();
+    this.subnetwork = runnerConfigOptions.getSubnetwork();
+    this.workerMachineType = runnerConfigOptions.getWorkerMachineType();
+    this.autoscalingAlgorithm = runnerConfigOptions.getAutoscalingAlgorithm();
+    this.usePublicIps = runnerConfigOptions.getUsePublicIps();
+    this.tempLocation = runnerConfigOptions.getTempLocation();
+    this.maxNumWorkers = runnerConfigOptions.getMaxNumWorkers();
+    this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
+    this.labels = runnerConfigOptions.getLabelsMap();
validate();
}

/* Project id to use when launching jobs. */
-  @NotBlank public String project;
+  @NotBlank String project;

/* The Google Compute Engine region for creating Dataflow jobs. */
-  @NotBlank public String region;
+  @NotBlank String region;

/* GCP availability zone for operations. */
-  @NotBlank public String zone;
+  @NotBlank String zone;

/* Run the job as a specific service account, instead of the default GCE robot. */
-  public String serviceAccount;
+  String serviceAccount;

/* GCE network for launching workers. */
-  @NotBlank public String network;
+  @NotBlank String network;

/* GCE subnetwork for launching workers. */
-  @NotBlank public String subnetwork;
+  @NotBlank String subnetwork;

/* Machine type to create Dataflow worker VMs as. */
-  public String workerMachineType;
+  String workerMachineType;

/* The autoscaling algorithm to use for the workerpool. */
-  public String autoscalingAlgorithm;
+  String autoscalingAlgorithm;

/* Specifies whether worker pools should be started with public IP addresses. */
-  public Boolean usePublicIps;
+  Boolean usePublicIps;

/**
* A pipeline level default location for storing temporary files. Support Google Cloud Storage
* locations, e.g. gs://bucket/object
*/
-  @NotBlank public String tempLocation;
+  @NotBlank String tempLocation;

/* The maximum number of workers to use for the workerpool. */
-  public Integer maxNumWorkers;
+  Integer maxNumWorkers;

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
-  public String deadLetterTableSpec;
+  String deadLetterTableSpec;

+  Map<String, String> labels;

/** Validates Dataflow runner configuration options */
public void validate() {
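The @NotBlank constraints above are enforced by validate(), whose body is truncated in this view. A sketch of how such a method can be implemented with the javax.validation API this file already imports (the validator-factory wiring is an assumption, not shown in the diff):

import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Validation;
import javax.validation.Validator;

abstract class ValidatedConfigSketch {
  // Throws if any constraint (e.g. @NotBlank on project or region) is violated.
  void validate() {
    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
    Set<ConstraintViolation<ValidatedConfigSketch>> violations = validator.validate(this);
    if (!violations.isEmpty()) {
      throw new ConstraintViolationException(violations);
    }
  }
}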
36 changes: 36 additions & 0 deletions core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java
@@ -0,0 +1,36 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job.direct;
+
+import feast.core.job.option.RunnerConfig;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
+
+public class DirectRunnerConfig extends RunnerConfig {
+  /**
+   * Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater
+   * of the number of available processors and 3. Must be a value greater than zero.
+   */
+  Integer targetParallelism;
+
+  /* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
+  String deadletterTableSpec;
+
+  public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
+    this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
+    this.targetParallelism = runnerConfigOptions.getTargetParallelism();
+  }
+}
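A hypothetical usage sketch for the new class: proto builders expose setters matching the getters used in the constructor above, so a typed DirectRunnerConfig can be built directly (the dead-letter spec below is an invented example):

import feast.core.job.direct.DirectRunnerConfig;
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;

public class DirectRunnerConfigDemo {
  public static void main(String[] args) {
    DirectRunnerConfigOptions options =
        DirectRunnerConfigOptions.newBuilder()
            .setTargetParallelism(1)
            .setDeadLetterTableSpec("my-project:dataset.deadletter_table") // hypothetical
            .build();
    DirectRunnerConfig config = new DirectRunnerConfig(options);
  }
}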
core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -27,18 +27,17 @@
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Project;
-import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import feast.proto.core.FeatureSetProto;
+import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
import feast.proto.core.StoreProto;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
@@ -49,15 +48,15 @@ public class DirectRunnerJobManager implements JobManager {

private final Runner RUNNER_TYPE = Runner.DIRECT;

-  protected Map<String, String> defaultOptions;
+  private DirectRunnerConfig defaultOptions;
private final DirectJobRegistry jobs;
private MetricsProperties metrics;

public DirectRunnerJobManager(
-      Map<String, String> defaultOptions,
+      DirectRunnerConfigOptions directRunnerConfigOptions,
DirectJobRegistry jobs,
MetricsProperties metricsProperties) {
-    this.defaultOptions = defaultOptions;
+    this.defaultOptions = new DirectRunnerConfig(directRunnerConfigOptions);
this.jobs = jobs;
this.metrics = metricsProperties;
}
@@ -95,9 +94,9 @@ public Job startJob(Job job) {

private ImportOptions getPipelineOptions(
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
-      throws IOException {
-    String[] args = TypeConversion.convertMapToArgs(defaultOptions);
-    ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
+      throws IOException, IllegalAccessException {
+    ImportOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class);

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());