From a6e339d239b101c0f207b0f1880f8da9e2fcd554 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Mon, 22 May 2023 17:56:31 +0200 Subject: [PATCH] feat(EdrCache): add SQL implementation of EDR cache store --- core/edr-cache-core/build.gradle.kts | 3 + .../EdrCacheEntryPredicateConverter.java | 37 ++++ .../InMemoryEndpointDataReferenceCache.java | 34 ++-- ...nMemoryEndpointDataReferenceCacheTest.java | 54 +----- .../build.gradle.kts | 1 + .../AdapterTransferProcessServiceImpl.java | 38 ++-- edc-extensions/edr-cache-sql/build.gradle.kts | 32 ++++ edc-extensions/edr-cache-sql/docs/schema.sql | 22 +++ .../sql/SqlEndpointDataReferenceCache.java | 166 ++++++++++++++++++ ...qlEndpointDataReferenceCacheExtension.java | 68 +++++++ .../sql/schema/BaseSqlEdrStatements.java | 55 ++++++ .../edc/edr/store/sql/schema/EdrMapping.java | 29 +++ .../edr/store/sql/schema/EdrStatements.java | 63 +++++++ .../postgres/PostgresEdrStatements.java | 22 +++ ...rg.eclipse.edc.spi.system.ServiceExtension | 15 ++ ...dpointDataReferenceCacheExtensionTest.java | 61 +++++++ .../SqlEndpointDataReferenceCacheTest.java | 81 +++++++++ ...CpAdapterPostgresqlMigrationExtension.java | 35 ---- .../EdrPostgresqlMigrationExtension.java | 29 +++ ...rg.eclipse.edc.spi.system.ServiceExtension | 2 +- ...V0_0_7__Init_CpAdapter_Database_Schema.sql | 54 ------ .../edr/V0_0_1__Init_Edr_Database_Schema.sql | 34 ++++ edc-tests/e2e-tests/build.gradle.kts | 1 + .../tractusx/edc/lifecycle/DataWiper.java | 7 + .../edc/lifecycle/PgParticipantRuntime.java | 3 + .../lifecycle/TestRuntimeConfiguration.java | 4 + .../tests/edr/NegotiateEdrPostgresqlTest.java | 1 + .../runtime-postgresql/build.gradle.kts | 2 +- gradle/libs.versions.toml | 5 + settings.gradle.kts | 2 + spi/edr-cache-spi/build.gradle.kts | 6 + .../edr/spi/EndpointDataReferenceCache.java | 13 +- .../EndpointDataReferenceCacheBaseTest.java | 152 ++++++++++++++++ 33 files changed, 950 insertions(+), 181 deletions(-) create mode 100644 core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/EdrCacheEntryPredicateConverter.java create mode 100644 edc-extensions/edr-cache-sql/build.gradle.kts create mode 100644 edc-extensions/edr-cache-sql/docs/schema.sql create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCache.java create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtension.java create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/BaseSqlEdrStatements.java create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrMapping.java create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrStatements.java create mode 100644 edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/postgres/PostgresEdrStatements.java create mode 100644 edc-extensions/edr-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtensionTest.java create mode 100644 edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheTest.java delete mode 100644 edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java create mode 100644 edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/EdrPostgresqlMigrationExtension.java delete mode 100644 edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql create mode 100644 edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/edr/V0_0_1__Init_Edr_Database_Schema.sql create mode 100644 spi/edr-cache-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCacheBaseTest.java diff --git a/core/edr-cache-core/build.gradle.kts b/core/edr-cache-core/build.gradle.kts index 0c1e5474d..95b0c0be6 100644 --- a/core/edr-cache-core/build.gradle.kts +++ b/core/edr-cache-core/build.gradle.kts @@ -22,5 +22,8 @@ dependencies { implementation(libs.edc.util) implementation(project(":spi:edr-cache-spi")) + + testImplementation(testFixtures(project(":spi:edr-cache-spi"))) + } diff --git a/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/EdrCacheEntryPredicateConverter.java b/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/EdrCacheEntryPredicateConverter.java new file mode 100644 index 000000000..27346d512 --- /dev/null +++ b/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/EdrCacheEntryPredicateConverter.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.core.defaults; + +import org.eclipse.edc.spi.query.BaseCriterionToPredicateConverter; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry; + +public class EdrCacheEntryPredicateConverter extends BaseCriterionToPredicateConverter { + + @Override + protected Object property(String key, Object object) { + if (object instanceof EndpointDataReferenceEntry) { + var entry = (EndpointDataReferenceEntry) object; + switch (key) { + case "assetId": + return entry.getAssetId(); + case "agreementId": + return entry.getAgreementId(); + default: + return null; + } + } + throw new IllegalArgumentException("Can only handle objects of type " + EndpointDataReferenceEntry.class.getSimpleName() + " but received an " + object.getClass().getSimpleName()); + } +} diff --git a/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCache.java b/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCache.java index 142355c24..cc6e24081 100644 --- a/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCache.java +++ b/core/edr-cache-core/src/main/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCache.java @@ -14,6 +14,8 @@ package org.eclipse.tractusx.edc.edr.core.defaults; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.edc.util.concurrency.LockManager; @@ -28,6 +30,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; @@ -40,17 +44,16 @@ public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache { private final LockManager lockManager; + private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter(); + private final Map> entriesByAssetId; - private final Map> entriesByAgreementId; - private final Map entriesByEdrId; private final Map edrsByTransferProcessId; public InMemoryEndpointDataReferenceCache() { lockManager = new LockManager(new ReentrantReadWriteLock()); entriesByAssetId = new HashMap<>(); - entriesByAgreementId = new HashMap<>(); entriesByEdrId = new HashMap<>(); edrsByTransferProcessId = new HashMap<>(); } @@ -71,14 +74,8 @@ public List referencesForAsset(String assetId) { } @Override - @NotNull - public List entriesForAsset(String assetId) { - return lockManager.readLock(() -> entriesByAssetId.getOrDefault(assetId, emptyList())); - } - - @Override - public @NotNull List entriesForAgreement(String agreementId) { - return lockManager.readLock(() -> entriesByAgreementId.getOrDefault(agreementId, emptyList())); + public Stream queryForEntries(QuerySpec spec) { + return filterBy(spec.getFilterExpression()); } @Override @@ -88,9 +85,6 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) { var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>()); list.add(entry); - var agreementList = entriesByAgreementId.computeIfAbsent(entry.getAgreementId(), k -> new ArrayList<>()); - agreementList.add(entry); - edrsByTransferProcessId.put(entry.getTransferProcessId(), edr); return null; }); @@ -103,13 +97,23 @@ public StoreResult deleteByTransferProcessId(String if (edr == null) { return notFound("EDR entry not found for id: " + id); } - var entry = entriesByEdrId.get(edr.getId()); + var entry = entriesByEdrId.remove(edr.getId()); var entries = entriesByAssetId.get(entry.getAssetId()); entries.remove(entry); if (entries.isEmpty()) { entriesByAssetId.remove(entry.getAssetId()); } + return success(entry); }); } + + private Stream filterBy(List criteria) { + var predicate = criteria.stream() + .map(predicateConverter::convert) + .reduce(x -> true, Predicate::and); + + return entriesByEdrId.values().stream() + .filter(predicate); + } } diff --git a/core/edr-cache-core/src/test/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCacheTest.java b/core/edr-cache-core/src/test/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCacheTest.java index 5507d6287..7fb287716 100644 --- a/core/edr-cache-core/src/test/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCacheTest.java +++ b/core/edr-cache-core/src/test/java/org/eclipse/tractusx/edc/edr/core/defaults/InMemoryEndpointDataReferenceCacheTest.java @@ -14,55 +14,15 @@ package org.eclipse.tractusx.edc.edr.core.defaults; -import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; -import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -class InMemoryEndpointDataReferenceCacheTest { - private static final String TRANSFER_PROCESS_ID = "tp1"; - private static final String ASSET_ID = "asset1"; - private static final String AGREEMENT_ID = "agreement1"; - - private static final String EDR_ID = "edr1"; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest; +class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest { private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache(); - @Test - @SuppressWarnings("DataFlowIssue") - void verify_operations() { - var edr = EndpointDataReference.Builder.newInstance() - .endpoint("http://test.com") - .id(EDR_ID) - .authCode("11111") - .authKey("authentication").build(); - - var entry = EndpointDataReferenceEntry.Builder.newInstance() - .assetId(ASSET_ID) - .agreementId(AGREEMENT_ID) - .transferProcessId(TRANSFER_PROCESS_ID) - .build(); - - cache.save(entry, edr); - - assertThat(cache.resolveReference(TRANSFER_PROCESS_ID).getId()).isEqualTo(EDR_ID); - - var edrs = cache.referencesForAsset(ASSET_ID); - assertThat(edrs.size()).isEqualTo(1); - assertThat(edrs.get((0)).getId()).isEqualTo(EDR_ID); - - var entries = cache.entriesForAsset(ASSET_ID); - assertThat(entries.size()).isEqualTo(1); - assertThat(entries.get((0)).getAssetId()).isEqualTo(ASSET_ID); - - entries = cache.entriesForAgreement(AGREEMENT_ID); - assertThat(entries.size()).isEqualTo(1); - assertThat(entries.get((0)).getAgreementId()).isEqualTo(AGREEMENT_ID); - - assertThat(cache.deleteByTransferProcessId(TRANSFER_PROCESS_ID).succeeded()).isTrue(); - - assertThat(cache.entriesForAsset(ASSET_ID)).isEmpty(); - assertThat(cache.resolveReference(TRANSFER_PROCESS_ID)).isNull(); + @Override + protected EndpointDataReferenceCache getStore() { + return cache; } + } diff --git a/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts b/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts index 79ce6ef7b..8b8398dcc 100644 --- a/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts +++ b/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { runtimeOnly(project(":edc-controlplane:edc-controlplane-base")) runtimeOnly(project(":edc-extensions:postgresql-migration")) runtimeOnly(project(":edc-extensions:hashicorp-vault")) + runtimeOnly(project(":edc-extensions:edr-cache-sql")) runtimeOnly(libs.bundles.edc.sqlstores) runtimeOnly(libs.edc.transaction.local) runtimeOnly(libs.edc.sql.pool) diff --git a/edc-extensions/control-plane-adapter-callback/src/main/java/org/eclipse/tractusx/edc/cp/adapter/callback/AdapterTransferProcessServiceImpl.java b/edc-extensions/control-plane-adapter-callback/src/main/java/org/eclipse/tractusx/edc/cp/adapter/callback/AdapterTransferProcessServiceImpl.java index a3ed84dc5..90a3290f6 100644 --- a/edc-extensions/control-plane-adapter-callback/src/main/java/org/eclipse/tractusx/edc/cp/adapter/callback/AdapterTransferProcessServiceImpl.java +++ b/edc-extensions/control-plane-adapter-callback/src/main/java/org/eclipse/tractusx/edc/cp/adapter/callback/AdapterTransferProcessServiceImpl.java @@ -19,6 +19,8 @@ import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData; import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService; import org.eclipse.edc.service.spi.result.ServiceResult; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; @@ -26,12 +28,9 @@ import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest; import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -88,26 +87,27 @@ public ServiceResult findByTransferProcessId(String trans @Override public ServiceResult> findByAssetAndAgreement(String assetId, String agreementId) { - var results = queryEdrs(assetId, agreementId) - .stream() - .filter(fieldFilter(assetId, EndpointDataReferenceEntry::getAssetId)) - .filter(fieldFilter(agreementId, EndpointDataReferenceEntry::getAgreementId)) - .collect(Collectors.toList()); + var results = queryEdrs(assetId, agreementId).collect(Collectors.toList()); return success(results); } - private Predicate fieldFilter(String value, Function function) { - return entry -> Optional.ofNullable(value) - .map(val -> val.equals(function.apply(entry))) - .orElse(true); + private Stream queryEdrs(String assetId, String agreementId) { + var queryBuilder = QuerySpec.Builder.newInstance(); + if (assetId != null) { + queryBuilder.filter(fieldFilter("assetId", assetId)); + } + if (agreementId != null) { + queryBuilder.filter(fieldFilter("agreementId", agreementId)); + } + return endpointDataReferenceCache.queryForEntries(queryBuilder.build()); } - private List queryEdrs(String assetId, String agreementId) { - // Try first for agreementId and then assetId - return Optional.ofNullable(agreementId) - .map(endpointDataReferenceCache::entriesForAgreement) - .or(() -> Optional.ofNullable(assetId).map(endpointDataReferenceCache::entriesForAsset)) - .orElseGet(Collections::emptyList); - } + private Criterion fieldFilter(String field, String value) { + return Criterion.Builder.newInstance() + .operandLeft(field) + .operator("=") + .operandRight(value) + .build(); + } } diff --git a/edc-extensions/edr-cache-sql/build.gradle.kts b/edc-extensions/edr-cache-sql/build.gradle.kts new file mode 100644 index 000000000..62dc71247 --- /dev/null +++ b/edc-extensions/edr-cache-sql/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` + `maven-publish` +} + +dependencies { + implementation(project(":spi:edr-cache-spi")) + + implementation(libs.edc.spi.core) + implementation(libs.edc.core.sql) + implementation(libs.edc.spi.transactionspi) + implementation(libs.edc.spi.transaction.datasource) + + testImplementation(testFixtures(project(":spi:edr-cache-spi"))) + testImplementation(testFixtures(libs.edc.core.sql)) + testImplementation(testFixtures(libs.edc.junit)) + +} diff --git a/edc-extensions/edr-cache-sql/docs/schema.sql b/edc-extensions/edr-cache-sql/docs/schema.sql new file mode 100644 index 000000000..fe4ef2104 --- /dev/null +++ b/edc-extensions/edr-cache-sql/docs/schema.sql @@ -0,0 +1,22 @@ +-- +-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +-- + +CREATE TABLE IF NOT EXISTS edc_edr_cache +( + transfer_process_id VARCHAR NOT NULL PRIMARY KEY, + agreement_id VARCHAR NOT NULL, + asset_id VARCHAR NOT NULL, + edr_id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL +); diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCache.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCache.java new file mode 100644 index 000000000..bc021b989 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCache.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.sql.ResultSetMapper; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry; +import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Clock; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery; +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuerySingle; + +public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements EndpointDataReferenceCache { + + private final EdrStatements statements; + + private final Clock clock; + + private final Vault vault; + + + public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, EdrStatements statements, ObjectMapper objectMapper, Vault vault, Clock clock) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper); + this.statements = statements; + this.clock = clock; + this.vault = vault; + } + + @Override + public @Nullable EndpointDataReference resolveReference(String transferProcessId) { + Objects.requireNonNull(transferProcessId); + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var edrId = findById(connection, transferProcessId, this::mapToEdrId); + if (edrId != null) { + return referenceFromEntry(edrId); + } + return null; + } catch (Exception exception) { + throw new EdcPersistenceException(exception); + } + }); + } + + private T findById(Connection connection, String id, ResultSetMapper resultSetMapper) { + var sql = statements.getFindByTransferProcessIdTemplate(); + return executeQuerySingle(connection, false, resultSetMapper, sql, id); + } + + @Override + public @NotNull List referencesForAsset(String assetId) { + return internalQuery(queryFor("assetId", assetId), this::mapToEdrId).map(this::referenceFromEntry).collect(Collectors.toList()); + } + + @NotNull + private Stream internalQuery(QuerySpec spec, ResultSetMapper resultSetMapper) { + try { + var queryStmt = statements.createQuery(spec); + return executeQuery(getConnection(), true, resultSetMapper, queryStmt.getQueryAsString(), queryStmt.getParameters()); + } catch (SQLException exception) { + throw new EdcPersistenceException(exception); + } + } + + @Override + public Stream queryForEntries(QuerySpec spec) { + return internalQuery(spec, this::mapResultSet); + } + + @Override + public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) { + transactionContext.execute(() -> { + try (var connection = getConnection()) { + var sql = statements.getInsertTemplate(); + var createdAt = clock.millis(); + executeQuery(connection, sql, entry.getTransferProcessId(), entry.getAssetId(), entry.getAgreementId(), edr.getId(), createdAt, createdAt); + vault.storeSecret(edr.getId(), toJson(edr)).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail())); + } catch (Exception exception) { + throw new EdcPersistenceException(exception); + } + }); + } + + @Override + public StoreResult deleteByTransferProcessId(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var entry = findById(connection, id, this::mapResultSet); + if (entry != null) { + executeQuery(connection, statements.getDeleteByIdTemplate(), id); + vault.deleteSecret(id).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail())); + return StoreResult.success(entry); + } else { + return StoreResult.notFound(format("EDR with id %s not found", id)); + } + } catch (Exception exception) { + throw new EdcPersistenceException(exception); + } + }); + } + + + private EndpointDataReferenceEntry mapResultSet(ResultSet resultSet) throws SQLException { + return EndpointDataReferenceEntry.Builder.newInstance() + .transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn())) + .assetId(resultSet.getString(statements.getAssetIdColumn())) + .agreementId(resultSet.getString(statements.getAgreementIdColumn())) + .build(); + } + + private String mapToEdrId(ResultSet resultSet) throws SQLException { + return resultSet.getString(statements.getEdrId()); + } + + private EndpointDataReference referenceFromEntry(String edrId) { + var edr = vault.resolveSecret(edrId); + if (edr != null) { + return fromJson(edr, EndpointDataReference.class); + } + return null; + } + + private QuerySpec queryFor(String field, String value) { + var filter = Criterion.Builder.newInstance() + .operandLeft(field) + .operator("=") + .operandRight(value) + .build(); + + return QuerySpec.Builder.newInstance().filter(filter).build(); + } +} diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtension.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtension.java new file mode 100644 index 000000000..1add47e9a --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtension.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql; + +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; +import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements; +import org.eclipse.tractusx.edc.edr.store.sql.schema.postgres.PostgresEdrStatements; + +import java.time.Clock; + +@Extension(value = SqlEndpointDataReferenceCacheExtension.NAME) +public class SqlEndpointDataReferenceCacheExtension implements ServiceExtension { + + public static final String NAME = "SQL EDR cache store"; + + @Setting(required = true, defaultValue = SqlEndpointDataReferenceCacheExtension.DEFAULT_DATASOURCE_NAME) + public static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name"; + public static final String DEFAULT_DATASOURCE_NAME = "edr"; + @Inject + private DataSourceRegistry dataSourceRegistry; + @Inject + private TransactionContext transactionContext; + @Inject(required = false) + private EdrStatements statements; + @Inject + private TypeManager typeManager; + @Inject + private Clock clock; + @Inject + private Vault vault; + + @Override + public String name() { + return NAME; + } + + @Provider + public EndpointDataReferenceCache edrCache(ServiceExtensionContext context) { + var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE_NAME); + return new SqlEndpointDataReferenceCache(dataSourceRegistry, dataSourceName, transactionContext, getStatementImpl(), typeManager.getMapper(), vault, clock); + } + + private EdrStatements getStatementImpl() { + return statements == null ? new PostgresEdrStatements() : statements; + } +} diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/BaseSqlEdrStatements.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/BaseSqlEdrStatements.java new file mode 100644 index 000000000..1e4111928 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/BaseSqlEdrStatements.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2020 - 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql.schema; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +import static java.lang.String.format; + +public class BaseSqlEdrStatements implements EdrStatements { + + @Override + public String getFindByTransferProcessIdTemplate() { + return format("SELECT * FROM %s WHERE %s = ?", getEdrTable(), getTransferProcessIdColumn()); + } + + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + var select = format("SELECT * FROM %s", getEdrTable()); + return new SqlQueryStatement(select, querySpec, new EdrMapping(this)); + } + + @Override + public String getInsertTemplate() { + return format("INSERT INTO %s (%s, %s, %s, %s,%s, %s) VALUES (?, ?, ?, ?, ?, ?)", + getEdrTable(), + getTransferProcessIdColumn(), + getAssetIdColumn(), + getAgreementIdColumn(), + getEdrId(), + getCreatedAtColumn(), + getUpdatedAtColumn() + ); + } + + @Override + public String getDeleteByIdTemplate() { + return format("DELETE FROM %s WHERE %s = ?", + getEdrTable(), + getTransferProcessIdColumn()); + } +} diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrMapping.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrMapping.java new file mode 100644 index 000000000..fbc615bff --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrMapping.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql.schema; + +import org.eclipse.edc.sql.translation.TranslationMapping; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry; + +/** + * Maps fields of a {@link EndpointDataReferenceEntry} onto the + * corresponding SQL schema (= column names) + */ +public class EdrMapping extends TranslationMapping { + public EdrMapping(EdrStatements statements) { + add("assetId", statements.getAssetIdColumn()); + add("agreementId", statements.getAgreementIdColumn()); + } +} diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrStatements.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrStatements.java new file mode 100644 index 000000000..d755dc2e4 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/EdrStatements.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql.schema; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +/** + * Sql Statements for DataPlane Store + */ +public interface EdrStatements { + + default String getEdrTable() { + return "edc_edr_cache"; + } + + default String getTransferProcessIdColumn() { + return "transfer_process_id"; + } + + default String getAgreementIdColumn() { + return "agreement_id"; + } + + default String getAssetIdColumn() { + return "asset_id"; + } + + default String getEdrId() { + return "edr_id"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + default String getUpdatedAtColumn() { + return "updated_at"; + } + + + String getFindByTransferProcessIdTemplate(); + + SqlQueryStatement createQuery(QuerySpec querySpec); + + String getInsertTemplate(); + + String getDeleteByIdTemplate(); + +} + diff --git a/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/postgres/PostgresEdrStatements.java b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/postgres/PostgresEdrStatements.java new file mode 100644 index 000000000..3feb69028 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/java/org/eclipse/tractusx/edc/edr/store/sql/schema/postgres/PostgresEdrStatements.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql.schema.postgres; + +import org.eclipse.tractusx.edc.edr.store.sql.schema.BaseSqlEdrStatements; + +public class PostgresEdrStatements extends BaseSqlEdrStatements { + + +} diff --git a/edc-extensions/edr-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/edr-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 000000000..8405d9959 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,15 @@ +# +# Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.tractusx.edc.edr.store.sql.SqlEndpointDataReferenceCacheExtension \ No newline at end of file diff --git a/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtensionTest.java b/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtensionTest.java new file mode 100644 index 000000000..0bc1446a9 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheExtensionTest.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql; + +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.system.injection.ObjectFactory; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.tractusx.edc.edr.store.sql.SqlEndpointDataReferenceCacheExtension.DATASOURCE_SETTING_NAME; +import static org.eclipse.tractusx.edc.edr.store.sql.SqlEndpointDataReferenceCacheExtension.DEFAULT_DATASOURCE_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(DependencyInjectionExtension.class) +public class SqlEndpointDataReferenceCacheExtensionTest { + + SqlEndpointDataReferenceCacheExtension extension; + ServiceExtensionContext context; + + + @BeforeEach + void setUp(ObjectFactory factory, ServiceExtensionContext context) { + this.context = spy(context); + context.registerService(TypeManager.class, new TypeManager()); + context.registerService(DataSourceRegistry.class, mock(DataSourceRegistry.class)); + extension = factory.constructInstance(SqlEndpointDataReferenceCacheExtension.class); + } + + @Test + void shouldInitializeTheStore() { + var config = mock(Config.class); + when(context.getConfig()).thenReturn(config); + when(config.getString(any(), any())).thenReturn(DEFAULT_DATASOURCE_NAME); + + assertThat(extension.edrCache(context)).isInstanceOf(SqlEndpointDataReferenceCache.class); + + verify(config).getString(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE_NAME); + } +} diff --git a/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheTest.java b/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheTest.java new file mode 100644 index 000000000..ecd39b195 --- /dev/null +++ b/edc-extensions/edr-cache-sql/src/test/java/org/eclipse/tractusx/edc/edr/store/sql/SqlEndpointDataReferenceCacheTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.store.sql; + +import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest; +import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements; +import org.eclipse.tractusx.edc.edr.store.sql.schema.postgres.PostgresEdrStatements; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.time.Clock; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@PostgresqlDbIntegrationTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +public class SqlEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest { + + EdrStatements statements = new PostgresEdrStatements(); + SqlEndpointDataReferenceCache cache; + + Clock clock = Clock.systemUTC(); + + Vault vault = mock(Vault.class); + + TypeManager typeManager = new TypeManager(); + + + @BeforeEach + void setUp(PostgresqlStoreSetupExtension extension) throws IOException { + + when(vault.deleteSecret(any())).thenReturn(Result.success()); + when(vault.storeSecret(any(), any())).thenReturn(Result.success()); + + cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock); + var schema = Files.readString(Paths.get("./docs/schema.sql")); + extension.runQuery(schema); + + } + + @Override + protected void prepareEdr(EndpointDataReference edr) { + when(vault.resolveSecret(edr.getId())).thenReturn(typeManager.writeValueAsString(edr)); + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension extension) throws SQLException { + extension.runQuery("DROP TABLE " + statements.getEdrTable() + " CASCADE"); + } + + @Override + protected EndpointDataReferenceCache getStore() { + return cache; + } +} diff --git a/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java b/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java deleted file mode 100644 index 15c3e710d..000000000 --- a/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.eclipse.tractusx.edc.postgresql.migration; - -public class CpAdapterPostgresqlMigrationExtension extends AbstractPostgresqlMigrationExtension { - private static final String NAME_SUBSYSTEM = "cpadapter"; - private static final String DATASOURCE_SETTING_NAME = "edc.datasource.cpadapter.name"; - - @Override - protected String getDataSourceNameConfigurationKey() { - return DATASOURCE_SETTING_NAME; - } - - @Override - protected String getSubsystemName() { - return NAME_SUBSYSTEM; - } -} diff --git a/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/EdrPostgresqlMigrationExtension.java b/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/EdrPostgresqlMigrationExtension.java new file mode 100644 index 000000000..2ebb12bb1 --- /dev/null +++ b/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/EdrPostgresqlMigrationExtension.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.postgresql.migration; + +public class EdrPostgresqlMigrationExtension extends AbstractPostgresqlMigrationExtension { + private static final String NAME_SUBSYSTEM = "edr"; + + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name"; + + protected String getDataSourceNameConfigurationKey() { + return DATASOURCE_SETTING_NAME; + } + + protected String getSubsystemName() { + return NAME_SUBSYSTEM; + } +} diff --git a/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index b34529d7e..a641ec766 100644 --- a/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -22,4 +22,4 @@ org.eclipse.tractusx.edc.postgresql.migration.ContractDefinitionPostgresqlMigrat org.eclipse.tractusx.edc.postgresql.migration.ContractNegotiationPostgresqlMigrationExtension org.eclipse.tractusx.edc.postgresql.migration.PolicyPostgresqlMigrationExtension org.eclipse.tractusx.edc.postgresql.migration.TransferProcessPostgresqlMigrationExtension -org.eclipse.tractusx.edc.postgresql.migration.CpAdapterPostgresqlMigrationExtension +org.eclipse.tractusx.edc.postgresql.migration.EdrPostgresqlMigrationExtension diff --git a/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql b/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql deleted file mode 100644 index 3e64d3c45..000000000 --- a/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql +++ /dev/null @@ -1,54 +0,0 @@ --- --- Copyright (c) 2022 ZF Friedrichshafen AG --- --- This program and the accompanying materials are made available under the --- terms of the Apache License, Version 2.0 which is available at --- https://www.apache.org/licenses/LICENSE-2.0 --- --- SPDX-License-Identifier: Apache-2.0 --- --- Contributors: --- ZF Friedrichshafen AG - Initial SQL Query --- - --- Statements are designed for and tested with Postgres only! - - -CREATE TABLE IF NOT EXISTS edc_lease -( - leased_by VARCHAR NOT NULL, - leased_at BIGINT, - lease_duration INTEGER NOT NULL, - lease_id VARCHAR NOT NULL - CONSTRAINT lease_pk - PRIMARY KEY -); - -CREATE TABLE IF NOT EXISTS edc_cpadapter_queue -( - id VARCHAR NOT NULL, - created_at BIGINT NOT NULL, - channel VARCHAR, - message JSON, - invoke_after BIGINT NOT NULL, - lease_id VARCHAR - CONSTRAINT cpadapter_queue_lease_lease_id_fk - REFERENCES edc_lease - ON DELETE SET NULL, - PRIMARY KEY (id) -); - -CREATE UNIQUE INDEX IF NOT EXISTS edc_cpadapter_queue_id_uindex - ON edc_cpadapter_queue (id); - -CREATE TABLE IF NOT EXISTS edc_cpadapter_object_store -( - id VARCHAR NOT NULL, - created_at BIGINT NOT NULL, - type VARCHAR, - object JSON, - PRIMARY KEY (id) -); - - - diff --git a/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/edr/V0_0_1__Init_Edr_Database_Schema.sql b/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/edr/V0_0_1__Init_Edr_Database_Schema.sql new file mode 100644 index 000000000..0e166045b --- /dev/null +++ b/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/edr/V0_0_1__Init_Edr_Database_Schema.sql @@ -0,0 +1,34 @@ +-- +-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +-- + +-- Statements are designed for and tested with Postgres only! + + +CREATE TABLE IF NOT EXISTS edc_edr_cache +( + transfer_process_id VARCHAR NOT NULL PRIMARY KEY, + agreement_id VARCHAR NOT NULL, + asset_id VARCHAR NOT NULL, + edr_id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL +); + + +CREATE INDEX IF NOT EXISTS edc_edr_asset_id_index + ON edc_edr_cache (asset_id); + + +CREATE INDEX IF NOT EXISTS edc_edr_agreement_id_index + ON edc_edr_cache (agreement_id); + diff --git a/edc-tests/e2e-tests/build.gradle.kts b/edc-tests/e2e-tests/build.gradle.kts index 3ff1694e3..63181a7ca 100644 --- a/edc-tests/e2e-tests/build.gradle.kts +++ b/edc-tests/e2e-tests/build.gradle.kts @@ -17,6 +17,7 @@ plugins { } dependencies { + testImplementation(project(":spi:edr-cache-spi")) testImplementation(project(":edc-extensions:control-plane-adapter-api")) testImplementation(libs.okhttp.mockwebserver) testImplementation(libs.restAssured) diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/DataWiper.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/DataWiper.java index 45bfba659..6910eeda6 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/DataWiper.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/DataWiper.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache; import java.util.stream.Collectors; @@ -37,6 +38,7 @@ public void clearPersistence() { clearAssetIndex(); clearPolicies(); clearContractDefinitions(); + clearEdrCache(); } public void clearContractDefinitions() { @@ -54,4 +56,9 @@ public void clearAssetIndex() { var index = context.getService(AssetIndex.class); index.queryAssets(QuerySpec.max()).forEach(asset -> index.deleteById(asset.getId())); } + + public void clearEdrCache() { + var edrCache = context.getService(EndpointDataReferenceCache.class); + edrCache.queryForEntries(QuerySpec.max()).forEach(entry -> edrCache.deleteByTransferProcessId(entry.getTransferProcessId())); + } } diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/PgParticipantRuntime.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/PgParticipantRuntime.java index 0860e0ef0..c03afcff9 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/PgParticipantRuntime.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/PgParticipantRuntime.java @@ -14,7 +14,9 @@ package org.eclipse.tractusx.edc.lifecycle; +import org.eclipse.edc.junit.testfixtures.MockVault; import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.security.Vault; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.system.injection.InjectionContainer; @@ -32,6 +34,7 @@ public PgParticipantRuntime(String moduleName, String runtimeName, String bpn, M super(moduleName, runtimeName, bpn, properties); this.dbName = runtimeName.toLowerCase(); this.registerServiceMock(IdentityService.class, new MockDapsService(bpn)); + this.registerServiceMock(Vault.class, new MockVault()); } @Override diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/TestRuntimeConfiguration.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/TestRuntimeConfiguration.java index ec88ffcc0..0e83f6ce7 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/TestRuntimeConfiguration.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/TestRuntimeConfiguration.java @@ -86,6 +86,10 @@ public static Map postgresqlConfiguration(String name) { put("edc.datasource.transferprocess.url", jdbcUrl); put("edc.datasource.transferprocess.user", PostgresqlLocalInstance.USER); put("edc.datasource.transferprocess.password", PostgresqlLocalInstance.PASSWORD); + put("edc.datasource.edr.name", "edr"); + put("edc.datasource.edr.url", jdbcUrl); + put("edc.datasource.edr.user", PostgresqlLocalInstance.USER); + put("edc.datasource.edr.password", PostgresqlLocalInstance.PASSWORD); } }; } diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/NegotiateEdrPostgresqlTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/NegotiateEdrPostgresqlTest.java index d7db9cc31..ec2ccf7bf 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/NegotiateEdrPostgresqlTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/NegotiateEdrPostgresqlTest.java @@ -42,4 +42,5 @@ public class NegotiateEdrPostgresqlTest extends AbstractNegotiateEdrTest { PLATO_BPN, platoPostgresqlConfiguration() ); + } diff --git a/edc-tests/runtime/runtime-postgresql/build.gradle.kts b/edc-tests/runtime/runtime-postgresql/build.gradle.kts index 53253b67a..74f8003ec 100644 --- a/edc-tests/runtime/runtime-postgresql/build.gradle.kts +++ b/edc-tests/runtime/runtime-postgresql/build.gradle.kts @@ -29,7 +29,7 @@ dependencies { } implementation(project(":edc-tests:runtime:extensions")) - + // use basic (all in-mem) data plane runtimeOnly(project(":edc-dataplane:edc-dataplane-base")) { exclude("org.eclipse.edc", "api-observability") diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f30e6cb3f..b0f41fd58 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,6 +17,8 @@ apache-sshd = "2.10.0" testcontainers = "1.18.1" aws = "2.20.70" rsApi = "3.1.0" +jupiter = "5.9.2" +assertj = "3.23.1" [libraries] edc-spi-catalog = { module = "org.eclipse.edc:catalog-spi", version.ref = "edc" } @@ -44,6 +46,7 @@ edc-core-connector = { module = "org.eclipse.edc:connector-core", version.ref = edc-core-jetty = { module = "org.eclipse.edc:jetty-core", version.ref = "edc" } edc-core-jersey = { module = "org.eclipse.edc:jersey-core", version.ref = "edc" } edc-core-api = { module = "org.eclipse.edc:api-core", version.ref = "edc" } +edc-core-sql = { module = "org.eclipse.edc:sql-core", version.ref = "edc" } edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" } edc-api-management-config = { module = "org.eclipse.edc:management-api-configuration", version.ref = "edc" } edc-api-management = { module = "org.eclipse.edc:management-api", version.ref = "edc" } @@ -124,6 +127,8 @@ apache-sshd-sftp = { module = "org.apache.sshd:sshd-sftp", version.ref = "apache testcontainers-junit = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers" } aws-s3 = { module = "software.amazon.awssdk:s3", version.ref = "aws" } jakarta-rsApi = { module = "jakarta.ws.rs:jakarta.ws.rs-api", version.ref = "rsApi" } +junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "jupiter" } +assertj = { module = "org.assertj:assertj-core", version.ref = "assertj" } [bundles] edc-connector = ["edc.boot", "edc.core-connector", "edc.core-controlplane", "edc.api-observability"] diff --git a/settings.gradle.kts b/settings.gradle.kts index ed1a24264..3cd6fd9e5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -41,6 +41,8 @@ include(":edc-extensions:transferprocess-sftp-common") include(":edc-extensions:transferprocess-sftp-provisioner") include(":edc-extensions:control-plane-adapter-api") include(":edc-extensions:control-plane-adapter-callback") +include(":edc-extensions:edr-cache-sql") + include(":edc-tests:e2e-tests") diff --git a/spi/edr-cache-spi/build.gradle.kts b/spi/edr-cache-spi/build.gradle.kts index f60c83b17..a08b24ee3 100644 --- a/spi/edr-cache-spi/build.gradle.kts +++ b/spi/edr-cache-spi/build.gradle.kts @@ -14,10 +14,16 @@ plugins { `java-library` + `java-test-fixtures` } dependencies { implementation(project(":spi:core-spi")) implementation(libs.edc.spi.core) + + testFixturesImplementation(libs.edc.junit) + testFixturesImplementation(libs.junit.jupiter.api) + testFixturesImplementation(libs.assertj) + } diff --git a/spi/edr-cache-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCache.java b/spi/edr-cache-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCache.java index 556d97729..bf0f3dc37 100644 --- a/spi/edr-cache-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCache.java +++ b/spi/edr-cache-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCache.java @@ -14,12 +14,14 @@ package org.eclipse.tractusx.edc.edr.spi; +import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.List; +import java.util.stream.Stream; /** * Caches and resolves {@link EndpointDataReference}s @@ -39,17 +41,10 @@ public interface EndpointDataReferenceCache { List referencesForAsset(String assetId); /** - * Returns the {@link EndpointDataReferenceEntry}s for the asset. + * Returns all the EDR entries in the store that are covered by a given {@link QuerySpec}. */ - @NotNull - List entriesForAsset(String assetId); - - /** - * Returns the {@link EndpointDataReferenceEntry}s for the agreement. - */ - @NotNull - List entriesForAgreement(String agreementId); + Stream queryForEntries(QuerySpec spec); /** * Saves an {@link EndpointDataReference} to the cache using upsert semantics. diff --git a/spi/edr-cache-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCacheBaseTest.java b/spi/edr-cache-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCacheBaseTest.java new file mode 100644 index 000000000..890360f9c --- /dev/null +++ b/spi/edr-cache-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/EndpointDataReferenceCacheBaseTest.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.edr.spi; + +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreFailure; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.junit.jupiter.api.Test; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.UUID.randomUUID; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class EndpointDataReferenceCacheBaseTest { + + protected abstract EndpointDataReferenceCache getStore(); + + @Test + void save() { + + var tpId = "tp1"; + var assetId = "asset1"; + var edrId = "edr1"; + + var edr = edr(edrId); + + prepareEdr(edr); + + var entry = edrEntry(assetId, randomUUID().toString(), tpId); + + getStore().save(entry, edr); + + assertThat(getStore().resolveReference(tpId)) + .isNotNull() + .extracting(EndpointDataReference::getId) + .isEqualTo(edrId); + + var edrs = getStore().referencesForAsset(assetId); + assertThat(edrs.size()).isEqualTo(1); + assertThat(edrs.get((0)).getId()).isEqualTo(edrId); + + } + + + @Test + void queryEntries_noQuerySpec() { + var all = IntStream.range(0, 10) + .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i)) + .peek(entry -> getStore().save(entry, edr(entry.getTransferProcessId()))) + .collect(Collectors.toList()); + + assertThat(getStore().queryForEntries(QuerySpec.none())).containsExactlyInAnyOrderElementsOf(all); + } + + + @Test + void queryEntries_assetIdQuerySpec() { + IntStream.range(0, 10) + .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i)) + .forEach(entry -> getStore().save(entry, edr(entry.getTransferProcessId()))); + + var entry = edrEntry("assetId", "agreementId", "tpId"); + getStore().save(entry, edr("tpId")); + + var filter = Criterion.Builder.newInstance() + .operandLeft("assetId") + .operator("=") + .operandRight(entry.getAssetId()) + .build(); + + assertThat(getStore().queryForEntries(QuerySpec.Builder.newInstance().filter(filter).build())).containsOnly(entry); + } + + @Test + void queryEntries_agreementIdQuerySpec() { + IntStream.range(0, 10) + .mapToObj(i -> edrEntry("assetId" + i, "agreementId" + i, "tpId" + i)) + .forEach(entry -> getStore().save(entry, edr(entry.getTransferProcessId()))); + + var entry = edrEntry("assetId", "agreementId", "tpId"); + getStore().save(entry, edr("tpId")); + + var filter = Criterion.Builder.newInstance() + .operandLeft("agreementId") + .operator("=") + .operandRight(entry.getAgreementId()) + .build(); + + assertThat(getStore().queryForEntries(QuerySpec.Builder.newInstance().filter(filter).build())).containsOnly(entry); + } + + @Test + void deleteByTransferProcessId_shouldDelete_WhenFound() { + + var entry = edrEntry("assetId", "agreementId", "tpId"); + getStore().save(entry, edr("tpId")); + + assertThat(getStore().deleteByTransferProcessId(entry.getTransferProcessId())) + .extracting(StoreResult::getContent) + .isEqualTo(entry); + + assertThat(getStore().resolveReference(entry.getTransferProcessId())).isNull(); + assertThat(getStore().referencesForAsset(entry.getAssetId())).hasSize(0); + assertThat(getStore().queryForEntries(QuerySpec.max())).hasSize(0); + + } + + @Test + void deleteByTransferProcessId_shouldReturnError_whenNotFound() { + assertThat(getStore().deleteByTransferProcessId("notFound")) + .extracting(StoreResult::reason) + .isEqualTo(StoreFailure.Reason.NOT_FOUND); + } + + protected void prepareEdr(EndpointDataReference edr) { + + } + + + private EndpointDataReference edr(String id) { + return EndpointDataReference.Builder.newInstance() + .endpoint("http://test.com") + .id(id) + .authCode("11111") + .authKey("authentication").build(); + } + + private EndpointDataReferenceEntry edrEntry(String assetId, String agreementId, String transferProcessId) { + return EndpointDataReferenceEntry.Builder.newInstance() + .assetId(assetId) + .agreementId(agreementId) + .transferProcessId(transferProcessId) + .build(); + } + +}