Skip to content

Commit

Permalink
Merge branch 'main' into main-telemetry-metrics-framework
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <gagandeepjuneja@gmail.com>
  • Loading branch information
Gaganjuneja authored Oct 5, 2023
2 parents c1015cf + 3dd9fc1 commit dafd876
Show file tree
Hide file tree
Showing 134 changed files with 1,713 additions and 808 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))

### Deprecated

Expand Down Expand Up @@ -115,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump asm from 9.5 to 9.6 ([#10302](https://github.com/opensearch-project/OpenSearch/pull/10302))
- Bump netty from 4.1.97.Final to 4.1.99.Final ([#10303](https://github.com/opensearch-project/OpenSearch/pull/10303))
- Bump `peter-evans/create-pull-request` from 3 to 5 ([#10301](https://github.com/opensearch-project/OpenSearch/pull/10301))
- Bump `org.apache.avro:avro` from 1.11.2 to 1.11.3 ([#10210](https://github.com/opensearch-project/OpenSearch/pull/10210))

### Changed
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))
Expand All @@ -125,8 +127,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122))
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143))
- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356))
- [Metrics Framework] Add Metrics framework. ([#10241](https://github.com/opensearch-project/OpenSearch/pull/10241))
- [Remote Store] Add support to reload repository metadata inplace ([#9569](https://github.com/opensearch-project/OpenSearch/pull/9569))

### Deprecated

Expand Down
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ dependencies {
api localGroovy()

api 'commons-codec:commons-codec:1.16.0'
api 'org.apache.commons:commons-compress:1.23.0'
api 'org.apache.commons:commons-compress:1.24.0'
api 'org.apache.ant:ant:1.10.14'
api 'com.netflix.nebula:gradle-extra-configurations-plugin:10.0.0'
api 'com.netflix.nebula:nebula-publishing-plugin:20.3.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TaskInfo {
private long runningTimeNanos;
private boolean cancellable;
private boolean cancelled;
private Long cancellationStartTime;
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
Expand Down Expand Up @@ -127,6 +128,14 @@ void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}

public Long getCancellationStartTime() {
return this.cancellationStartTime;
}

public void setCancellationStartTime(Long cancellationStartTime) {
this.cancellationStartTime = cancellationStartTime;
}

public TaskId getParentTaskId() {
return parentTaskId;
}
Expand Down Expand Up @@ -180,6 +189,7 @@ private void noOpParse(Object s) {}
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
parser.declareLong(TaskInfo::setCancellationStartTime, new ParseField("cancellation_time_millis"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -199,7 +209,8 @@ && isCancelled() == taskInfo.isCancelled()
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats())
&& Objects.equals(getCancellationStartTime(), taskInfo.cancellationStartTime);
}

@Override
Expand All @@ -216,7 +227,8 @@ public int hashCode() {
getParentTaskId(),
status,
getHeaders(),
getResourceStats()
getResourceStats(),
getCancellationStartTime()
);
}

Expand Down Expand Up @@ -250,6 +262,8 @@ public String toString() {
+ headers
+ ", resource_stats="
+ resourceStats
+ ", cancellationStartTime="
+ cancellationStartTime
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
for (int i = 0; i < 4; i++) {
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
Long cancellationStartTime = null;
if (cancelled) {
cancellationStartTime = randomNonNegativeLong();
}
tasks.add(
new org.opensearch.tasks.TaskInfo(
new TaskId(NODE_ID, (long) i),
Expand All @@ -97,7 +101,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value"),
null
null,
cancellationStartTime
)
);
}
Expand Down Expand Up @@ -135,6 +140,7 @@ protected void assertInstances(
assertEquals(ti.isCancelled(), taskInfo.isCancelled());
assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId());
assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId());
assertEquals(ti.getCancellationStartTime(), taskInfo.getCancellationStartTime());
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
assertEquals(status.code, taskInfo.getStatus().get("code"));
assertEquals(status.status, taskInfo.getStatus().get("status"));
Expand Down
6 changes: 6 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,9 @@ ${path.logs}
# index searcher threadpool.
#
#opensearch.experimental.feature.concurrent_segment_search.enabled: false
#
#
# Gates the optimization of datetime formatters caching along with change in default datetime formatter
# Once there is no observed impact on performance, this feature flag can be removed.
#
#opensearch.experimental.optimization.datetime_formatter_caching.enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public URLRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
super(metadata, false, namedXContentRegistry, clusterService, recoverySettings);
super(metadata, namedXContentRegistry, clusterService, recoverySettings);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_TRANSPORT_NAME,
Expand All @@ -108,7 +109,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4NioSocketChannel;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -131,9 +132,10 @@ public Netty4Transport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void startThreadPool() {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
PageCacheRecycler.NON_RECYCLING_INSTANCE,
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() {
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
writableRegistry(),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
) {
@Override
public TransportAddress[] addressesFromString(String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() {
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,7 @@ public AzureRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(
metadata,
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(
metadata,
getSetting(COMPRESS_SETTING, metadata),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-hdfs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ dependencies {
}
api 'org.apache.htrace:htrace-core4:4.2.0-incubating'
api "org.apache.logging.log4j:log4j-core:${versions.log4j}"
api 'org.apache.avro:avro:1.11.2'
api 'org.apache.avro:avro:1.11.3'
api 'com.google.code.gson:gson:2.10.1'
runtimeOnly "com.google.guava:guava:${versions.guava}"
api "commons-logging:commons-logging:${versions.commonslogging}"
Expand Down
1 change: 0 additions & 1 deletion plugins/repository-hdfs/licenses/avro-1.11.2.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/repository-hdfs/licenses/avro-1.11.3.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
02b463409b373bff9ece09f54a43d42da5cea55a
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public HdfsRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, namedXContentRegistry, clusterService, recoverySettings);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,37 +241,23 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
return;
}

final List<CompletableFuture<InputStreamContainer>> blobPartInputStreamFutures = new ArrayList<>();
final List<ReadContext.StreamPartCreator> blobPartInputStreamFutures = new ArrayList<>();
final long blobSize = blobMetadata.objectSize();
final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount();
final String blobChecksum = blobMetadata.checksum().checksumCRC32();

if (numberOfParts == null) {
blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null));
blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null));
} else {
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, partNumber));
final int innerPartNumber = partNumber;
blobPartInputStreamFutures.add(
() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber)
);
}
}

CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new))
.whenComplete((unused, partThrowable) -> {
if (partThrowable == null) {
listener.onResponse(
new ReadContext(
blobSize,
blobPartInputStreamFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()),
blobChecksum
)
);
} else {
Exception ex = partThrowable.getCause() instanceof Exception
? (Exception) partThrowable.getCause()
: new Exception(partThrowable.getCause());
listener.onFailure(ex);
}
});
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
});
} catch (Exception ex) {
listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex));
Expand Down
Loading

0 comments on commit dafd876

Please sign in to comment.