Skip to content

Commit

Permalink
Adding ActionRequestLazyBuilder implementation of RequestBuilder (#10…
Browse files Browse the repository at this point in the history
…4927)

This introduces a second implementation of RequestBuilder (#104778). As opposed
to ActionRequestBuilder, ActionRequestLazyBuilder does not create its request
until the request() method is called, and does not hold onto that request (so each
call to request() gets a new request instance).
This PR also updates BulkRequestBuilder to inherit from ActionRequestLazyBuilder
as an example of its use.
  • Loading branch information
masseyke authored Jan 30, 2024
1 parent a4ddd32 commit 66a930b
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 18 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104927.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104927
summary: Adding `ActionRequestLazyBuilder` implementation of `RequestBuilder`
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action;

import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;

/**
* This class is similar to ActionRequestBuilder, except that it does not build the request until the request() method is called.
* @param <Request>
* @param <Response>
*/
public abstract class ActionRequestLazyBuilder<Request extends ActionRequest, Response extends ActionResponse>
implements
RequestBuilder<Request, Response> {

protected final ActionType<Response> action;
protected final ElasticsearchClient client;

protected ActionRequestLazyBuilder(ElasticsearchClient client, ActionType<Response> action) {
Objects.requireNonNull(action, "action must not be null");
this.action = action;
this.client = client;
}

/**
* This method creates the request. The caller of this method is responsible for calling Request#decRef.
* @return A newly-built Request, fully initialized by this builder.
*/
public abstract Request request();

public ActionFuture<Response> execute() {
return client.execute(action, request());
}

/**
* Short version of execute().actionGet().
*/
public Response get() {
return execute().actionGet();
}

/**
* Short version of execute().actionGet().
*/
public Response get(TimeValue timeout) {
return execute().actionGet(timeout);
}

public void execute(ActionListener<Response> listener) {
client.execute(action, request(), listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestLazyBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RequestBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand All @@ -23,26 +28,50 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
* it in a single batch.
*/
public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse> implements WriteRequestBuilder<BulkRequestBuilder> {
public class BulkRequestBuilder extends ActionRequestLazyBuilder<BulkRequest, BulkResponse>
implements
WriteRequestBuilder<BulkRequestBuilder> {
private final String globalIndex;
/*
* The following 3 variables hold the list of requests that make up this bulk. Only one can be non-empty. That is, users can't add
* some IndexRequests and some IndexRequestBuilders. They need to pick one (preferably builders) and stick with it.
*/
private final List<DocWriteRequest<?>> requests = new ArrayList<>();
private final List<FramedData> framedData = new ArrayList<>();
private final List<RequestBuilder<? extends ActionRequest, ? extends ActionResponse>> requestBuilders = new ArrayList<>();
private ActiveShardCount waitForActiveShards;
private TimeValue timeout;
private String timeoutString;
private String globalPipeline;
private String globalRouting;
private WriteRequest.RefreshPolicy refreshPolicy;
private String refreshPolicyString;

public BulkRequestBuilder(ElasticsearchClient client, @Nullable String globalIndex) {
super(client, BulkAction.INSTANCE, new BulkRequest(globalIndex));
super(client, BulkAction.INSTANCE);
this.globalIndex = globalIndex;
}

public BulkRequestBuilder(ElasticsearchClient client) {
super(client, BulkAction.INSTANCE, new BulkRequest());
this(client, null);
}

/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
* @deprecated use {@link #add(IndexRequestBuilder)} instead
*/
@Deprecated
public BulkRequestBuilder add(IndexRequest request) {
super.request.add(request);
requests.add(request);
return this;
}

Expand All @@ -51,47 +80,51 @@ public BulkRequestBuilder add(IndexRequest request) {
* (for example, if no id is provided, one will be generated, or usage of the create flag).
*/
public BulkRequestBuilder add(IndexRequestBuilder request) {
super.request.add(request.request());
requestBuilders.add(request);
return this;
}

/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
* @deprecated use {@link #add(DeleteRequestBuilder)} instead
*/
@Deprecated
public BulkRequestBuilder add(DeleteRequest request) {
super.request.add(request);
requests.add(request);
return this;
}

/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(DeleteRequestBuilder request) {
super.request.add(request.request());
requestBuilders.add(request);
return this;
}

/**
* Adds an {@link UpdateRequest} to the list of actions to execute.
* @deprecated use {@link #add(UpdateRequestBuilder)} instead
*/
@Deprecated
public BulkRequestBuilder add(UpdateRequest request) {
super.request.add(request);
requests.add(request);
return this;
}

/**
* Adds an {@link UpdateRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(UpdateRequestBuilder request) {
super.request.add(request.request());
requestBuilders.add(request);
return this;
}

/**
* Adds a framed data in binary format
*/
public BulkRequestBuilder add(byte[] data, int from, int length, XContentType xContentType) throws Exception {
request.add(data, from, length, null, xContentType);
framedData.add(new FramedData(data, from, length, null, xContentType));
return this;
}

Expand All @@ -100,7 +133,7 @@ public BulkRequestBuilder add(byte[] data, int from, int length, XContentType xC
*/
public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType)
throws Exception {
request.add(data, from, length, defaultIndex, xContentType);
framedData.add(new FramedData(data, from, length, defaultIndex, xContentType));
return this;
}

Expand All @@ -109,7 +142,7 @@ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable Strin
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
this.waitForActiveShards = waitForActiveShards;
return this;
}

Expand All @@ -126,32 +159,112 @@ public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards)
* A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}.
*/
public final BulkRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
this.timeout = timeout;
return this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}.
*/
public final BulkRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
this.timeoutString = timeout;
return this;
}

/**
* The number of actions currently in the bulk.
*/
public int numberOfActions() {
return request.numberOfActions();
return requests.size() + requestBuilders.size() + framedData.size();
}

public BulkRequestBuilder pipeline(String globalPipeline) {
request.pipeline(globalPipeline);
this.globalPipeline = globalPipeline;
return this;
}

public BulkRequestBuilder routing(String globalRouting) {
request.routing(globalRouting);
this.globalRouting = globalRouting;
return this;
}

@Override
public BulkRequestBuilder setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}

@Override
public BulkRequestBuilder setRefreshPolicy(String refreshPolicy) {
this.refreshPolicyString = refreshPolicy;
return this;
}

@Override
public BulkRequest request() {
validate();
BulkRequest request = new BulkRequest(globalIndex);
for (RequestBuilder<? extends ActionRequest, ? extends ActionResponse> requestBuilder : requestBuilders) {
ActionRequest childRequest = requestBuilder.request();
request.add((DocWriteRequest<?>) childRequest);
}
for (DocWriteRequest<?> childRequest : requests) {
request.add(childRequest);
}
for (FramedData framedData : framedData) {
try {
request.add(framedData.data, framedData.from, framedData.length, framedData.defaultIndex, framedData.xContentType);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (waitForActiveShards != null) {
request.waitForActiveShards(waitForActiveShards);
}
if (timeout != null) {
request.timeout(timeout);
}
if (timeoutString != null) {
request.timeout(timeoutString);
}
if (globalPipeline != null) {
request.pipeline(globalPipeline);
}
if (globalRouting != null) {
request.routing(globalRouting);
}
if (refreshPolicy != null) {
request.setRefreshPolicy(refreshPolicy);
}
if (refreshPolicyString != null) {
request.setRefreshPolicy(refreshPolicyString);
}
return request;
}

private void validate() {
if (countNonEmptyLists(requestBuilders, requests, framedData) > 1) {
throw new IllegalStateException(
"Must use only request builders, requests, or byte arrays within a single bulk request. Cannot mix and match"
);
}
if (timeout != null && timeoutString != null) {
throw new IllegalStateException("Must use only one setTimeout method");
}
if (refreshPolicy != null && refreshPolicyString != null) {
throw new IllegalStateException("Must use only one setRefreshPolicy method");
}
}

private int countNonEmptyLists(List<?>... lists) {
int sum = 0;
for (List<?> list : lists) {
if (list.isEmpty() == false) {
sum++;
}
}
return sum;
}

private record FramedData(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;

public class BulkRequestBuilderTests extends ESTestCase {

public void testValidation() {
BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(null, null);
bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10)));
bulkRequestBuilder.add(new IndexRequest());
expectThrows(IllegalStateException.class, bulkRequestBuilder::request);

bulkRequestBuilder = new BulkRequestBuilder(null, null);
bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10)));
bulkRequestBuilder.setTimeout(randomTimeValue());
bulkRequestBuilder.setTimeout(TimeValue.timeValueSeconds(randomIntBetween(1, 30)));
expectThrows(IllegalStateException.class, bulkRequestBuilder::request);

bulkRequestBuilder = new BulkRequestBuilder(null, null);
bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10)));
bulkRequestBuilder.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()).getValue());
bulkRequestBuilder.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()));
expectThrows(IllegalStateException.class, bulkRequestBuilder::request);
}
}

0 comments on commit 66a930b

Please sign in to comment.