Skip to content

Commit

Permalink
Add timestamp pinning service and scheduler to update in-memory state
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Aug 9, 2024
1 parent 978d14e commit 83c945b
Show file tree
Hide file tree
Showing 7 changed files with 593 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
Expand Down Expand Up @@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

/**
* Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store
*
* @opensearch.internal
*/
public class RemotePinnedTimestamps extends AbstractRemoteWritableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> {
private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class);

public static class PinnedTimestamps extends AbstractDiffable<ClusterBlocks> {
private final Map<Long, List<String>> pinnedTimestampPinningEntityMap;

public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) {
this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection);
}

public static PinnedTimestamps readFrom(StreamInput in) throws IOException {
return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList));
}

public void pin(Long timestamp, String pinningEntity) {
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity);
}

public void unpin(Long timestamp, String pinningEntity) {
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity);
pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> {
v.remove(pinningEntity);
return v.isEmpty() ? null : v;
});
}

public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() {
return new HashMap<>(pinnedTimestampPinningEntityMap);
}
}

public static final String PINNED_TIMESTAMPS = "pinned_timestamps";
public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
PINNED_TIMESTAMPS,
PinnedTimestamps::readFrom
);

private PinnedTimestamps pinnedTimestamps;

public RemotePinnedTimestamps(String clusterUUID, Compressor compressor, NamedXContentRegistry namedXContentRegistry) {
super(clusterUUID, compressor, namedXContentRegistry);
pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS);
}

@Override
public String getType() {
return PINNED_TIMESTAMPS;
}

@Override
public String generateBlobFileName() {
return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis()));
}

@Override
public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
return null;
}

@Override
public InputStream serialize() throws IOException {
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput();
}

@Override
public PinnedTimestamps deserialize(InputStream inputStream) throws IOException {
return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
}

public void setBlobFileName(String blobFileName) {
this.blobFileName = blobFileName;
}

public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) {
this.pinnedTimestamps = pinnedTimestamps;
}

public PinnedTimestamps getPinnedTimestamps() {
return pinnedTimestamps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

/**
* Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps}
*/
public class RemoteStorePinnedTimestampsBlobStore extends RemoteClusterStateBlobStore<
RemotePinnedTimestamps.PinnedTimestamps,
RemotePinnedTimestamps> {

private final BlobStoreRepository blobStoreRepository;

public RemoteStorePinnedTimestampsBlobStore(
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName,
ThreadPool threadPool,
String executor
) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor);
this.blobStoreRepository = blobStoreRepository;
}

@Override
public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) {
return blobStoreRepository.basePath();
}
}
20 changes: 19 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.PersistentTasksExecutor;
Expand Down Expand Up @@ -784,6 +785,7 @@ protected Node(
final RemoteClusterStateService remoteClusterStateService;
final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
if (isRemoteStoreClusterStateEnabled(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(
threadPool,
Expand All @@ -801,11 +803,19 @@ protected Node(
List.of(remoteIndexPathUploader),
namedWriteableRegistry
);
remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceReference::get,
settings,
threadPool,
clusterService
);
resourcesToClose.add(remoteStorePinnedTimestampService);
remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager();
} else {
remoteClusterStateService = null;
remoteIndexPathUploader = null;
remoteClusterStateCleanupManager = null;
remoteStorePinnedTimestampService = null;
}

// collect engine factory providers from plugins
Expand Down Expand Up @@ -1170,7 +1180,8 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
actionModule.getActionFilters()
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
Expand Down Expand Up @@ -1416,6 +1427,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService);
b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
Expand Down Expand Up @@ -1569,6 +1581,12 @@ public Node start() throws NodeValidationException {
if (remoteIndexPathUploader != null) {
remoteIndexPathUploader.start();
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance(
RemoteStorePinnedTimestampService.class
);
if (remoteStorePinnedTimestampService != null) {
remoteStorePinnedTimestampService.start();
}
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand Down
Loading

0 comments on commit 83c945b

Please sign in to comment.