Skip to content

Commit

Permalink
Support of GC and S3 storages for registry in Java Feature Server (#2043
Browse files Browse the repository at this point in the history
)

* gs and s3 storages for registry

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* some cleanup

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Nov 18, 2021
1 parent 73af2fa commit 2729f17
Show file tree
Hide file tree
Showing 23 changed files with 788 additions and 270 deletions.
27 changes: 26 additions & 1 deletion java/serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@
<version>1.6.6</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.118.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-nio</artifactId>
<version>0.123.10</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.110</version>
</dependency>

<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-testcontainers</artifactId>
<version>2.2.3</version>
<scope>test</scope>
</dependency>

<!--testCompile "io.grpc:grpc-testing:${grpc.version}"-->
<dependency>
Expand Down Expand Up @@ -310,7 +335,7 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.0.0</version>
<version>4.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,36 @@ public void setRegistry(final String registry) {
this.registry = registry;
}

private int registryRefreshInterval;

public int getRegistryRefreshInterval() {
return registryRefreshInterval;
}

public void setRegistryRefreshInterval(final int registryRefreshInterval) {
this.registryRefreshInterval = registryRefreshInterval;
}

private String gcpProject;

public String getGcpProject() {
return gcpProject;
}

public void setGcpProject(final String gcpProject) {
this.gcpProject = gcpProject;
}

private String awsRegion;

public String getAwsRegion() {
return awsRegion;
}

public void setAwsRegion(final String awsRegion) {
this.awsRegion = awsRegion;
}

private String transformationServiceEndpoint;

public String getTransformationServiceEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.config;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import feast.serving.registry.*;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Optional;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

@Configuration
public class RegistryConfig {
@Bean
@Lazy
Storage googleStorage(FeastProperties feastProperties) {
return StorageOptions.newBuilder()
.setProjectId(feastProperties.getGcpProject())
.build()
.getService();
}

@Bean
@Lazy
AmazonS3 awsStorage(FeastProperties feastProperties) {
return AmazonS3ClientBuilder.standard().withRegion(feastProperties.getAwsRegion()).build();
}

@Bean
RegistryFile registryFile(FeastProperties feastProperties, ApplicationContext context) {

String registryPath = feastProperties.getRegistry();
Optional<String> scheme = Optional.ofNullable(URI.create(registryPath).getScheme());

switch (scheme.orElseGet(() -> "")) {
case "gs":
return new GSRegistryFile(context.getBean(Storage.class), registryPath);
case "s3":
return new S3RegistryFile(context.getBean(AmazonS3.class), registryPath);
case "":
case "file":
return new LocalRegistryFile(Paths.get(registryPath));
default:
throw new RuntimeException("Registry storage %s is unsupported");
}
}

@Bean
RegistryRepository registryRepository(
RegistryFile registryFile, FeastProperties feastProperties) {
return new RegistryRepository(registryFile, feastProperties.getRegistryRefreshInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package feast.serving.config;

import feast.serving.registry.LocalRegistryRepo;
import feast.serving.registry.*;
import feast.serving.service.OnlineServingServiceV2;
import feast.serving.service.OnlineTransformationService;
import feast.serving.service.ServingServiceV2;
import feast.serving.specs.FeatureSpecRetriever;
import feast.serving.specs.RegistryFeatureSpecRetriever;
import feast.storage.api.retriever.OnlineRetrieverV2;
import feast.storage.connectors.redis.retriever.*;
import io.opentracing.Tracer;
import java.nio.file.Paths;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -36,7 +33,7 @@ public class ServingServiceConfigV2 {

@Bean
public ServingServiceV2 registryBasedServingServiceV2(
FeastProperties feastProperties, Tracer tracer) {
FeastProperties feastProperties, RegistryRepository registryRepository, Tracer tracer) {
final ServingServiceV2 servingService;
final FeastProperties.Store store = feastProperties.getActiveStore();

Expand All @@ -56,23 +53,19 @@ public ServingServiceV2 registryBasedServingServiceV2(
default:
throw new RuntimeException(
String.format(
"Unable to identify online store type: %s for Regsitry Backed Serving Service",
"Unable to identify online store type: %s for Registry Backed Serving Service",
store.getType()));
}

final FeatureSpecRetriever featureSpecRetriever;
log.info("Created RegistryFeatureSpecRetriever");
log.info("Working Directory = " + System.getProperty("user.dir"));
final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry()));
featureSpecRetriever = new RegistryFeatureSpecRetriever(repo);

final String transformationServiceEndpoint = feastProperties.getTransformationServiceEndpoint();
final OnlineTransformationService onlineTransformationService =
new OnlineTransformationService(transformationServiceEndpoint, featureSpecRetriever);
new OnlineTransformationService(transformationServiceEndpoint, registryRepository);

servingService =
new OnlineServingServiceV2(
retrieverV2, tracer, featureSpecRetriever, onlineTransformationService);
retrieverV2, tracer, registryRepository, onlineTransformationService);

return servingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import net.devh.boot.grpc.server.service.GrpcService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;

@GrpcService(
interceptors = {
Expand Down Expand Up @@ -89,19 +88,13 @@ public void getOnlineFeaturesV2(
responseObserver.onNext(onlineFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
log.error("Failed to retrieve specs in SpecService", e);
log.error("Failed to retrieve specs from Registry", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (AccessDeniedException e) {
log.info(String.format("User prevented from accessing one of the projects in request"));
responseObserver.onError(
Status.PERMISSION_DENIED
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (Exception e) {
log.warn("Failed to get Online Features", e);
responseObserver.onError(e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.registry;

import com.google.cloud.storage.*;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.RegistryProto;
import java.util.Optional;

public class GSRegistryFile implements RegistryFile {
private Blob blob;

public GSRegistryFile(Storage storage, String url) {
blob = storage.get(BlobId.fromGsUtilUri(url));
if (blob == null) {
throw new RuntimeException(String.format("Registry file %s was not found", url));
}
}

public RegistryProto.Registry getContent() {
try {
return RegistryProto.Registry.parseFrom(blob.getContent());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't read remote registry: %s. Error: %s",
blob.getBlobId().toGsUtilUri(), e.getMessage()));
}
}

public Optional<RegistryProto.Registry> getContentIfModified() {
try {
this.blob = blob.reload(Blob.BlobSourceOption.generationNotMatch());
} catch (StorageException e) {
if (e.getCode() == 304) {
// Content not modified
return Optional.empty();
} else {
throw new RuntimeException(
String.format(
"Couldn't read remote registry: %s. Error: %s",
blob.getBlobId().toGsUtilUri(), e.getMessage()));
}
}

return Optional.of(this.getContent());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.serving.registry;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.RegistryProto;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;

public class LocalRegistryFile implements RegistryFile {
private RegistryProto.Registry cachedRegistry;

public LocalRegistryFile(String path) {
try {
cachedRegistry = RegistryProto.Registry.parseFrom(Files.readAllBytes(Paths.get(path)));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(
String.format(
"Couldn't read local registry: %s. Protobuf is invalid: %s", path, e.getMessage()));
} catch (IOException e) {
throw new RuntimeException(
String.format("Couldn't read local registry file: %s. Error: %s", path, e.getMessage()));
}
}

@Override
public RegistryProto.Registry getContent() {
return this.cachedRegistry;
}

@Override
public Optional<RegistryProto.Registry> getContentIfModified() {
return Optional.empty();
}
}
Loading

0 comments on commit 2729f17

Please sign in to comment.