Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Introduce new OpenSearchTransport based on Apache HttpClient 5 #281

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Github workflow for dependabot PRs ([#247](https://github.com/opensearch-project/opensearch-java/pull/247))
- Add javadoc link for the client ([#255](https://github.com/opensearch-project/opensearch-java/pull/255))
- Add 1-click release workflows ([#321](https://github.com/opensearch-project/opensearch-java/pull/321))
- Introduce new OpenSearchTransport based on Apache HttpClient 5 ([#281](https://github.com/opensearch-project/opensearch-java/pull/281))

### Dependencies
- Bumps `classgraph` from 4.8.149 to 4.8.154

Expand Down
14 changes: 14 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ static class IndexData {

## Create a client

There are multiple low level transports which `OpenSearchClient` could be configured with.

### Create a client using `RestClientTransport`

```java
Transport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
OpenSearchClient client = new OpenSearchClient(transport);
Expand All @@ -64,6 +68,16 @@ Transport transport = new RestClientTransport(restClient,
OpenSearchClient client = new OpenSearchClient(transport);
```

### Create a client using `ApacheHttpClient5Transport`

```java
final Transport transport = ApacheHttpClient5TransportBuilder
.builder(hosts)
.mapper(new JacksonJsonpMapper())
.build();
OpenSearchClient client = new OpenSearchClient(transport);
```

## Create an index

```java
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport;

public final class TransportHeaders {
public static final String ACCEPT = "Accept";
public static final String USER_AGENT = "User-Agent";

private TransportHeaders() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.httpclient5;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.client.transport.Version;

import static org.opensearch.client.transport.TransportHeaders.ACCEPT;
import static org.opensearch.client.transport.TransportHeaders.USER_AGENT;

public class ApacheHttpClient5Options implements TransportOptions {
/**
* Default request options.
*/
public static final ApacheHttpClient5Options DEFAULT = new Builder(
Collections.emptyList(),
HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT,
null,
null
).build();

private final List<Header> headers;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private final WarningsHandler warningsHandler;
private final RequestConfig requestConfig;

private ApacheHttpClient5Options(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
this.warningsHandler = builder.warningsHandler;
this.requestConfig = builder.requestConfig;
}

public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() {
return httpAsyncResponseConsumerFactory;
}

public WarningsHandler getWarningsHandler() {
return warningsHandler;
}

public RequestConfig getRequestConfig() {
return requestConfig;
}

@Override
public Collection<Entry<String, String>> headers() {
return headers.stream()
.map(h -> new AbstractMap.SimpleImmutableEntry<>(h.getName(), h.getValue()))
.collect(Collectors.toList());
}

@Override
public Map<String, String> queryParameters() {
return null;
}

@Override
public Function<List<String>, Boolean> onWarnings() {
if (warningsHandler == null) {
return null;
} else {
return warnings -> warningsHandler.warningsShouldFailRequest(warnings);
}
}

@Override
public Builder toBuilder() {
return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler, requestConfig);
}

public static class Builder implements TransportOptions.Builder {
private final List<Header> headers;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private WarningsHandler warningsHandler;
private RequestConfig requestConfig;

private Builder(Builder builder) {
this(builder.headers, builder.httpAsyncResponseConsumerFactory,
builder.warningsHandler, builder.requestConfig);
}

private Builder(
List<Header> headers,
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
WarningsHandler warningsHandler,
RequestConfig requestConfig
) {
this.headers = new ArrayList<>(headers);
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
this.warningsHandler = warningsHandler;
this.requestConfig = requestConfig;
}

/**
* Add the provided header to the request.
*
* @param name the header name
* @param value the header value
* @throws NullPointerException if {@code name} or {@code value} is null.
*/
@Override
public Builder addHeader(String name, String value) {
Objects.requireNonNull(name, "header name cannot be null");
Objects.requireNonNull(value, "header value cannot be null");
this.headers.add(new ReqHeader(name, value));
return this;
}

@Override
public TransportOptions.Builder setParameter(String name, String value) {
return this;
}

/**
* Called if there are warnings to determine if those warnings should fail the request.
*/
@Override
public TransportOptions.Builder onWarnings(Function<List<String>, Boolean> listener) {
if (listener == null) {
setWarningsHandler(null);
} else {
setWarningsHandler(w -> {
if (w != null && !w.isEmpty()) {
return listener.apply(w);
} else {
return false;
}
});
}

return this;
}

/**
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link AsyncResponseConsumer} callback per retry. Controls how the
* response body gets streamed from a non-blocking HTTP connection on the
* client side.
*
* @param httpAsyncResponseConsumerFactory factory for creating {@link AsyncResponseConsumer}.
* @throws NullPointerException if {@code httpAsyncResponseConsumerFactory} is null.
*/
public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
this.httpAsyncResponseConsumerFactory = Objects.requireNonNull(
httpAsyncResponseConsumerFactory,
"httpAsyncResponseConsumerFactory cannot be null"
);
}

/**
* How this request should handle warnings. If null (the default) then
* this request will default to the behavior dictacted by
* `setStrictDeprecationMode`.
* <p>
* This can be set to {@link WarningsHandler#PERMISSIVE} if the client
* should ignore all warnings which is the same behavior as setting
* strictDeprecationMode to true. It can be set to
* {@link WarningsHandler#STRICT} if the client should fail if there are
* any warnings which is the same behavior as settings
* strictDeprecationMode to false.
* <p>
* It can also be set to a custom implementation of
* {@linkplain WarningsHandler} to permit only certain warnings or to
* fail the request if the warnings returned don't
* <strong>exactly</strong> match some set.
*
* @param warningsHandler the {@link WarningsHandler} to be used
*/
public void setWarningsHandler(WarningsHandler warningsHandler) {
this.warningsHandler = warningsHandler;
}

/**
* set RequestConfig, which can set socketTimeout, connectTimeout
* and so on by request
* @param requestConfig http client RequestConfig
* @return Builder
*/
public Builder setRequestConfig(RequestConfig requestConfig) {
this.requestConfig = requestConfig;
return this;
}

@Override
public ApacheHttpClient5Options build() {
return new ApacheHttpClient5Options(this);
}
}

static ApacheHttpClient5Options initialOptions() {
String ua = String.format(
Locale.ROOT,
"opensearch-java/%s (Java/%s)",
Version.VERSION == null ? "Unknown" : Version.VERSION.toString(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should never be null

Copy link
Collaborator Author

@reta reta Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dblock you will be surprised but [1]:

    /**
     * This library's version, read from the classpath. Can be {@code null} if the version resource could not be read.
     */
    @Nullable
    public static final Version VERSION;

[1] https://github.com/opensearch-project/opensearch-java/blob/main/java-client/src/main/java/org/opensearch/client/transport/Version.java#L138-L139

System.getProperty("java.version")
);

return new ApacheHttpClient5Options(
DEFAULT.toBuilder()
.addHeader(USER_AGENT, ua)
.addHeader(ACCEPT, ApacheHttpClient5Transport.JsonContentType.toString())
);
}

static ApacheHttpClient5Options of(TransportOptions options) {
if (options instanceof ApacheHttpClient5Options) {
return (ApacheHttpClient5Options)options;

} else {
final Builder builder = new Builder(DEFAULT.toBuilder());
options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue()));
options.queryParameters().forEach(builder::setParameter);
builder.onWarnings(options.onWarnings());
return builder.build();
}
}

/**
* Custom implementation of {@link BasicHeader} that overrides equals and
* hashCode so it is easier to test equality of {@link ApacheHttpClient5Options}.
*/
static final class ReqHeader extends BasicHeader {
ReqHeader(String name, String value) {
super(name, value);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof ReqHeader) {
Header otherHeader = (Header) other;
return Objects.equals(getName(), otherHeader.getName()) && Objects.equals(getValue(), otherHeader.getValue());
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(getName(), getValue());
}
}
}
Loading