Skip to content

Commit

Permalink
Propagate SecurityContext into @transactional methods. (#1979)
Browse files Browse the repository at this point in the history
Closes #1944.
  • Loading branch information
mikereiche committed Oct 10, 2024
1 parent 8360fe3 commit d75b4b6
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -89,6 +90,7 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
@Stability.Internal
<T> Flux<T> executeReactive(TransactionDefinition definition,
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext()); // caller's resources
return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> {
boolean isInExistingTransaction = isInTransaction.isPresent();
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
Expand All @@ -100,17 +102,20 @@ <T> Flux<T> executeReactive(TransactionDefinition definition,
} else {
return Mono.error(new UnsupportedOperationException("Unsupported operation"));
}
});
}).contextWrite( // set CouchbaseResourceHolder containing caller's SecurityContext
ctx -> ctx.put(CouchbaseResourceHolder.class, couchbaseResourceHolder));
}

private <T> T executeNewTransaction(TransactionCallback<T> callback) {
final AtomicReference<T> execResult = new AtomicReference<>();
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext());

// Each of these transactions will block one thread on the underlying SDK's transactions scheduler. This
// scheduler is effectively unlimited, but this can still potentially lead to high thread usage by the application.
// If this is an issue then users need to instead use the standard Couchbase reactive transactions SDK.
try {
TransactionResult ignored = couchbaseClientFactory.getCluster().transactions().run(ctx -> {
setSecurityContext(couchbaseResourceHolder.getSecurityContext()); // set the security context for the transaction
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(ctx, true, false, false, true, null);

T res = callback.doInTransaction(status);
Expand Down Expand Up @@ -173,12 +178,16 @@ public boolean isCompleted() {
}
};

return Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v)).then(Mono.defer(() -> {
if (status.isRollbackOnly()) {
return Mono.error(new TransactionRollbackRequestedException("TransactionStatus.isRollbackOnly() is set"));
}
return Mono.empty();
}));
// Get caller's resources, set SecurityContext for the transaction
return CouchbaseResourceOwner.get().map(cbrh -> setSecurityContext(cbrh.get().getSecurityContext()))
.flatMap(ignore -> Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v))
.then(Mono.defer(() -> {
if (status.isRollbackOnly()) {
return Mono.error(new TransactionRollbackRequestedException(
"TransactionStatus.isRollbackOnly() is set"));
}
return Mono.empty();
})));
});

}, this.options).thenMany(Flux.defer(() -> Flux.fromIterable(out))).onErrorMap(ex -> {
Expand Down Expand Up @@ -288,4 +297,26 @@ public void rollback(TransactionStatus ignored) throws TransactionException {
throw new UnsupportedOperationException(
"Direct programmatic use of the Couchbase PlatformTransactionManager is not supported");
}

static private Object getSecurityContext() {
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
return securityContextHolderClass.getMethod("getContext").invoke(null);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
return null;
}

static private <S> S setSecurityContext(S sc) {
try {
Class<?> securityContextHolder = Class.forName("org.springframework.security.core.context.SecurityContext");
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
securityContextHolderClass.getMethod("setContext", new Class[] { securityContextHolder }).invoke(null, sc);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
return sc;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public class CouchbaseResourceHolder extends ResourceHolderSupport {

private @Nullable CoreTransactionAttemptContext core; // which holds the atr
private @Nullable Object securityContext; // SecurityContext. We don't have the class.

Map<Integer, Object> getResultMap = new HashMap<>();

/**
Expand All @@ -42,7 +44,17 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport {
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core) {
this(core, null);
}

/**
* Create a new {@link CouchbaseResourceHolder} for a given {@link CoreTransactionAttemptContext session}.
*
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core, @Nullable Object securityContext) {
this.core = core;
this.securityContext = securityContext;
}

/**
Expand All @@ -53,6 +65,14 @@ public CoreTransactionAttemptContext getCore() {
return core;
}

/**
* @return the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
@Nullable
public Object getSecurityContext() {
return securityContext;
}

public Object transactionResultHolder(Object holder, Object o) {
getResultMap.put(System.identityHashCode(o), holder);
return holder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.springframework.data.couchbase.transaction;

import reactor.core.publisher.Mono;

import java.util.Optional;

import com.couchbase.client.core.annotation.Stability.Internal;

@Internal
public class CouchbaseResourceOwner {
private static final ThreadLocal<CouchbaseResourceHolder> marker = new ThreadLocal();

public CouchbaseResourceOwner() {}

public static void set(CouchbaseResourceHolder toInject) {
if (marker.get() != null) {
throw new IllegalStateException(
"Trying to set resource holder when already inside a transaction - likely an internal bug, please report it");
} else {
marker.set(toInject);
}
}

public static void clear() {
marker.remove();
}

public static Mono<Optional<CouchbaseResourceHolder>> get() {
return Mono.deferContextual((ctx) -> {
CouchbaseResourceHolder fromThreadLocal = marker.get();
CouchbaseResourceHolder fromReactive = ctx.hasKey(CouchbaseResourceHolder.class)
? ctx.get(CouchbaseResourceHolder.class)
: null;
if (fromThreadLocal != null) {
return Mono.just(Optional.of(fromThreadLocal));
} else {
return fromReactive != null ? Mono.just(Optional.of(fromReactive)) : Mono.just(Optional.empty());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;

import org.springframework.data.couchbase.util.Util;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -119,6 +120,7 @@ public void shouldRollbackAfterExceptionOfTxAnnotatedMethod() {
@Test
public void commitShouldPersistTxEntries() {

System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
personService.savePerson(WalterWhite) //
.as(StepVerifier::create) //
.expectNextCount(1) //
Expand All @@ -130,6 +132,17 @@ public void commitShouldPersistTxEntries() {
.verifyComplete();
}

@Test
public void commitShouldPersistTxEntriesBlocking() {
System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
Person p = personService.savePersonBlocking(WalterWhite);

operations.findByQuery(Person.class).withConsistency(REQUEST_PLUS).count() //
.as(StepVerifier::create) //
.expectNext(1L) //
.verifyComplete();
}

@Test
public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.springframework.data.couchbase.core.TransactionalSupport;
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
import org.springframework.data.couchbase.transaction.CouchbaseResourceHolder;
import org.springframework.data.couchbase.util.Util;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -31,6 +33,9 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;

import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

/**
* reactive PersonService for tests
*
Expand All @@ -57,8 +62,15 @@ public Mono<Person> savePersonErrors(Person person) {
.<Person> flatMap(it -> Mono.error(new SimulateFailureException()));
}

@Transactional
public Person savePersonBlocking(Person person) {
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
return personOperations.insertById(Person.class).one(person);
}

@Transactional
public Mono<Person> savePerson(Person person) {
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
return TransactionalSupport.checkForTransactionInThreadLocalStorage().map(stat -> {
assertTrue(stat.isPresent(), "Not in transaction");
System.err.println("In a transaction!!");
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/org/springframework/data/couchbase/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.awaitility.Awaitility.with;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
Expand Down Expand Up @@ -170,4 +171,26 @@ public static void assertInAnnotationTransaction(boolean inTransaction) {
+ " but expected in-annotation-transaction = " + inTransaction);
}

static public Object getSecurityContext(){
Object sc = null;
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
sc = securityContextHolderClass.getMethod("getContext").invoke(null);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
System.err.println(Thread.currentThread().getName() +" Util.get "+ System.identityHashCode(sc));
return sc;
}

static public void setSecurityContext(Object sc) {
System.err.println(Thread.currentThread().getName() +" Util.set "+ System.identityHashCode(sc));
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
sc = securityContextHolderClass.getMethod("setContext", new Class[]{securityContextHolderClass}).invoke(sc);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
}

}

0 comments on commit d75b4b6

Please sign in to comment.