
Optimize pool warmup (#171)
- Added a new signature for the sizeBetween method, which allows controlling the concurrency level used when the allocator is subscribed to during the warmup phase, if any (see the usage sketch below).
- drainLoop is now called for each pre-allocated resource during the warmup phase.
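
For reference, a minimal usage sketch of the new overload (the Connection type and its allocator are placeholders, not part of this change; PoolBuilder.from, sizeBetween, buildPool and warmup are used as in the existing reactor-pool API, with imports from reactor.core.publisher, reactor.pool and java.time omitted for brevity):

    Pool<Connection> pool = PoolBuilder
            .from(Mono.fromCallable(Connection::new)) // allocator publisher (placeholder)
            .sizeBetween(4, 16, 4)                    // min 4, max 16, warm up with at most 4 concurrent allocations
            .buildPool();

    // warmup() pre-allocates the "min" resources; with the third argument above,
    // at most 4 allocator subscriptions are in flight at any point in time.
    pool.warmup().block();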
pderop authored Jul 9, 2023
1 parent 574c24b commit fc78b83
Showing 8 changed files with 394 additions and 41 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,16 +74,28 @@ static final class SizeBasedAllocationStrategy implements AllocationStrategy {

final int min;
final int max;
final int warmupParallelism;

volatile int permits;
static final AtomicIntegerFieldUpdater<SizeBasedAllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(SizeBasedAllocationStrategy.class, "permits");

SizeBasedAllocationStrategy(int min, int max) {
this(min, max, PoolBuilder.DEFAULT_WARMUP_PARALLELISM);
}

SizeBasedAllocationStrategy(int min, int max, int warmupParallelism) {
if (min < 0) throw new IllegalArgumentException("min must be positive or zero");
if (max < 1) throw new IllegalArgumentException("max must be strictly positive");
if (min > max) throw new IllegalArgumentException("min must be less than or equal to max");
if (min > 0 && warmupParallelism < 1) {
throw new IllegalArgumentException("warmupParallelism must be greater than 0");
}
if (min > 0 && warmupParallelism > min) {
throw new IllegalArgumentException("warmupParallelism must be less than or equal to min");
}
this.min = min;
this.max = max;
this.warmupParallelism = warmupParallelism;
PERMITS.lazySet(this, this.max);
}

@@ -146,5 +158,10 @@ public void returnPermits(int returned) {
}
}
}

@Override
public int warmupParallelism() {
return warmupParallelism;
}
}
}
20 changes: 19 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -75,4 +75,22 @@ public interface AllocationStrategy {
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);

/**
* Return the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
* <p>
* The number of resources created concurrently will not exceed the value returned by {@code warmupParallelism()}.
* If the concurrency level is set to 1, pre-allocation of resources will be performed sequentially by subscribing to the allocator
* one at a time. The process waits for a resource to be created before subscribing again to the allocator.
* This sequence continues until all pre-allocated resources have been successfully created.
* <p>
* Defaults to 1
*
* @return The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive,
* {@code 1} by default
* @since 1.0.1
*/
default int warmupParallelism() {
return PoolBuilder.DEFAULT_WARMUP_PARALLELISM;
}
}
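
Because warmupParallelism() is a default method, existing AllocationStrategy implementations keep working unchanged. The sketch below is a hypothetical decorator (not part of this commit) that opts in by overriding only the new method; it assumes the remaining abstract methods of AllocationStrategy are getPermits(int), estimatePermitCount(), permitGranted(), permitMinimum(), permitMaximum() and returnPermits(int), so the delegation list would need adjusting if the interface differs:

    final class FixedWarmupParallelismStrategy implements AllocationStrategy {

        final AllocationStrategy delegate;
        final int warmupParallelism;

        FixedWarmupParallelismStrategy(AllocationStrategy delegate, int warmupParallelism) {
            this.delegate = delegate;
            this.warmupParallelism = warmupParallelism;
        }

        @Override public int getPermits(int desired)      { return delegate.getPermits(desired); }
        @Override public int estimatePermitCount()        { return delegate.estimatePermitCount(); }
        @Override public int permitGranted()              { return delegate.permitGranted(); }
        @Override public int permitMinimum()              { return delegate.permitMinimum(); }
        @Override public int permitMaximum()              { return delegate.permitMaximum(); }
        @Override public void returnPermits(int returned) { delegate.returnPermits(returned); }

        // the only behavior this decorator changes: the warmup concurrency bound
        @Override public int warmupParallelism()          { return warmupParallelism; }
    }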
35 changes: 32 additions & 3 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -349,6 +349,10 @@ public PoolBuilder<T, CONF> releaseHandler(Function<T, ? extends Publisher<Void>
* {@code min} live resources before serving the acquire with (one of) the newly created resource(s).
* At the same time it MUST NOT allocate any resource if that would bring the number of live resources
* over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}.
* <p>
* Pre-allocation of warmed-up resources, if any, will be performed sequentially by subscribing to the allocator
* one at a time. The process waits for a resource to be created before subscribing again to the allocator.
* This sequence continues until all pre-allocated resources have been successfully created.
*
* @param min the minimum number of live resources to keep in the pool (can be best effort)
* @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a
@@ -358,7 +362,32 @@ public PoolBuilder<T, CONF> releaseHandler(Function<T, ? extends Publisher<Void>
* @see #allocationStrategy(AllocationStrategy)
*/
public PoolBuilder<T, CONF> sizeBetween(int min, int max) {
return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max));
return sizeBetween(min, max, DEFAULT_WARMUP_PARALLELISM);
}

/**
* Replace the {@link AllocationStrategy} with one that lets the {@link Pool} allocate between {@code min} and {@code max} resources.
* When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach
* {@code min} live resources before serving the acquire with (one of) the newly created resource(s).
* At the same time it MUST NOT allocate any resource if that would bring the number of live resources
* over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}.
*
* @param min the minimum number of live resources to keep in the pool (can be best effort)
* @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a
* minimum and no upper bound
* @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are
* subscribed to at the same time.
* A {@code warmupParallelism} of 1 means that pre-allocation of resources is performed by subscribing to the allocator sequentially,
* waiting for each resource to be created before subscribing to the allocator again, and so on until the last allocation
* completes.
* @return this {@link Pool} builder
* @see #sizeUnbounded()
* @see #allocationStrategy(AllocationStrategy)
* @since 1.0.1
*/
public PoolBuilder<T, CONF> sizeBetween(int min, int max, int warmupParallelism) {
return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max, warmupParallelism));
}

/**
@@ -501,5 +530,5 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final Function<?, Mono<Void>> NOOP_HANDLER = it -> Mono.empty();
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);

static final int DEFAULT_WARMUP_PARALLELISM = 1;
}
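
To make the difference between the two overloads concrete, here is a rough timing sketch (the 100 ms allocator delay and the pool sizes are arbitrary examples, not values from this commit):

    Mono<String> allocator = Mono.just("resource").delayElement(Duration.ofMillis(100));

    // warmupParallelism defaults to 1: the 4 "min" resources are created one after
    // the other, so warmup() completes after roughly 4 x 100 ms.
    PoolBuilder.from(allocator).sizeBetween(4, 8).buildPool().warmup().block();

    // warmupParallelism = 4: up to 4 allocator subscriptions are in flight at once,
    // so warmup() completes after roughly 100 ms.
    PoolBuilder.from(allocator).sizeBetween(4, 8, 4).buildPool().warmup().block();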
24 changes: 15 additions & 9 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiPredicate;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
@@ -249,7 +250,10 @@ public Mono<Integer> warmup() {
.returnPermits(1);
});
}
return Flux.concat(allWarmups)
// merge will eagerly subscribe to all warmups from the current thread, but
// the parallelism can be controlled from configuration.
int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), allWarmups.length);
return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency)
.reduce(0, (count, p) -> count + 1);
});
}
@@ -442,13 +446,15 @@ else if (sig.isOnError()) {
logger.debug("should warm up {} extra resources", toWarmup);

final long startWarmupIteration = clock.millis();
Flux<Void> warmupFlux = Flux.range(1, toWarmup)
//individual warmup failures decrement the permit and are logged
.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));

primary.onErrorResume(e -> Mono.empty())
.thenMany(warmupFlux)
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
// flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
Flux.range(1, toWarmup)
.map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
.startWith(primary.doOnSuccess(__ -> drain()).then())
.flatMap(Function.identity(), mergeConcurrency, 1) // since we don't store anything, the inner buffer (prefetch) can be kept at 1
.onErrorResume(e -> Mono.empty())
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
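
For context on the two comments above: the bounded warmup relies on standard Reactor operators, where Flux.merge and flatMap accept a concurrency argument that caps how many inner publishers are subscribed to at the same time. A standalone illustration (values and delays are made up):

    Flux<Mono<Integer>> warmups = Flux.range(1, 4)
            .map(i -> Mono.just(i).delayElement(Duration.ofMillis(100)));

    // With a concurrency of 2, at most two of the delayed Monos are subscribed to at
    // once, so the whole sequence takes roughly 200 ms instead of 100 ms (unbounded)
    // or 400 ms (fully sequential).
    Flux.merge(warmups, 2).blockLast();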
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -250,4 +250,9 @@ public int permitMinimum() {
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public int warmupParallelism() {
return delegate.warmupParallelism();
}
}
43 changes: 42 additions & 1 deletion reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1413,6 +1413,47 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) {
assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) {
AtomicBoolean flip = new AtomicBoolean();
//note the starter method here is irrelevant, only the config is created and passed to createPool
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> {
if (flip.compareAndSet(false, true)) {
return Mono.just("foo").delayElement(Duration.ofMillis(100));
}
else {
flip.compareAndSet(true, false);
return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom")));
}
}))
.sizeBetween(10, Integer.MAX_VALUE, 10)
.metricsRecorder(recorder)
.clock(recorder.getClock());
AbstractPool<String> pool = configAdjuster.apply(builder);

// warmup will eagerly subscribe 10 times to the allocator.
// Five of the subscriptions will succeed (after around 100 millis), and the others should fail after
// around 200 millis.
assertThatIllegalStateException()
.isThrownBy(() -> pool.warmup().block());

// exactly 5 allocations should be successful
assertThat(recorder.getAllocationSuccessCount()).isEqualTo(5);
// at least 1 allocation should have failed
assertThat(recorder.getAllocationErrorCount()).isGreaterThanOrEqualTo(1);
// at least 6 allocations should have taken place
assertThat(recorder.getAllocationTotalCount()).isGreaterThanOrEqualTo(6);

long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue();
long minError = recorder.getAllocationErrorHistogram().getMinValue();

assertThat(minSuccess).as("allocation success latency").isGreaterThanOrEqualTo(100L);
assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")