Skip to content

Commit

Permalink
Introduce Local checkpoints
Browse files Browse the repository at this point in the history
This PR introduces the notion of a local checkpoint on the shard level. A local check point is defined as a the highest sequence number for which all previous operations (i.e. with a lower seq#) have been processed.

relates to #10708

Closes #15390

formatting
  • Loading branch information
bleskes committed Dec 15, 2015
1 parent b836781 commit 3106948
Show file tree
Hide file tree
Showing 22 changed files with 748 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -105,7 +105,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats(), indexShard.seqNoStats()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ public static enum Flag {
RequestCache("request_cache"),
Recovery("recovery");


private final String restName;

Flag(String restName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardPath;

import java.io.IOException;
Expand All @@ -42,20 +42,23 @@ public class ShardStats implements Streamable, ToXContent {
private CommonStats commonStats;
@Nullable
private CommitStats commitStats;
@Nullable
private SeqNoStats seqNoStats;
private String dataPath;
private String statePath;
private boolean isCustomDataPath;

ShardStats() {
}

public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) {
public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
this.statePath = shardPath.getRootStatePath().toString();
this.isCustomDataPath = shardPath.isCustomDataPath();
this.commitStats = commitStats;
this.commonStats = commonStats;
this.seqNoStats = seqNoStats;
}

/**
Expand All @@ -73,6 +76,11 @@ public CommitStats getCommitStats() {
return this.commitStats;
}

@Nullable
public SeqNoStats getSeqNoStats() {
return this.seqNoStats;
}

public String getDataPath() {
return dataPath;
}
Expand All @@ -99,6 +107,7 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalStreamableReader(SeqNoStats::new);
}

@Override
Expand All @@ -109,6 +118,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWritable(seqNoStats);
}

@Override
Expand All @@ -124,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (commitStats != null) {
commitStats.toXContent(builder, params);
}
if (seqNoStats != null) {
seqNoStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARD_PATH);
builder.field(Fields.STATE_PATH, statePath);
builder.field(Fields.DATA_PATH, dataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
flags.set(CommonStatsFlags.Flag.Recovery);
}

return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats());
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.UnassignedInfo;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.mapper.MapperRegistry;

Expand Down Expand Up @@ -93,8 +93,8 @@ private boolean isUpgraded(IndexMetaData indexMetaData) {
private void checkSupportedVersion(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1 and wasn't upgraded."
+ " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion()
+ " and upgraded using the upgrade API.");
+ " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion()
+ " and upgraded using the upgrade API.");
}
}

Expand All @@ -107,7 +107,7 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
return true;
}
if (indexMetaData.getMinimumCompatibleVersion() != null &&
indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) {
indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) {
//The index was upgraded we can work with it
return true;
}
Expand All @@ -116,42 +116,43 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) {

/** All known byte-sized settings for an index. */
public static final Set<String> INDEX_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
"index.merge.policy.floor_segment",
"index.merge.policy.max_merged_segment",
"index.merge.policy.max_merge_size",
"index.merge.policy.min_merge_size",
"index.shard.recovery.file_chunk_size",
"index.shard.recovery.translog_size",
"index.store.throttle.max_bytes_per_sec",
"index.translog.flush_threshold_size",
"index.translog.fs.buffer_size",
"index.version_map_size"));
"index.merge.policy.floor_segment",
"index.merge.policy.max_merged_segment",
"index.merge.policy.max_merge_size",
"index.merge.policy.min_merge_size",
"index.shard.recovery.file_chunk_size",
"index.shard.recovery.translog_size",
"index.store.throttle.max_bytes_per_sec",
"index.translog.flush_threshold_size",
"index.translog.fs.buffer_size",
"index.version_map_size"));

