MonoSend MAX_SIZE and REFILL_SIZE are not configurable #3255

Closed
VadimKirilchuk opened this issue May 21, 2024 · 6 comments · Fixed by #3276
Labels
type/enhancement A general enhancement

@VadimKirilchuk (Contributor) commented May 21, 2024

While working on an S3 file download server (we can't use pre-signed URLs), we found that the AWS CRT-based S3 client uses a large part size (8 MB) for transferred chunks by default (see the AWS SDK issue).

It seems that, when writing a Flux<ByteBuffer> as a streaming response body, reactor-netty's MonoSendMany requests MonoSend.MAX_SIZE elements up front and then refills demand based on MonoSend.REFILL_SIZE.

As a result, Reactor Netty requests up to 128 * 8 MB = 1024 MB from the S3 client per response. If the clients are slow enough and multiple files are downloaded in parallel, this causes an OutOfDirectMemoryError, because the S3 client pushes up to 1024 MB per file into Netty buffers.
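The arithmetic above can be written out as a quick estimate. This is a minimal sketch: the prefetch of 128 and the 8 MB part size come from this issue, while the helper name and the number of concurrent downloads are illustrative assumptions.

```java
/**
 * Back-of-the-envelope estimate of how much data one streaming response can
 * pull from the S3 publisher before any of it is written to a slow client.
 */
public final class PrefetchMemoryEstimate {

    static final long MB = 1024L * 1024L;

    // Hypothetical helper: demand requested up front times the size of each emitted chunk.
    static long worstCaseBytesPerResponse(int prefetch, long chunkBytes) {
        return prefetch * chunkBytes;
    }

    public static void main(String[] args) {
        long perFile = worstCaseBytesPerResponse(128, 8 * MB); // CRT default part size
        int parallelDownloads = 4; // assumed number of slow, concurrent downloads
        System.out.println("Per file: " + perFile / MB + " MB");                   // Per file: 1024 MB
        System.out.println("Total: " + parallelDownloads * perFile / MB + " MB"); // Total: 4096 MB
    }
}
```

The same formula with 8 KB chunks gives 1 MB per response, which is why the default only becomes a problem with large parts.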

There are two issues here:

  1. One is on the AWS S3 SDK side: the CRT client doesn't support rate limiting on the Flux.
  2. The other is on the reactor-netty side: the initial request and refill sizes are hardcoded.

We worked around the issue by reducing the default part/chunk size in the S3 client; however, this makes the traffic between the application and S3 chattier and adds extra requests. It would be better if we could control the initial request and refill sizes on the reactor-netty side.

Basic code snippet:

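import java.nio.ByteBuffer;

import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

// Method of a @RestController. s3client (an S3AsyncClient), s3config, checkResult
// and getMetadataItem are application-specific and not shown.
@GetMapping(path = "/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
    GetObjectRequest request = GetObjectRequest.builder()
      .bucket(s3config.getBucket())
      .key(filekey)
      .build();

    return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
      .map(response -> {
        checkResult(response.response());
        String filename = getMetadataItem(response.response(), "filename", filekey);
        return ResponseEntity.ok()
          .header(HttpHeaders.CONTENT_TYPE, response.response().contentType())
          .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response().contentLength()))
          .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
          // reactor-netty's MonoSendMany requests up to MAX_SIZE chunks from this Flux
          .body(Flux.from(response));
      });
}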

Motivation

The motivation is to keep the large chunk size on the S3 client side while applying tighter backpressure between Reactor Netty and the S3 client. The default request of 128 makes sense for the S3 NettyNioAsyncHttpClient, which uses 8 KB chunks by default (128 * 8 KB = 1 MB), but it is far too much for the CRT client, which uses 8 MB chunks (128 * 8 MB = 1024 MB).

Desired solution

MonoSend MAX_SIZE and REFILL_SIZE should be made configurable instead of hardcoded.

Considered alternatives

I don't see any other alternatives, but maybe you can suggest something else.

Additional context

AWS Java SDK version used

2.25.10

CRT Client Version

0.29.14

Reactor netty and netty version

reactor-netty 1.1.3
netty 4.1.108.Final

JDK version used

openjdk version "17.0.3" 2022-04-19
oracle jdk 17

Operating System and version

Windows 11 23H2 and a Linux image in EKS

VadimKirilchuk added the status/need-triage and type/enhancement labels May 21, 2024
violetagg removed the status/need-triage label May 23, 2024
@violetagg (Member)

@VadimKirilchuk Are you interested in providing a PR? Which Reactor Netty version should enable this feature (1.1.x or 1.2.x)?

@VadimKirilchuk (Contributor, Author)

@violetagg I am interested, but will need some guidance.
I haven't checked whether the issue is present in 1.2.x, but I expect the code is the same there, so the change can be made in both branches.

@violetagg (Member)

> @violetagg I am interested, but will need some guidance.

Sure.

> I haven't checked if the issue is present in 1.2.x, but I would think it's all the same, so can be done in both branches.

Then please base the PR on the 1.1.x branch.

@VadimKirilchuk (Contributor, Author)

@violetagg OK, here is what I have found so far.
I don't see an easy way to make it configurable per endpoint. For example, if an application has multiple endpoints and not all of them use an S3 client with large chunks, the user should ideally be able to configure the backpressure per endpoint.

However, as I said, I don't think that is easy to achieve with the current design.
I'm still digging to understand where request(n) actually happens.

reactor.netty.channel.MonoSend is an abstract class with a single implementation: reactor.netty.channel.MonoSendMany.

It declares the following constants which are not used in the abstract MonoSend class:
reactor.netty.channel.MonoSend#MAX_SIZE
reactor.netty.channel.MonoSend#REFILL_SIZE

In fact, they are not used directly in the MonoSendMany class itself, but rather in its inner class reactor.netty.channel.MonoSendMany.SendManyInner, starting with its constructor (SendManyInner#SendManyInner).

The only place that creates a SendManyInner is the subscribe method, reactor.netty.channel.MonoSendMany#subscribe:

@Override
public void subscribe(CoreSubscriber<? super Void> destination) {
        source.subscribe(new SendManyInner<>(this, destination));
}

The SendManyInner constructor then initializes requested with the MAX_SIZE constant:

	SendManyInner(MonoSendMany<I, O> parent, CoreSubscriber<? super Void> actual) {
		this.parent = parent;
		this.actual = actual;
		this.actualContext = actual.currentContext();
		this.requested = MAX_SIZE;
		this.ctx = parent.ctx;
		this.eventLoop = ctx.channel().eventLoop();

		this.asyncFlush = new AsyncFlush();

		//TODO should also cleanup on complete operation (ChannelOperation.OnTerminate) ?
		ctx.channel()
		   .closeFuture()
		   .addListener(this);
	}

The recalculation of requested versus nextRequest seems to happen in reactor.netty.channel.MonoSendMany.SendManyInner#trySuccess(java.lang.Void):

	public boolean trySuccess(Void result) {
		requested--;
		pending--;

		if (checkTerminated()) {
			if (sourceMode == SYNC && requested <= REFILL_SIZE) {
				int u = MAX_SIZE - requested;
				requested += u;
				nextRequest += u;
			}
			trySchedule();
			return true;
		}

		if (requested <= REFILL_SIZE) {
			int u = MAX_SIZE - requested;
			requested += u;
			nextRequest += u;
			trySchedule();
		}
		return true;
	}
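The refill rule in trySuccess can be modeled on its own to see how much demand stays outstanding. This is a minimal sketch of the non-terminated branch only, not reactor-netty code: MAX_SIZE = 128 matches the numbers in this issue, and REFILL_SIZE = MAX_SIZE / 2 is an assumption about the 1.1.x constant. The class and field names are hypothetical.

```java
/**
 * Stand-alone model of the demand bookkeeping in SendManyInner#trySuccess
 * (non-terminated branch only). Illustrative sketch, not reactor-netty code.
 */
public final class RefillSimulation {

    // MAX_SIZE = 128 matches this issue; REFILL_SIZE = MAX_SIZE / 2 is an assumption.
    static final int MAX_SIZE = 128;
    static final int REFILL_SIZE = MAX_SIZE / 2;

    int requested = MAX_SIZE;       // demand granted in the SendManyInner constructor
    long totalRequested = MAX_SIZE; // cumulative demand signalled to the source
    long written = 0;               // elements whose write has succeeded

    /** Mirrors one trySuccess call: one element has been written. */
    void onWriteSuccess() {
        requested--;
        written++;
        if (requested <= REFILL_SIZE) {
            int u = MAX_SIZE - requested; // top demand back up to MAX_SIZE
            requested += u;
            // reactor-netty adds u to nextRequest; trySchedule() later issues request(n)
            totalRequested += u;
        }
    }

    /** Elements requested from the source but not yet written. */
    long outstanding() {
        return totalRequested - written;
    }

    public static void main(String[] args) {
        RefillSimulation sim = new RefillSimulation();
        for (int i = 1; i <= 256; i++) {
            sim.onWriteSuccess();
            if (i % 64 == 0) {
                System.out.println("written=" + sim.written
                        + " totalRequested=" + sim.totalRequested
                        + " outstanding=" + sim.outstanding());
            }
        }
    }
}
```

In this model the outstanding demand never drops below 65 elements and is topped back up to 128 every 64 writes, so with 8 MB chunks roughly 520 MB to 1 GB per response can be requested but not yet written.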

I will need more time to understand whether this is as simple as replacing the constants with values read from system properties, keeping the current values as defaults, like:

Integer.getInteger("reactor.netty.channel.send.many.max_size", MAX_SIZE)
Integer.getInteger("reactor.netty.channel.send.many.refill_size", REFILL_SIZE)

or something similar. Please advise:

  • What should the property names be? Is there a best practice or naming pattern?
  • Should System.getProperty be used for this?
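A property-based approach needs defaults and some validation so that a missing or inconsistent setting cannot break sending. This is a minimal sketch, assuming the property names proposed above (they are not existing reactor-netty properties); the class name, the default of 128, and the constraint that the refill threshold stays below the maximum are assumptions. The refill size defaults to half of the maximum so that overriding only the maximum keeps the same ratio.

```java
import java.util.Properties;

/**
 * Sketch of reading the two proposed settings with defaults and validation.
 * The property names are the ones proposed in this comment, not existing
 * reactor-netty properties.
 */
public final class SendPrefetchConfig {

    static final String MAX_SIZE_PROPERTY = "reactor.netty.channel.send.many.max_size";
    static final String REFILL_SIZE_PROPERTY = "reactor.netty.channel.send.many.refill_size";
    static final int DEFAULT_MAX_SIZE = 128; // value implied by the numbers in this issue

    final int maxSize;
    final int refillSize;

    SendPrefetchConfig(int maxSize, int refillSize) {
        // Assumed constraints: at least one element of demand, refill threshold below the maximum.
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1, got " + maxSize);
        }
        if (refillSize < 0 || refillSize >= maxSize) {
            throw new IllegalArgumentException("refillSize must be in [0, maxSize), got " + refillSize);
        }
        this.maxSize = maxSize;
        this.refillSize = refillSize;
    }

    /** Missing properties fall back to defaults instead of failing with NumberFormatException. */
    static SendPrefetchConfig fromProperties(Properties props) {
        int max = parseOrDefault(props.getProperty(MAX_SIZE_PROPERTY), DEFAULT_MAX_SIZE);
        int refill = parseOrDefault(props.getProperty(REFILL_SIZE_PROPERTY), max / 2);
        return new SendPrefetchConfig(max, refill);
    }

    private static int parseOrDefault(String value, int defaultValue) {
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        SendPrefetchConfig config = fromProperties(System.getProperties());
        System.out.println("maxSize=" + config.maxSize + ", refillSize=" + config.refillSize);
    }
}
```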

@violetagg (Member) commented May 28, 2024

@VadimKirilchuk I haven't thought about a possible implementation, but if we use system properties, the setting will apply to all client requests and server responses, i.e. it won't be possible to specify it per request/response or per client/server (imagine that you have a server and some of its routes make remote calls using the client). Maybe we can add some configuration on HttpClientRequest/HttpServerResponse or NettyOutbound.
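The difference between a global and a per-request/per-response setting can be expressed as a precedence rule. This is a hypothetical sketch: the OptionalInt stands in for a per-outbound setting on something like HttpServerResponse or NettyOutbound that this thread is only discussing, and none of the names below exist in reactor-netty.

```java
import java.util.OptionalInt;

/**
 * Hypothetical precedence rule for the prefetch size: a per-request/per-response
 * override wins, then a JVM-wide system property value, then the built-in default.
 */
public final class PrefetchResolution {

    static final int BUILT_IN_DEFAULT = 128;

    static int effectiveMaxSize(OptionalInt perOutboundOverride, String systemPropertyValue) {
        if (perOutboundOverride.isPresent()) {
            return perOutboundOverride.getAsInt();
        }
        if (systemPropertyValue != null) {
            return Integer.parseInt(systemPropertyValue.trim());
        }
        return BUILT_IN_DEFAULT;
    }

    public static void main(String[] args) {
        // A route streaming 8 MB S3 chunks could ask for a small prefetch,
        // while other routes keep the global setting.
        System.out.println(effectiveMaxSize(OptionalInt.of(2), "64"));   // 2
        System.out.println(effectiveMaxSize(OptionalInt.empty(), "64")); // 64
        System.out.println(effectiveMaxSize(OptionalInt.empty(), null)); // 128
    }
}
```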

@VadimKirilchuk (Contributor, Author)

@violetagg I prepared a draft version of the PR.

I see that most other properties are defined in the ReactorNetty class, but I am not sure whether I should move the property name there. Please advise.

Are you OK with the property name reactor.netty.channel.send.prefetch.maxSize?

I haven't added any unit tests, as your existing unit tests don't seem to interact with system properties.
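If the implementation reads the property at call time, a test can still cover it by setting the property temporarily and restoring it afterwards. This is a minimal sketch with a hypothetical helper name; it assumes the value is not cached in a static final field, since such a field is read once at class initialization and a test cannot change it afterwards.

```java
import java.util.function.Supplier;

public final class SystemPropertyTestSupport {

    /** Runs action with key temporarily set to value, then restores the previous state. */
    static <T> T withSystemProperty(String key, String value, Supplier<T> action) {
        String previous = System.getProperty(key);
        System.setProperty(key, value);
        try {
            return action.get();
        } finally {
            if (previous == null) {
                System.clearProperty(key);
            } else {
                System.setProperty(key, previous);
            }
        }
    }

    public static void main(String[] args) {
        String key = "reactor.netty.channel.send.prefetch.maxSize"; // name proposed in the draft PR
        int seen = withSystemProperty(key, "2", () -> Integer.getInteger(key, 128));
        System.out.println(seen);                         // 2
        System.out.println(Integer.getInteger(key, 128)); // 128, assuming it was not set on the command line
    }
}
```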
