Skip to content

Commit

Permalink
feat(DataPlaneConsumerProxy): adds support for data plane provider url (
Browse files Browse the repository at this point in the history
#643)

* feat(DataPlaneConsumerProxy): adds support for data plane provider url from EDR and path segments and query params

* add token based auth on data plane consumer proxy
  • Loading branch information
wolf4ood authored Jul 31, 2023
1 parent b8ec60f commit 981bd77
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 13 deletions.
1 change: 1 addition & 0 deletions edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-provider-core"))

runtimeOnly(libs.edc.config.filesystem)
runtimeOnly(libs.edc.auth.tokenbased)
runtimeOnly(libs.edc.dpf.awss3)
runtimeOnly(libs.edc.dpf.oauth2)
runtimeOnly(libs.edc.dpf.http)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ The path is `<proxyContext>/aas/request` and the body is something like this exa
The body should contain the `assetId` or the `transferProcessId` which identify the data that we want to fetch
and an `endpointUrl` which is the provider gateway on which the data is available. More info [here](../edc-dataplane-proxy-provider-api/README.md) on the gateway.

Alternatively if the `endpointUrl` is not known or the gateway on the provider side is not configured, it can be omitted and the `Edr#endpointUrl`
will be used. In this scenario if needed users can provide additional properties to the request for composing the final
url:

- `pathSegments` sub path to append to the base url
- `queryParams` query parameters to add to the url

Example with base url `http://localhost:8080/test`

```json
{
"assetId": "1",
"pathSegments": "/sub",
"queryParams": "foo=bar"
}
```

The final url will look like `http://localhost:8080/test/sub?foo=bar` composed by the DataPlane manager with the Http request flow,

> Note: the endpoint is not protected with configured `AuthenticationService`, which most likely will be the token based (auth key) one.
## Configuration

| Key | Required | Default | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation(libs.edc.dpf.framework)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)
implementation(libs.edc.spi.auth)