/** All known time settings for an index. */
public static final Set<String> INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet(
"index.gateway.wait_for_mapping_update_post_recovery",
"index.shard.wait_for_mapping_update_post_recovery",
"index.gc_deletes",
"index.indexing.slowlog.threshold.index.debug",
"index.indexing.slowlog.threshold.index.info",
"index.indexing.slowlog.threshold.index.trace",
"index.indexing.slowlog.threshold.index.warn",
"index.refresh_interval",
"index.search.slowlog.threshold.fetch.debug",
"index.search.slowlog.threshold.fetch.info",
"index.search.slowlog.threshold.fetch.trace",
"index.search.slowlog.threshold.fetch.warn",
"index.search.slowlog.threshold.query.debug",
"index.search.slowlog.threshold.query.info",
"index.search.slowlog.threshold.query.trace",
"index.search.slowlog.threshold.query.warn",
"index.shadow.wait_for_initial_commit",
"index.store.stats_refresh_interval",
"index.translog.flush_threshold_period",
"index.translog.interval",
"index.translog.sync_interval",
"index.shard.inactive_time",
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING));
"index.gateway.wait_for_mapping_update_post_recovery",
"index.shard.wait_for_mapping_update_post_recovery",
"index.gc_deletes",
"index.indexing.slowlog.threshold.index.debug",
"index.indexing.slowlog.threshold.index.info",
"index.indexing.slowlog.threshold.index.trace",
"index.indexing.slowlog.threshold.index.warn",
"index.refresh_interval",
"index.search.slowlog.threshold.fetch.debug",
"index.search.slowlog.threshold.fetch.info",
"index.search.slowlog.threshold.fetch.trace",
"index.search.slowlog.threshold.fetch.warn",
"index.search.slowlog.threshold.query.debug",
"index.search.slowlog.threshold.query.info",
"index.search.slowlog.threshold.query.trace",
"index.search.slowlog.threshold.query.warn",
"index.shadow.wait_for_initial_commit",
"index.store.stats_refresh_interval",
"index.translog.flush_threshold_period",
"index.translog.interval",
"index.translog.sync_interval",
"index.shard.inactive_time",
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING));

/**
* Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are
Expand All @@ -163,7 +164,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
// Created lazily if we find any settings that are missing units:
Settings settings = indexMetaData.getSettings();
Settings.Builder newSettings = null;
for(String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) {
for (String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) {
String value = settings.get(byteSizeSetting);
if (value != null) {
try {
Expand All @@ -180,7 +181,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
newSettings.put(byteSizeSetting, value + "b");
}
}
for(String timeSetting : INDEX_TIME_SETTINGS) {
for (String timeSetting : INDEX_TIME_SETTINGS) {
String value = settings.get(timeSetting);
if (value != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public short readShort() throws IOException {
*/
public int readInt() throws IOException {
return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
}

/**
Expand Down Expand Up @@ -543,6 +543,17 @@ public <T extends Streamable> T readOptionalStreamable(Supplier<T> supplier) thr
}
}

/**
* Serializes a potential null value.
*/
public <T> T readOptionalStreamableReader(StreamableReader<T> streamableReader) throws IOException {
if (readBoolean()) {
return streamableReader.readFrom(this);
} else {
return null;
}
}

public <T extends Throwable> T readThrowable() throws IOException {
if (readBoolean()) {
int key = readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,18 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx
}
}

/**
* Serializes a potential null value.
*/
public void writeOptionalWritable(@Nullable Writeable writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
} else {
writeBoolean(false);
}
}

public void writeThrowable(Throwable throwable) throws IOException {
if (throwable == null) {
writeBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
public class EsRejectedExecutionException extends ElasticsearchException {
private final boolean isExecutorShutdown;

public EsRejectedExecutionException(String message, boolean isExecutorShutdown) {
super(message);
public EsRejectedExecutionException(String message, boolean isExecutorShutdown, Object... args) {
super(message, args);
this.isExecutorShutdown = isExecutorShutdown;
}

public EsRejectedExecutionException(String message) {
this(message, false);
public EsRejectedExecutionException(String message, Object... args) {
this(message, false, args);
}

public EsRejectedExecutionException(String message, boolean isExecutorShutdown) {
this(message, isExecutorShutdown, new Object[0]);
}

public EsRejectedExecutionException() {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -325,6 +326,9 @@ public CommitStats commitStats() {
return new CommitStats(getLastCommittedSegmentInfos());
}

/** get sequence number related stats */
public abstract SeqNoStats seqNoStats();

/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
Expand Down
Loading

0 comments on commit 3106948

Please sign in to comment.