Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: updates to EDC 0.3.1 #806

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 117 additions & 117 deletions DEPENDENCIES

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationInitiated;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationRequested;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationVerified;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessProvisioned;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested;
Expand All @@ -35,11 +34,14 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.spi.types.domain.edr.EndpointDataReference.EDR_SIMPLE_TYPE;
import static org.eclipse.tractusx.edc.helpers.EdrNegotiationHelperFunctions.createCallback;
Expand All @@ -58,6 +60,9 @@ public abstract class AbstractNegotiateEdrTest {
protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());

private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
private static final Duration ASYNC_POLL_INTERVAL = ofSeconds(1);

MockWebServer server;

@BeforeEach
Expand All @@ -78,8 +83,7 @@ void negotiateEdr_shouldInvokeCallbacks() throws IOException {
createEvent(TransferProcessInitiated.class),
createEvent(TransferProcessProvisioned.class),
createEvent(TransferProcessRequested.class),
createEvent(TransferProcessStarted.class),
createEvent(TransferProcessCompleted.class));
createEvent(TransferProcessStarted.class));

var assetId = "api-asset-1";
var url = server.url("/mock/api");
Expand Down Expand Up @@ -113,6 +117,14 @@ void negotiateEdr_shouldInvokeCallbacks() throws IOException {
.map(receivedEvent -> waitForEvent(server, receivedEvent))
.collect(Collectors.toList());


await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(edrCaches).hasSize(1);
});

assertThat(expectedEvents).usingRecursiveFieldByFieldElementComparator().containsAll(events);

var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.assertj.core.api.Condition;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.policy.model.Operator;
import org.eclipse.tractusx.edc.lifecycle.Participant;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -58,6 +58,8 @@ public abstract class AbstractRenewalEdrTest {
protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());
private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
private static final Duration ASYNC_POLL_INTERVAL = ofSeconds(1);

MockWebServer server;

@BeforeEach
Expand All @@ -70,8 +72,8 @@ void setup() {
void negotiateEdr_shouldRenewTheEdr() throws IOException {

var expectedEvents = List.of(
createEvent(TransferProcessCompleted.class),
createEvent(TransferProcessCompleted.class));
createEvent(TransferProcessStarted.class),
createEvent(TransferProcessStarted.class));

var assetId = UUID.randomUUID().toString();
var url = server.url("/mock/api");
Expand All @@ -93,7 +95,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(url.toString(), true, Set.of("transfer.process.completed")))
.add(createCallback(url.toString(), true, Set.of("transfer.process.started")))
.build();

expectedEvents.forEach(event -> server.enqueue(new MockResponse()));
Expand All @@ -109,6 +111,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {
JsonArrayBuilder edrCaches = Json.createArrayBuilder();

await().atMost(ASYNC_TIMEOUT)
.pollInterval(ASYNC_POLL_INTERVAL)
.untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.policy.model.Operator;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.tractusx.edc.lifecycle.Participant;
Expand All @@ -32,11 +32,14 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.helpers.EdrNegotiationHelperFunctions.createCallback;
import static org.eclipse.tractusx.edc.helpers.PolicyHelperFunctions.businessPartnerGroupPolicy;
Expand All @@ -58,6 +61,9 @@ public abstract class AbstractDataPlaneProxyTest {

private static final String CUSTOM_QUERY_PARAMS = "foo=bar";

private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
private static final Duration ASYNC_POLL_INTERVAL = ofSeconds(1);

private static final String CUSTOM_FULL_PATH = CUSTOM_BASE_PATH + CUSTOM_SUB_PATH + "?" + CUSTOM_QUERY_PARAMS;
private final ObjectMapper mapper = new ObjectMapper();
private MockWebServer server;
Expand Down Expand Up @@ -86,7 +92,7 @@ void httpPullDataTransfer_withEdrAndProxy() {
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed")))
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started")))
.build();

// response to callback
Expand All @@ -96,6 +102,13 @@ void httpPullDataTransfer_withEdrAndProxy() {

var transferEvent = waitForTransferCompletion();

await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(edrCaches).hasSize(1);
});

var body = "{\"response\": \"ok\"}";

server.enqueue(new MockResponse().setBody(body));
Expand Down Expand Up @@ -159,7 +172,7 @@ void httpPullDataTransfer_shouldFailForAsset_withTwoEdrAndProxy() throws IOExcep
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed")))
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started")))
.build();

// response to callback
Expand All @@ -172,6 +185,14 @@ void httpPullDataTransfer_shouldFailForAsset_withTwoEdrAndProxy() throws IOExcep
var transferEvent1 = waitForTransferCompletion();
var transferEvent2 = waitForTransferCompletion();

await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(edrCaches).hasSize(2);
});


var body = "{\"response\": \"ok\"}";

server.enqueue(new MockResponse().setBody(body));
Expand Down Expand Up @@ -210,7 +231,7 @@ void httpPullDataTransfer_withEdrAndProviderDataPlaneProxy() throws IOException
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed")))
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started")))
.build();

// response to callback
Expand All @@ -220,6 +241,13 @@ void httpPullDataTransfer_withEdrAndProviderDataPlaneProxy() throws IOException

var transferEvent = waitForTransferCompletion();

await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(edrCaches).hasSize(1);
});

var body = "{\"response\": \"ok\"}";

server.enqueue(new MockResponse().setBody(body));
Expand Down Expand Up @@ -272,13 +300,20 @@ public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws In
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed")))
.add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started")))
.build();

SOKRATES.negotiateEdr(PLATO, assetId, callbacks);

waitForTransferCompletion();

await().pollInterval(ASYNC_POLL_INTERVAL)
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(edrCaches).hasSize(1);
});

var data = SOKRATES.pullProviderDataPlaneDataByAssetIdAndCustomProperties(PLATO, assetId, CUSTOM_SUB_PATH, CUSTOM_QUERY_PARAMS);
assertThat(data).isEqualTo(body);

Expand All @@ -295,7 +330,7 @@ void teardown() throws IOException {
server.shutdown();
}

private EventEnvelope<TransferProcessCompleted> waitForTransferCompletion() {
private EventEnvelope<TransferProcessStarted> waitForTransferCompletion() {
try {
var request = server.takeRequest(60, TimeUnit.SECONDS);
if (request != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void transferData_privateBackend() throws IOException, InterruptedException {
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var tpState = SOKRATES.getTransferProcessState(transferProcessId.get());
assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.COMPLETED.toString());
assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.STARTED.toString());
});

// wait until EDC is available on the consumer side
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,3 @@

org.eclipse.tractusx.edc.lifecycle.ConsumerServicesExtension
org.eclipse.tractusx.edc.lifecycle.VaultSeedExtension
org.eclipse.tractusx.edc.lifecycle.TestServiceExtension

2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
format.version = "1.1"

[versions]
edc = "0.3.0"
edc = "0.3.1"
postgres = "42.6.0"
awaitility = "4.2.0"
nimbus = "9.35"
Expand Down