From d75b4b665f16f190a11ea2d8d089baf2621f5439 Mon Sep 17 00:00:00 2001 From: Michael Reiche <48999328+mikereiche@users.noreply.github.com> Date: Wed, 9 Oct 2024 17:46:32 -0700 Subject: [PATCH] Propagate SecurityContext into @Transactional methods. (#1979) Closes #1944. --- .../CouchbaseCallbackTransactionManager.java | 45 ++++++++++++++++--- .../transaction/CouchbaseResourceHolder.java | 20 +++++++++ .../transaction/CouchbaseResourceOwner.java | 41 +++++++++++++++++ ...onTransactionReactiveIntegrationTests.java | 13 ++++++ .../transactions/PersonServiceReactive.java | 12 +++++ .../data/couchbase/util/Util.java | 23 ++++++++++ 6 files changed, 147 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceOwner.java diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java index 96c1ae896..dbf407e0b 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -89,6 +90,7 @@ public T execute(TransactionDefinition definition, TransactionCallback ca @Stability.Internal Flux executeReactive(TransactionDefinition definition, org.springframework.transaction.reactive.TransactionCallback callback) { + final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext()); // caller's resources return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> { boolean isInExistingTransaction = isInTransaction.isPresent(); boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction); @@ -100,17 +102,20 @@ Flux executeReactive(TransactionDefinition definition, } else { return Mono.error(new UnsupportedOperationException("Unsupported operation")); } - }); + }).contextWrite( // set CouchbaseResourceHolder containing caller's SecurityContext + ctx -> ctx.put(CouchbaseResourceHolder.class, couchbaseResourceHolder)); } private T executeNewTransaction(TransactionCallback callback) { final AtomicReference execResult = new AtomicReference<>(); + final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext()); // Each of these transactions will block one thread on the underlying SDK's transactions scheduler. This // scheduler is effectively unlimited, but this can still potentially lead to high thread usage by the application. // If this is an issue then users need to instead use the standard Couchbase reactive transactions SDK. try { TransactionResult ignored = couchbaseClientFactory.getCluster().transactions().run(ctx -> { + setSecurityContext(couchbaseResourceHolder.getSecurityContext()); // set the security context for the transaction CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(ctx, true, false, false, true, null); T res = callback.doInTransaction(status); @@ -173,12 +178,16 @@ public boolean isCompleted() { } }; - return Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v)).then(Mono.defer(() -> { - if (status.isRollbackOnly()) { - return Mono.error(new TransactionRollbackRequestedException("TransactionStatus.isRollbackOnly() is set")); - } - return Mono.empty(); - })); + // Get caller's resources, set SecurityContext for the transaction + return CouchbaseResourceOwner.get().map(cbrh -> setSecurityContext(cbrh.get().getSecurityContext())) + .flatMap(ignore -> Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v)) + .then(Mono.defer(() -> { + if (status.isRollbackOnly()) { + return Mono.error(new TransactionRollbackRequestedException( + "TransactionStatus.isRollbackOnly() is set")); + } + return Mono.empty(); + }))); }); }, this.options).thenMany(Flux.defer(() -> Flux.fromIterable(out))).onErrorMap(ex -> { @@ -288,4 +297,26 @@ public void rollback(TransactionStatus ignored) throws TransactionException { throw new UnsupportedOperationException( "Direct programmatic use of the Couchbase PlatformTransactionManager is not supported"); } + + static private Object getSecurityContext() { + try { + Class securityContextHolderClass = Class + .forName("org.springframework.security.core.context.SecurityContextHolder"); + return securityContextHolderClass.getMethod("getContext").invoke(null); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException cnfe) {} + return null; + } + + static private S setSecurityContext(S sc) { + try { + Class securityContextHolder = Class.forName("org.springframework.security.core.context.SecurityContext"); + Class securityContextHolderClass = Class + .forName("org.springframework.security.core.context.SecurityContextHolder"); + securityContextHolderClass.getMethod("setContext", new Class[] { securityContextHolder }).invoke(null, sc); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException cnfe) {} + return sc; + } + } diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolder.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolder.java index ae2085d21..5e70ef9e6 100644 --- a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolder.java +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceHolder.java @@ -34,6 +34,8 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport { private @Nullable CoreTransactionAttemptContext core; // which holds the atr + private @Nullable Object securityContext; // SecurityContext. We don't have the class. + Map getResultMap = new HashMap<>(); /** @@ -42,7 +44,17 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport { * @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}. */ public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core) { + this(core, null); + } + + /** + * Create a new {@link CouchbaseResourceHolder} for a given {@link CoreTransactionAttemptContext session}. + * + * @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}. + */ + public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core, @Nullable Object securityContext) { this.core = core; + this.securityContext = securityContext; } /** @@ -53,6 +65,14 @@ public CoreTransactionAttemptContext getCore() { return core; } + /** + * @return the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}. + */ + @Nullable + public Object getSecurityContext() { + return securityContext; + } + public Object transactionResultHolder(Object holder, Object o) { getResultMap.put(System.identityHashCode(o), holder); return holder; diff --git a/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceOwner.java b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceOwner.java new file mode 100644 index 000000000..8be143e67 --- /dev/null +++ b/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseResourceOwner.java @@ -0,0 +1,41 @@ +package org.springframework.data.couchbase.transaction; + +import reactor.core.publisher.Mono; + +import java.util.Optional; + +import com.couchbase.client.core.annotation.Stability.Internal; + +@Internal +public class CouchbaseResourceOwner { + private static final ThreadLocal marker = new ThreadLocal(); + + public CouchbaseResourceOwner() {} + + public static void set(CouchbaseResourceHolder toInject) { + if (marker.get() != null) { + throw new IllegalStateException( + "Trying to set resource holder when already inside a transaction - likely an internal bug, please report it"); + } else { + marker.set(toInject); + } + } + + public static void clear() { + marker.remove(); + } + + public static Mono> get() { + return Mono.deferContextual((ctx) -> { + CouchbaseResourceHolder fromThreadLocal = marker.get(); + CouchbaseResourceHolder fromReactive = ctx.hasKey(CouchbaseResourceHolder.class) + ? ctx.get(CouchbaseResourceHolder.class) + : null; + if (fromThreadLocal != null) { + return Mono.just(Optional.of(fromThreadLocal)); + } else { + return fromReactive != null ? Mono.just(Optional.of(fromReactive)) : Mono.just(Optional.empty()); + } + }); + } +} diff --git a/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java b/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java index 01cb2340e..4aa4776b5 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java @@ -18,6 +18,7 @@ import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS; +import org.springframework.data.couchbase.util.Util; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -119,6 +120,7 @@ public void shouldRollbackAfterExceptionOfTxAnnotatedMethod() { @Test public void commitShouldPersistTxEntries() { + System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext())); personService.savePerson(WalterWhite) // .as(StepVerifier::create) // .expectNextCount(1) // @@ -130,6 +132,17 @@ public void commitShouldPersistTxEntries() { .verifyComplete(); } + @Test + public void commitShouldPersistTxEntriesBlocking() { + System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext())); + Person p = personService.savePersonBlocking(WalterWhite); + + operations.findByQuery(Person.class).withConsistency(REQUEST_PLUS).count() // + .as(StepVerifier::create) // + .expectNext(1L) // + .verifyComplete(); + } + @Test public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() { diff --git a/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java b/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java index 87983fd5e..734b7abd3 100644 --- a/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java +++ b/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java @@ -21,6 +21,8 @@ import org.springframework.data.couchbase.core.TransactionalSupport; import org.springframework.data.couchbase.domain.PersonWithoutVersion; +import org.springframework.data.couchbase.transaction.CouchbaseResourceHolder; +import org.springframework.data.couchbase.util.Util; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -31,6 +33,9 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.reactive.TransactionalOperator; +import java.lang.reflect.InvocationTargetException; +import java.util.Optional; + /** * reactive PersonService for tests * @@ -57,8 +62,15 @@ public Mono savePersonErrors(Person person) { . flatMap(it -> Mono.error(new SimulateFailureException())); } + @Transactional + public Person savePersonBlocking(Person person) { + System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext())); + return personOperations.insertById(Person.class).one(person); + } + @Transactional public Mono savePerson(Person person) { + System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext())); return TransactionalSupport.checkForTransactionInThreadLocalStorage().map(stat -> { assertTrue(stat.isPresent(), "Not in transaction"); System.err.println("In a transaction!!"); diff --git a/src/test/java/org/springframework/data/couchbase/util/Util.java b/src/test/java/org/springframework/data/couchbase/util/Util.java index dc7aa106b..1e2a80dc2 100644 --- a/src/test/java/org/springframework/data/couchbase/util/Util.java +++ b/src/test/java/org/springframework/data/couchbase/util/Util.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.with; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.Arrays; import java.util.LinkedList; @@ -170,4 +171,26 @@ public static void assertInAnnotationTransaction(boolean inTransaction) { + " but expected in-annotation-transaction = " + inTransaction); } + static public Object getSecurityContext(){ + Object sc = null; + try { + Class securityContextHolderClass = Class + .forName("org.springframework.security.core.context.SecurityContextHolder"); + sc = securityContextHolderClass.getMethod("getContext").invoke(null); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException cnfe) {} + System.err.println(Thread.currentThread().getName() +" Util.get "+ System.identityHashCode(sc)); + return sc; + } + + static public void setSecurityContext(Object sc) { + System.err.println(Thread.currentThread().getName() +" Util.set "+ System.identityHashCode(sc)); + try { + Class securityContextHolderClass = Class + .forName("org.springframework.security.core.context.SecurityContextHolder"); + sc = securityContextHolderClass.getMethod("setContext", new Class[]{securityContextHolderClass}).invoke(sc); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException cnfe) {} + } + }