Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests for transformation service integration in java feature server #2236

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions java/serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
Expand Down Expand Up @@ -141,6 +146,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<!--compile "com.google.protobuf:protobuf-java-util:${protobuf.version}"-->
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,38 +147,46 @@ public TracingProperties getTracing() {
public LoggingProperties getLogging() {
return logging;
}
}

private FeastProperties feast;
private String gcpProject;

public void setFeast(FeastProperties feast) {
this.feast = feast;
}
public String getGcpProject() {
return gcpProject;
}

public FeastProperties getFeast() {
return feast;
}
public void setGcpProject(String gcpProject) {
this.gcpProject = gcpProject;
}

private String gcpProject;
public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}

public String getGcpProject() {
return gcpProject;
}
private String awsRegion;

public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}
public String getAwsRegion() {
return awsRegion;
}

private String transformationServiceEndpoint;

private String awsRegion;
public String getTransformationServiceEndpoint() {
return transformationServiceEndpoint;
}

public String getAwsRegion() {
return awsRegion;
public void setTransformationServiceEndpoint(String transformationServiceEndpoint) {
this.transformationServiceEndpoint = transformationServiceEndpoint;
}
}

private String transformationServiceEndpoint;
private FeastProperties feast;

public String getTransformationServiceEndpoint() {
return transformationServiceEndpoint;
public void setFeast(FeastProperties feast) {
this.feast = feast;
}

public FeastProperties getFeast() {
return feast;
}

/** Store configuration class for database that this Feast Serving uses. */
Expand Down Expand Up @@ -263,6 +271,10 @@ public static class Server {
public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}
}

public static class GrpcServer {
Expand All @@ -271,6 +283,10 @@ public static class GrpcServer {
public Server getServer() {
return server;
}

public void setServer(Server server) {
this.server = server;
}
}

public static class RestServer {
Expand All @@ -279,6 +295,10 @@ public static class RestServer {
public Server getServer() {
return server;
}

public void setServer(Server server) {
this.server = server;
}
}

private GrpcServer grpc;
Expand All @@ -288,10 +308,18 @@ public GrpcServer getGrpc() {
return grpc;
}

public void setGrpc(GrpcServer grpc) {
this.grpc = grpc;
}

public RestServer getRest() {
return rest;
}

public void setRest(RestServer rest) {
this.rest = rest;
}

public enum StoreType {
REDIS,
REDIS_CLUSTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public class RegistryConfig extends AbstractModule {
@Provides
Storage googleStorage(ApplicationProperties applicationProperties) {
return StorageOptions.newBuilder()
.setProjectId(applicationProperties.getGcpProject())
.setProjectId(applicationProperties.getFeast().getGcpProject())
.build()
.getService();
}

@Provides
public AmazonS3 awsStorage(ApplicationProperties applicationProperties) {
return AmazonS3ClientBuilder.standard()
.withRegion(applicationProperties.getAwsRegion())
.withRegion(applicationProperties.getFeast().getAwsRegion())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public ServingServiceV2 registryBasedServingServiceV2(

log.info("Working Directory = " + System.getProperty("user.dir"));

final String transformationServiceEndpoint =
applicationProperties.getTransformationServiceEndpoint();
final OnlineTransformationService onlineTransformationService =
new OnlineTransformationService(transformationServiceEndpoint, registryRepository);
new OnlineTransformationService(
applicationProperties.getFeast().getTransformationServiceEndpoint(),
registryRepository);

servingService =
new OnlineServingServiceV2(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import feast.proto.serving.ServingAPIProto;
import feast.proto.serving.ServingServiceGrpc;
import feast.serving.service.ServingServiceV2;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import org.slf4j.Logger;

public class OnlineServingGrpcServiceV2 extends ServingServiceGrpc.ServingServiceImplBase {
private final ServingServiceV2 servingServiceV2;
private static final Logger log =
org.slf4j.LoggerFactory.getLogger(OnlineServingGrpcServiceV2.class);

@Inject
OnlineServingGrpcServiceV2(ServingServiceV2 servingServiceV2) {
Expand All @@ -34,15 +38,27 @@ public class OnlineServingGrpcServiceV2 extends ServingServiceGrpc.ServingServic
public void getFeastServingInfo(
ServingAPIProto.GetFeastServingInfoRequest request,
StreamObserver<ServingAPIProto.GetFeastServingInfoResponse> responseObserver) {
responseObserver.onNext(this.servingServiceV2.getFeastServingInfo(request));
responseObserver.onCompleted();
try {
responseObserver.onNext(this.servingServiceV2.getFeastServingInfo(request));
responseObserver.onCompleted();
} catch (RuntimeException e) {
log.warn("Failed to get Serving Info", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void getOnlineFeatures(
ServingAPIProto.GetOnlineFeaturesRequest request,
StreamObserver<ServingAPIProto.GetOnlineFeaturesResponse> responseObserver) {
responseObserver.onNext(this.servingServiceV2.getOnlineFeatures(request));
responseObserver.onCompleted();
try {
responseObserver.onNext(this.servingServiceV2.getOnlineFeatures(request));
responseObserver.onCompleted();
} catch (RuntimeException e) {
log.warn("Failed to get Online Features", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureInputs.
// Currently, we can retrieve context variables directly from GetOnlineFeaturesRequest.
List<FeatureReferenceV2> onDemandFeatureInputs =
this.onlineTransformationService
.extractRequestDataFeatureNamesAndOnDemandFeatureInputs(onDemandFeatureReferences)
.getRight();
this.onlineTransformationService.extractOnDemandFeaturesDependencies(
onDemandFeatureReferences);

// Add on demand feature inputs to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureInput : onDemandFeatureInputs) {
Expand Down Expand Up @@ -284,7 +283,12 @@ private void populateOnDemandFeatures(
valueList.add(features.get(rowIdx).get(featureIdx).getFeatureValue(valueType));
}

onDemandContext.add(Pair.of(Feature.getFeatureReference(featureReference), valueList));
onDemandContext.add(
Pair.of(
String.format(
"%s__%s",
featureReference.getFeatureViewName(), featureReference.getFeatureName()),
valueList));
}
// Serialize the augmented values.
ValueType transformationInput =
Expand Down
Loading