Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(DataPlaneConsumerProxy): adds support for data plane provider url #643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions edc-dataplane/edc-dataplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
runtimeOnly(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-provider-core"))

runtimeOnly(libs.edc.config.filesystem)
runtimeOnly(libs.edc.auth.tokenbased)
runtimeOnly(libs.edc.dpf.awss3)
runtimeOnly(libs.edc.dpf.oauth2)
runtimeOnly(libs.edc.dpf.http)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ The path is `<proxyContext>/aas/request` and the body is something like this exa
The body should contain the `assetId` or the `transferProcessId` which identify the data that we want to fetch
and an `endpointUrl` which is the provider gateway on which the data is available. More info [here](../edc-dataplane-proxy-provider-api/README.md) on the gateway.

Alternatively if the `endpointUrl` is not known or the gateway on the provider side is not configured, it can be omitted and the `Edr#endpointUrl`
will be used. In this scenario if needed users can provide additional properties to the request for composing the final
url:

- `pathSegments` sub path to append to the base url
- `queryParams` query parameters to add to the url

Example with base url `http://localhost:8080/test`

```json
{
"assetId": "1",
"pathSegments": "/sub",
"queryParams": "foo=bar"
}
```

The final url will look like `http://localhost:8080/test/sub?foo=bar` composed by the DataPlane manager with the Http request flow,

> Note: the endpoint is not protected with configured `AuthenticationService`, which most likely will be the token based (auth key) one.
## Configuration

| Key | Required | Default | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation(libs.edc.dpf.framework)
implementation(libs.edc.dpf.util)
implementation(libs.edc.ext.http)
implementation(libs.edc.spi.auth)

implementation(project(":spi:edr-spi"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.dataplane.proxy.consumer.api;

import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
Expand Down Expand Up @@ -63,6 +65,9 @@ public class DataPlaneProxyConsumerApiExtension implements ServiceExtension {
@Inject
private WebServiceConfigurer configurer;

@Inject
private AuthenticationService authenticationService;

@Inject
private Monitor monitor;

Expand All @@ -80,6 +85,7 @@ public void initialize(ServiceExtensionContext context) {

executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL));

webService.registerResource(CONSUMER_API_ALIAS, new AuthenticationRequestFilter(authenticationService));
webService.registerResource(CONSUMER_API_ALIAS, new ClientErrorExceptionMapper());
webService.registerResource(CONSUMER_API_ALIAS, new ConsumerAssetRequestController(edrCache, dataPlaneManager, executorService, monitor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.HttpDataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.model.AssetRequest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
Expand All @@ -41,15 +44,17 @@
import static jakarta.ws.rs.core.Response.status;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;

/**
* Implements the HTTP proxy API.
*/
@Path("/aas")
public class ConsumerAssetRequestController implements ConsumerAssetRequestApi {
public static final String BASE_URL = "baseUrl";
private static final String HTTP_DATA = "HttpData";
private static final String ASYNC_TYPE = "async";
private static final String BASE_URL = "baseUrl";
private static final String HEADER_AUTHORIZATION = "header:authorization";
private static final String BEARER_PREFIX = "Bearer ";

Expand All @@ -76,22 +81,27 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
// resolve the EDR and add it to the request
var edr = resolveEdr(request);

var sourceAddress = DataAddress.Builder.newInstance()
.type(HTTP_DATA)
.property(BASE_URL, request.getEndpointUrl())
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
var sourceAddress = Optional.ofNullable(request.getEndpointUrl())
.map(url -> gatewayAddress(url, edr))
.orElseGet(() -> dataPlaneAddress(edr));


var destinationAddress = DataAddress.Builder.newInstance()
.type(ASYNC_TYPE)
.build();


var properties = Optional.ofNullable(request.getEndpointUrl())
.map((url) -> Map.<String, String>of())
.orElseGet(() -> dataPlaneProperties(request));

var flowRequest = DataFlowRequest.Builder.newInstance()
.processId(randomUUID().toString())
.trackable(false)
.sourceDataAddress(sourceAddress)
.destinationDataAddress(destinationAddress)
.traceContext(Map.of())
.properties(properties)
.build();

// transfer the data asynchronously
Expand All @@ -104,6 +114,30 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
}
}


private Map<String, String> dataPlaneProperties(AssetRequest request) {
var props = new HashMap<String, String>();
Optional.ofNullable(request.getQueryParams()).ifPresent((queryParams) -> props.put(QUERY_PARAMS, queryParams));
Optional.ofNullable(request.getPathSegments()).ifPresent((path) -> props.put(PATH, path));
return props;
}

private DataAddress gatewayAddress(String url, EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(url)
.property(HEADER_AUTHORIZATION, BEARER_PREFIX + edr.getAuthCode())
.build();
}

private DataAddress dataPlaneAddress(EndpointDataReference edr) {
return HttpDataAddress.Builder.newInstance()
.baseUrl(edr.getEndpoint())
.proxyQueryParams("true")
.proxyPath("true")
.property(HEADER_AUTHORIZATION, edr.getAuthCode())
.build();
}

private EndpointDataReference resolveEdr(AssetRequest request) {
if (request.getTransferProcessId() != null) {
var edr = edrCache.resolveReference(request.getTransferProcessId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

import static java.util.Objects.requireNonNull;

/**
* A request for asset data. The request may contain a transfer process ID or asset ID and must specify an endpoint for retrieving the data.
*/
Expand All @@ -33,6 +31,10 @@ public class AssetRequest {
private String providerId;
private String endpointUrl;

private String queryParams;

private String pathSegments;

private AssetRequest() {
}

Expand All @@ -52,6 +54,14 @@ public String getProviderId() {
return providerId;
}

public String getQueryParams() {
return queryParams;
}

public String getPathSegments() {
return pathSegments;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private final AssetRequest request;
Expand Down Expand Up @@ -85,11 +95,20 @@ public Builder providerId(String providerId) {
return this;
}

public Builder queryParams(String queryParams) {
request.queryParams = queryParams;
return this;
}

public Builder pathSegments(String pathSegments) {
request.pathSegments = pathSegments;
return this;
}

public AssetRequest build() {
if (request.assetId == null && request.transferProcessId == null) {
throw new NullPointerException("An assetId or endpointReferenceId must be set");
}
requireNonNull(request.endpointUrl, "endpointUrl");
return request;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
.endpointUrl("https://test.com")
.providerId("providerId")
.transferProcessId("tp1")
.queryParams("params")
.pathSegments("path")
.build();

var serialized = mapper.writeValueAsString(request);

var deserialized = mapper.readValue(serialized, AssetRequest.class);
Expand All @@ -42,13 +44,14 @@ void verify_SerializeDeserialize() throws JsonProcessingException {
assertThat(deserialized.getTransferProcessId()).isEqualTo(request.getTransferProcessId());
assertThat(deserialized.getEndpointUrl()).isEqualTo(request.getEndpointUrl());
assertThat(deserialized.getProviderId()).isEqualTo(request.getProviderId());
assertThat(deserialized.getPathSegments()).isEqualTo(request.getPathSegments());
assertThat(deserialized.getQueryParams()).isEqualTo(request.getQueryParams());

}

@Test
void verify_NullArguments() {
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().endpointUrl("https://test.com").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().assetId("asset1").build()).isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> AssetRequest.Builder.newInstance().build()).isInstanceOf(NullPointerException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.eclipse.edc.junit.annotations.ApiTest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
import org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -48,8 +50,12 @@
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS;
import static org.eclipse.tractusx.edc.dataplane.proxy.consumer.api.asset.ConsumerAssetRequestController.BASE_URL;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ApiTest
Expand Down Expand Up @@ -243,6 +249,61 @@ void requestAsset_shouldReturnError_whenEdrByTransferProcessIdNotFound() {

}

@Test
void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException {

var transferProcessId = "tp";
var url = "http://localhost:8080/test";
var request = Map.of("transferProcessId", transferProcessId, PATH, "/path", QUERY_PARAMS, "test=10&foo=bar");
var edr = EndpointDataReference.Builder.newInstance()
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.endpoint(url)
.build();

var response = Map.of("response", "ok");
var responseBytes = mapper.writeValueAsBytes(response);

var datasource = mock(DataSource.class);
var partStream = mock(DataSource.Part.class);

when(datasource.openPartStream()).thenReturn(StreamResult.success(Stream.of(partStream)));
when(partStream.openStream()).thenReturn(new ByteArrayInputStream(responseBytes));

when(cache.resolveReference(transferProcessId)).thenReturn(edr);
when(dataPlaneManager.transfer(any(DataSink.class), any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
});

var proxyResponseBytes = baseRequest()
.contentType(MediaType.APPLICATION_JSON)
.body(request)
.post(ASSET_REQUEST_PATH)
.then()
.statusCode(200)
.extract().body().asByteArray();

var proxyResponse = mapper.readValue(proxyResponseBytes, new TypeReference<Map<String, String>>() {
});

assertThat(proxyResponse).containsAllEntriesOf(response);

var captor = ArgumentCaptor.forClass(DataFlowRequest.class);
verify(dataPlaneManager).transfer(any(DataSink.class), captor.capture());


var flowRequest = captor.getValue();

assertThat(flowRequest.getSourceDataAddress().getProperty(BASE_URL)).isEqualTo(edr.getEndpoint());

assertThat(flowRequest.getProperties().get(QUERY_PARAMS)).isEqualTo(request.get(QUERY_PARAMS));
assertThat(flowRequest.getProperties().get(PATH)).isEqualTo(request.get(PATH));

}

@Override
protected Object controller() {
return new ConsumerAssetRequestController(cache, dataPlaneManager, Executors.newSingleThreadExecutor(), mock(Monitor.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ public String pullProxyDataByAssetId(Participant provider, String assetId) {
return getProxyData(body);
}


public String pullProviderDataPlaneDataByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId);
return getProxyData(body);
}

public String pullProviderDataPlaneDataByAssetIdAndCustomProperties(Participant provider, String assetId, String path, String params) {
var body = Map.of("assetId", assetId, "pathSegments", path, "queryParams", params);
return getProxyData(body);
}

public Response pullProxyDataResponseByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId,
"endpointUrl", format("%s/aas/test", provider.gatewayEndpoint),
Expand All @@ -345,6 +356,12 @@ public String pullProxyDataByTransferProcessId(Participant provider, String tran

}

public String pullProviderDataPlaneDataByTransferProcessId(Participant provider, String transferProcessId) {
var body = Map.of("transferProcessId", transferProcessId);
return getProxyData(body);

}

public JsonObject getDatasetForAsset(Participant provider, String assetId) {
var datasets = getCatalogDatasets(provider);
return datasets.stream()
Expand Down Expand Up @@ -374,6 +391,7 @@ private String getProxyData(Map<String, String> body) {
private Response proxyRequest(Map<String, String> body) {
return given()
.baseUri(proxyUrl)
.header("x-api-key", apiKey)
.contentType("application/json")
.body(body)
.post(PROXY_SUBPATH);
Expand Down
Loading