Skip to content

Commit

Permalink
feat: upgrade to EDC 0.2.0 (#674)
Browse files Browse the repository at this point in the history
* feat: upgrade to EDC 0.2.0

* remove HAshicorp Vault

* fix tests

* fix tests

* dockerfiles

* increase timeout

* DEPENDENCIES

* removed secrets
  • Loading branch information
paullatzelsperger authored Jul 28, 2023
1 parent 78f1a01 commit 968bb28
Show file tree
Hide file tree
Showing 85 changed files with 341 additions and 3,218 deletions.
265 changes: 119 additions & 146 deletions DEPENDENCIES

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -86,13 +87,25 @@ public InMemoryEndpointDataReferenceCache(String lockId, Clock clock, Map<String
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
public @Nullable StoreResult<EndpointDataReferenceEntry> findByIdAndLease(String transferProcessId) {
return lockManager.readLock(() -> {
var edr = edrsByTransferProcessId.get(transferProcessId);
return entriesByEdrId.get(edr.getId());
var edrEntry = entriesByEdrId.get(edr.getId());
return edrEntry == null ? StoreResult.notFound(format("EndpointDataReferenceEntry %s not found", transferProcessId)) :
StoreResult.success(edrEntry);
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> findByCorrelationIdAndLease(String correlationId) {
return findByIdAndLease(correlationId);
}

@Override
public void save(EndpointDataReferenceEntry entity) {
throw new UnsupportedOperationException("Please use save(EndpointDataReferenceEntry, EndpointDataReference) instead!");
}

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
Expand Down Expand Up @@ -48,7 +46,6 @@
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -152,7 +149,7 @@ private void update(EndpointDataReferenceEntry edrEntry) {


private StateProcessorImpl<EndpointDataReferenceEntry> processEdrInState(EndpointDataReferenceEntryStates state, Function<EndpointDataReferenceEntry, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()) };
var filter = new Criterion[] {hasState(state.code())};
return new StateProcessorImpl<>(() -> edrCache.nextNotLeased(batchSize, filter), telemetry.contextPropagationMiddleware(function));
}

Expand All @@ -169,15 +166,11 @@ private StateProcessorImpl<EndpointDataReferenceEntry> processDeletingEdr(Functi
private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

var requestData = ContractRequestData.Builder.newInstance()
return ContractRequest.Builder.newInstance()
.counterPartyAddress(request.getConnectorAddress())
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.providerId(request.getConnectorId())
.callbackAddresses(callbacks).build();
}

Expand Down Expand Up @@ -244,21 +237,13 @@ private StatusResult<Void> fireTransferProcess(EndpointDataReferenceEntry entry)
}
var dataRequest = transferProcess.getDataRequest();

var newDataRequest = DataRequest.Builder.newInstance()
.id(UUID.randomUUID().toString())
var transferRequest = TransferRequest.Builder.newInstance()
.assetId(dataRequest.getAssetId())
.connectorId(dataRequest.getConnectorId())
.contractId(dataRequest.getContractId())
.protocol(dataRequest.getProtocol())
.connectorAddress(dataRequest.getConnectorAddress())
.dataDestination(dataRequest.getDataDestination())
.destinationType(dataRequest.getDestinationType())
.processId(dataRequest.getProcessId())
.managedResources(dataRequest.isManagedResources())
.build();

var transferRequest = TransferRequest.Builder.newInstance()
.dataRequest(newDataRequest)
.callbackAddresses(transferProcess.getCallbackAddresses())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public static NegotiateEdrRequest getNegotiateEdrRequest() {
.id("id")
.assetId("assetId")
.policy(Policy.Builder.newInstance().build())
.providerId("provider")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ void initEdrNegotiation() {

assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().containsAll(negotiateEdrRequest.getCallbackAddresses());
assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(LOCAL_CALLBACK);
assertThat(msg.getRequestData().getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getRequestData().getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getRequestData().getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());
assertThat(msg.getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());

}

Expand All @@ -118,7 +118,7 @@ void initial_shouldTransitionRequesting() {
var edrEntry = edrEntryBuilder().state(NEGOTIATED.code()).build();
var transferProcess = createTransferProcessBuilder().build();
when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code()))).thenReturn(List.of(edrEntry)).thenReturn(emptyList());
when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

Expand All @@ -138,7 +138,7 @@ void initial_shouldNotTransitionToRefreshing_WhenNotExpired() {
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

Expand All @@ -159,7 +159,7 @@ void initial_shouldTransitionError_whenTransferProcessNotFound() {
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(null);

edrManager.start();
Expand All @@ -179,7 +179,7 @@ void initial_shouldNotTransitionError_whenInitiatedTransferFailsOnce() {
.thenReturn(List.of(edrEntry.copy()))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any()))
.thenReturn(ServiceResult.badRequest("bad"))
Expand Down Expand Up @@ -221,7 +221,7 @@ void initial_shouldDeleteTheEntry_whenTheRetentionPeriodIsOver() {
.filter(hasState(DELETING.code()))
.limit(DEFAULT_BATCH_SIZE)
.build();

when(edrCache.queryForEntries(query))
.thenReturn(Stream.of(edrEntry))
.thenReturn(Stream.empty());
Expand Down Expand Up @@ -253,7 +253,6 @@ private TransferProcess.Builder createTransferProcessBuilder() {
.processId(processId)
.protocol("protocol")
.connectorAddress("http://an/address")
.managedResources(false)
.build();

return TransferProcess.Builder.newInstance()
Expand All @@ -273,7 +272,7 @@ private DataRequest.Builder createDataRequestBuilder() {
}

private Criterion[] stateIs(int state) {
return aryEq(new Criterion[]{ hasState(state) });
return aryEq(new Criterion[] {hasState(state)});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ private NegotiateEdrRequest getNegotiateEdrRequest() {
.id("id")
.assetId("assetId")
.policy(Policy.Builder.newInstance().build())
.providerId("provider")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.net.URI;

import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.CREDENTIALS_SUMMARY_V_1;
import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.CREDENTIALS_V_1;
import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.SECURITY_ED25519_V1;
Expand All @@ -46,10 +48,10 @@ void setup(ObjectFactory factory, ServiceExtensionContext context) {
@Test
void initialize(ServiceExtensionContext context) {
extension.initialize(context);
jsonLdService.registerCachedDocument(eq(CREDENTIALS_V_1), any());
jsonLdService.registerCachedDocument(eq(CREDENTIALS_SUMMARY_V_1), any());
jsonLdService.registerCachedDocument(eq(SECURITY_JWS_V1), any());
jsonLdService.registerCachedDocument(eq(SECURITY_ED25519_V1), any());
jsonLdService.registerCachedDocument(eq(CREDENTIALS_V_1), any(URI.class));
jsonLdService.registerCachedDocument(eq(CREDENTIALS_SUMMARY_V_1), any(URI.class));
jsonLdService.registerCachedDocument(eq(SECURITY_JWS_V1), any(URI.class));
jsonLdService.registerCachedDocument(eq(SECURITY_ED25519_V1), any(URI.class));

}
}
1 change: 1 addition & 0 deletions edc-controlplane/edc-controlplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
runtimeOnly(libs.edc.auth.tokenbased)

runtimeOnly(libs.edc.api.management)
runtimeOnly(libs.edc.api.management.config)
runtimeOnly(libs.edc.api.observability)
runtimeOnly(libs.edc.dsp)
runtimeOnly(libs.edc.spi.jwt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ plugins {

dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(libs.edc.core.controlplane)
runtimeOnly(libs.edc.dpf.transfer)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dependencies {
runtimeOnly(libs.edc.controlplane.callback.dispatcher.http)

runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ plugins {
dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ plugins {

dependencies {
implementation(project(":edc-dataplane:edc-dataplane-base"))
implementation(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.edc.transaction.local)
runtimeOnly(libs.edc.sql.pool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
1 change: 0 additions & 1 deletion edc-extensions/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies {
implementation(project(":edc-extensions:cx-oauth2"))
implementation(project(":edc-extensions:data-encryption"))
implementation(project(":edc-extensions:dataplane-selector-configuration"))
implementation(project(":edc-extensions:hashicorp-vault"))
implementation(project(":edc-extensions:postgresql-migration"))
implementation(project(":edc-extensions:provision-additional-headers"))
implementation(project(":edc-extensions:transferprocess-sftp-client"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.eclipse.tractusx.edc.validation.businesspartner.functions.BusinessPartnerPermissionFunction;
import org.eclipse.tractusx.edc.validation.businesspartner.functions.BusinessPartnerProhibitionFunction;

import static org.eclipse.edc.policy.engine.spi.PolicyEngine.ALL_SCOPES;
import static org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver.CATALOGING_SCOPE;
import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.NEGOTIATION_SCOPE;
import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.TRANSFER_SCOPE;

public class BusinessPartnerValidationExtension implements ServiceExtension {

Expand Down Expand Up @@ -93,15 +95,18 @@ public void initialize(ServiceExtensionContext context) {
final BusinessPartnerProhibitionFunction prohibitionFunction =
new BusinessPartnerProhibitionFunction(monitor, logAgreementEvaluation);

ruleBindingRegistry.bind("USE", ALL_SCOPES);
ruleBindingRegistry.bind(BUSINESS_PARTNER_CONSTRAINT_KEY, ALL_SCOPES);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, TRANSFER_SCOPE);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, NEGOTIATION_SCOPE);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, CATALOGING_SCOPE);
}

private void bindToScope(BusinessPartnerDutyFunction dutyFunction, BusinessPartnerPermissionFunction permissionFunction, BusinessPartnerProhibitionFunction prohibitionFunction, String scope) {
ruleBindingRegistry.bind("USE", scope);
ruleBindingRegistry.bind(BUSINESS_PARTNER_CONSTRAINT_KEY, scope);

policyEngine.registerFunction(
ALL_SCOPES, Duty.class, BUSINESS_PARTNER_CONSTRAINT_KEY, dutyFunction);
policyEngine.registerFunction(
ALL_SCOPES, Permission.class, BUSINESS_PARTNER_CONSTRAINT_KEY, permissionFunction);
policyEngine.registerFunction(
ALL_SCOPES, Prohibition.class, BUSINESS_PARTNER_CONSTRAINT_KEY, prohibitionFunction);
policyEngine.registerFunction(scope, Duty.class, BUSINESS_PARTNER_CONSTRAINT_KEY, dutyFunction);
policyEngine.registerFunction(scope, Permission.class, BUSINESS_PARTNER_CONSTRAINT_KEY, permissionFunction);
policyEngine.registerFunction(scope, Prohibition.class, BUSINESS_PARTNER_CONSTRAINT_KEY, prohibitionFunction);
}

private Boolean logAgreementEvaluationSetting(ServiceExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected boolean evaluate(
return false;
}

final ParticipantAgent participantAgent = policyContext.getParticipantAgent();
final ParticipantAgent participantAgent = policyContext.getContextData(ParticipantAgent.class);

if (participantAgent == null) {
return false;
Expand Down Expand Up @@ -149,7 +149,7 @@ private boolean isBusinessPartnerNumber(String referringConnectorClaim, Object b
policyContext.reportProblem(message);
return false;
}
if (!(businessPartnerNumber instanceof String)) {
if (!(businessPartnerNumber instanceof String businessPartnerNumberStr)) {
final String message =
format(
FAIL_EVALUATION_BECAUSE_RIGHT_VALUE_NOT_STRING,
Expand All @@ -159,7 +159,6 @@ private boolean isBusinessPartnerNumber(String referringConnectorClaim, Object b
return false;
}

var businessPartnerNumberStr = (String) businessPartnerNumber;
var agreement = policyContext.getContextData(ContractAgreement.class);
var isCorrectBusinessPartner = isCorrectBusinessPartner(referringConnectorClaim, businessPartnerNumberStr);

Expand Down
Loading

0 comments on commit 968bb28

Please sign in to comment.