implementation(project(":spi:edr-spi"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api;

import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
Expand Down Expand Up @@ -63,6 +65,9 @@ public class DataPlaneProxyConsumerApiExtension implements ServiceExtension {
@Inject
private WebServiceConfigurer configurer;

@Inject
private AuthenticationService authenticationService;

@Inject
private Monitor monitor;

Expand All @@ -80,6 +85,7 @@ public void initialize(ServiceExtensionContext context) {

executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL));

webService.registerResource(CONSUMER_API_ALIAS, new AuthenticationRequestFilter(authenticationService));
webService.registerResource(CONSUMER_API_ALIAS, new ClientErrorExceptionMapper());
webService.registerResource(CONSUMER_API_ALIAS, new ConsumerAssetRequestController(edrCache, dataPlaneManager, executorService, monitor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.HttpDataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
Expand All @@ -41,15 +44,17 @@
import static jakarta.ws.rs.core.Response.status;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;

/**
* Implements the HTTP proxy API.
*/
@Path("/aas")
public class ConsumerAssetRequestController implements ConsumerAssetRequestApi {
public static final String BASE_URL = "baseUrl";
private static final String HTTP_DATA = "HttpData";
private static final String ASYNC_TYPE = "async";
private static final String BASE_URL = "baseUrl";
private static final String HEADER_AUTHORIZATION = "header:authorization";
private static final String BEARER_PREFIX = "Bearer ";

Expand All @@ -76,22 +81,27 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
// resolve the EDR and add it to the request
var edr = resolveEdr(request);

var sourceAddress = DataAddress.Builder.newInstance()
.type(HTTP_DATA)
.property(BASE_URL, request.getEndpointUrl())
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
var sourceAddress = Optional.ofNullable(request.getEndpointUrl())
.map(url -> gatewayAddress(url, edr))
.orElseGet(() -> dataPlaneAddress(edr));


var destinationAddress = DataAddress.Builder.newInstance()
.type(ASYNC_TYPE)
.build();


var properties = Optional.ofNullable(request.getEndpointUrl())
.map((url) -> Map.<String, String>of())
.orElseGet(() -> dataPlaneProperties(request));

var flowRequest = DataFlowRequest.Builder.newInstance()
.processId(randomUUID().toString())
.trackable(false)
.sourceDataAddress(sourceAddress)
.destinationDataAddress(destinationAddress)
.traceContext(Map.of())
.properties(properties)
.build();

// transfer the data asynchronously
Expand All @@ -104,6 +114,30 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
}
}


private Map<String, String> dataPlaneProperties(AssetRequest request) {
var props = new HashMap<String, String>();
Optional.ofNullable(request.getQueryParams()).ifPresent((queryParams) -> props.put(QUERY_PARAMS, queryParams));
Optional.ofNullable(request.getPathSegments()).ifPresent((path) -> props.put(PATH, path));
return props;
}

private DataAddress gatewayAddress(String url, EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(url)
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
}

private DataAddress dataPlaneAddress(EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(edr.getEndpoint())
.proxyQueryParams("true")
.proxyPath("true")
.property(HEADER_AUTHORIZATION, edr.getAuthCode())
.build();
}

private EndpointDataReference resolveEdr(AssetRequest request) {
if (request.getTransferProcessId() != null) {
var edr = edrCache.resolveReference(request.getTransferProcessId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

import static java.util.Objects.requireNonNull;

/**
* A request for asset data. The request may contain a transfer process ID or asset ID and must specify an endpoint for retrieving the data.
*/
Expand All @@ -33,6 +31,10 @@ public class AssetRequest {
private String providerId;
private String endpointUrl;

private String queryParams;

private String pathSegments;

private AssetRequest() {
}

Expand All @@ -52,6 +54,14 @@ public String getProviderId() {
return providerId;
}

public String getQueryParams() {
return queryParams;
}

public String getPathSegments() {
return pathSegments;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private final AssetRequest request;
Expand Down Expand Up @@ -85,11 +95,20 @@ public Builder providerId(String providerId) {
return this;
}

public Builder queryParams(String queryParams) {
request.queryParams = queryParams;
return this;
}

public Builder pathSegments(String pathSegments) {
request.pathSegments = pathSegments;
return this;
}

public AssetRequest build() {
if (request.assetId == null && request.transferProcessId == null) {
throw new NullPointerException("An assetId or endpointReferenceId must be set");
}
requireNonNull(request.endpointUrl, "endpointUrl");
return request;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
.endpointUrl("https://test.com")
.providerId("providerId")
.transferProcessId("tp1")
.queryParams("params")
.pathSegments("path")
.build();

var serialized = mapper.writeValueAsString(request);

var deserialized = mapper.readValue(serialized, AssetRequest.class);
Expand All @@ -42,13 +44,14 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
assertThat(deserialized.getTransferProcessId()).isEqualTo(request.getTransferProcessId());
assertThat(deserialized.getEndpointUrl()).isEqualTo(request.getEndpointUrl());
assertThat(deserialized.getProviderId()).isEqualTo(request.getProviderId());
assertThat(deserialized.getPathSegments()).isEqualTo(request.getPathSegments());
assertThat(deserialized.getQueryParams()).isEqualTo(request.getQueryParams());

}

@Test
void verify_NullArguments() {
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().endpointUrl("https://test.com").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().assetId("asset1").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().build()).isInstanceOf(NullPointerException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.eclipse.edc.junit.annotations.ApiTest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -48,8 +50,12 @@
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;
import static org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController.BASE_URL;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ApiTest
Expand Down Expand Up @@ -243,6 +249,61 @@ void requestAsset_shouldReturnError_whenEdrByTransferProcessIdNotFound() {

}

@Test
void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException {

var transferProcessId = "tp";
var url = "http://localhost:8080/test";
var request = Map.of("transferProcessId", transferProcessId, PATH, "/path", QUERY_PARAMS, "test=10&foo=bar");
var edr = EndpointDataReference.Builder.newInstance()
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.endpoint(url)
.build();

var response = Map.of("response", "ok");
var responseBytes = mapper.writeValueAsBytes(response);

var datasource = mock(DataSource.class);
var partStream = mock(DataSource.Part.class);

when(datasource.openPartStream()).thenReturn(StreamResult.success(Stream.of(partStream)));
when(partStream.openStream()).thenReturn(new ByteArrayInputStream(responseBytes));

when(cache.resolveReference(transferProcessId)).thenReturn(edr);
when(dataPlaneManager.transfer(any(DataSink.class), any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
});

var proxyResponseBytes = baseRequest()
.contentType(MediaType.APPLICATION_JSON)
.body(request)
.post(ASSET_REQUEST_PATH)
.then()
.statusCode(200)
.extract().body().asByteArray();

var proxyResponse = mapper.readValue(proxyResponseBytes, new TypeReference<Map<String, String>>() {
});

assertThat(proxyResponse).containsAllEntriesOf(response);

var captor = ArgumentCaptor.forClass(DataFlowRequest.class);
verify(dataPlaneManager).transfer(any(DataSink.class), captor.capture());


var flowRequest = captor.getValue();

assertThat(flowRequest.getSourceDataAddress().getProperty(BASE_URL)).isEqualTo(edr.getEndpoint());

assertThat(flowRequest.getProperties().get(QUERY_PARAMS)).isEqualTo(request.get(QUERY_PARAMS));
assertThat(flowRequest.getProperties().get(PATH)).isEqualTo(request.get(PATH));

}

@Override
protected Object controller() {
return new ConsumerAssetRequestController(cache, dataPlaneManager, Executors.newSingleThreadExecutor(), mock(Monitor.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ public String pullProxyDataByAssetId(Participant provider, String assetId) {
return getProxyData(body);
}


public String pullProviderDataPlaneDataByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId);
return getProxyData(body);
}

public String pullProviderDataPlaneDataByAssetIdAndCustomProperties(Participant provider, String assetId, String path, String params) {
var body = Map.of("assetId", assetId, "pathSegments", path, "queryParams", params);
return getProxyData(body);
}

public Response pullProxyDataResponseByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId,
"endpointUrl", format("%s/aas/test", provider.gatewayEndpoint),
Expand All @@ -345,6 +356,12 @@ public String pullProxyDataByTransferProcessId(Participant provider, String tran

}

public String pullProviderDataPlaneDataByTransferProcessId(Participant provider, String transferProcessId) {
var body = Map.of("transferProcessId", transferProcessId);
return getProxyData(body);

}

public JsonObject getDatasetForAsset(Participant provider, String assetId) {
var datasets = getCatalogDatasets(provider);
return datasets.stream()
Expand Down Expand Up @@ -374,6 +391,7 @@ private String getProxyData(Map<String, String> body) {
private Response proxyRequest(Map<String, String> body) {
return given()
.baseUri(proxyUrl)
.header("x-api-key", apiKey)
.contentType("application/json")
.body(body)
.post(PROXY_SUBPATH);
Expand Down
Loading

0 comments on commit 981bd77

Please sign in to comment.