Add shard id to remote store logs
linuxpi committed Jul 10, 2023
1 parent 418ab51 commit 88c5342
Showing 3 changed files with 73 additions and 47 deletions.
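The change is mechanical throughout: each log call gains the shard id as a leading "{}" parameter, and assertion and exception messages get the shard id concatenated in front. As a rough illustration of the logging half of the pattern (not code from this commit), the sketch below uses a plain Log4j2 Logger and a String stand-in for indexShard.shardId; the class name, the stand-in value, and the simulated IOException are all invented for the example.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

import java.io.IOException;

public class ShardIdLoggingSketch {
    private static final Logger logger = LogManager.getLogger(ShardIdLoggingSketch.class);

    public static void main(String[] args) {
        // Stand-in for indexShard.shardId; a ShardId renders as "[index][shard]".
        String shardId = "[my-index][0]";
        IOException cause = new IOException("simulated upload failure");

        // Before: no shard context in the message.
        logger.warn("Exception while uploading new segments to the remote segment store", cause);

        // After: the shard id fills the leading "{}"; the trailing Throwable has no
        // placeholder, so Log4j2 records it as the log event's exception.
        logger.warn("{} Exception while uploading new segments to the remote segment store", shardId, cause);

        // Lambda/ParameterizedMessage form, as used for per-file upload failures:
        // message construction is deferred until the event is actually logged.
        String file = "_0.cfe";
        logger.warn(
            () -> new ParameterizedMessage("{} Exception while uploading file {} to the remote segment store", shardId, file),
            cause
        );
    }
}

To run the sketch, log4j-api and log4j-core need to be on the classpath; without a configuration file Log4j2 only prints errors to the console, so the point here is the message-formatting behavior, not the output.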
@@ -127,7 +127,7 @@ public RemoteStoreRefreshListener(
try {
remoteSegmentMetadata = this.remoteDirectory.init();
} catch (IOException e) {
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
logger.error("{} Exception while initialising RemoteSegmentStoreDirectory", indexShard.shardId, e);
}
}
// initializing primary term with the primary term of latest metadata in remote store.
@@ -176,7 +176,7 @@ public void afterRefresh(boolean didRefresh) {
try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
logger.info("{} Exception occurred while scheduling syncSegments", indexShard.shardId, e);
}
}
}
@@ -255,15 +255,15 @@ private synchronized void syncSegments(boolean isRetry) {
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
logger.warn("{} Exception while reading SegmentInfosSnapshot", indexShard.shardId, e);
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
logger.warn("Exception while uploading new segments to the remote segment store", e);
logger.warn("{} Exception while uploading new segments to the remote segment store", indexShard.shardId, e);
}
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
logger.error("{} Exception in RemoteStoreRefreshListener.afterRefresh()", indexShard.shardId, t);
} finally {
// Update the segment tracker with the final upload status as seen at the end
updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS);
@@ -286,7 +286,7 @@ private void clearStaleFilesFromLocalSegmentChecksumMap(Collection<String> local

private void beforeSegmentsSync(boolean isRetry) {
if (isRetry) {
logger.info("Retrying to sync the segments to remote store");
logger.info("{} Retrying to sync the segments to remote store", indexShard.shardId);
}
// Start tracking total uploads started
segmentTracker.incrementTotalUploadsStarted();
@@ -384,7 +384,14 @@ private boolean uploadNewSegments(Collection<String> localSegmentsPostRefresh) t
fileUploader.uploadFile(file);
} catch (IOException e) {
uploadSuccess.set(false);
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
logger.warn(
() -> new ParameterizedMessage(
"{} Exception while uploading file {} to the remote segment store",
indexShard.shardId,
file
),
e
);
}
});
return uploadSuccess.get();
@@ -436,7 +443,10 @@ private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
try {
fileSize = storeDirectory.fileLength(file);
} catch (IOException e) {
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
logger.warn(
new ParameterizedMessage("{} Exception while reading the fileLength of file={}", indexShard.shardId, file),
e
);
}
latestFileNameSizeOnLocalMap.put(file, fileSize);
});
@@ -91,7 +91,7 @@ public RemoteFsTranslog(
Checkpoint checkpoint = readCheckpoint(location);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
throw new IllegalStateException(shardId + " at least one reader must be recovered");
}
boolean success = false;
current = null;
@@ -121,7 +121,7 @@ public RemoteFsTranslog(
}

public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
assert repository instanceof BlobStoreRepository : shardId + " repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
@@ -134,7 +134,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId());
logger.info("{} Downloading translog files from remote", translogTransferManager.getShardId());
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
@@ -156,7 +156,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId());
logger.info("{} Downloaded translog files from remote", translogTransferManager.getShardId());
}

public static TranslogTransferManager buildTranslogTransferManager(
@@ -205,7 +205,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
readers.add(reader);
copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
if (closed.get() == false) {
logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1);
logger.trace("{} Creating new writer for gen: [{}]", shardId, current.getGeneration() + 1);
current = createWriter(current.getGeneration() + 1);
}
} catch (final Exception e) {
@@ -241,11 +241,11 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
// primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns
// downloads all the translogs from remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
logger.trace("{} skipped uploading translog for {} {}", shardId, primaryTerm, generation);
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
logger.trace("{} uploading translog for {} {}", shardId, primaryTerm, generation);
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
primaryTerm,
@@ -264,7 +264,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
logger.trace("{} uploaded translog for {} {} ", shardId, primaryTerm, generation);
}

@Override
@@ -321,13 +321,13 @@ public boolean syncNeeded() {

@Override
public void close() throws IOException {
assert Translog.calledFromOutsideOrViaTragedyClose()
: "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
assert Translog.calledFromOutsideOrViaTragedyClose() : shardId
+ "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
sync();
} finally {
logger.debug("translog closed");
logger.debug("{} translog closed", shardId);
closeFilesIfNoPendingRetentionLocks();
}
}
@@ -340,12 +340,14 @@ protected long getMinReferencedGen() throws IOException {
minGenerationForSeqNo(minSeqNoToKeep, current, readers)
);

assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
assert minReferencedGen >= getMinFileGeneration() : shardId
+ " deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
+ getMinFileGeneration()
+ "]";
assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of ["
assert minReferencedGen <= currentFileGeneration() : shardId
+ " deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] which is higher than the current generation ["
+ currentFileGeneration()
@@ -356,7 +358,7 @@ protected void setMinSeqNoToKeep(long seqNo) {
protected void setMinSeqNoToKeep(long seqNo) {
if (seqNo < this.minSeqNoToKeep) {
throw new IllegalArgumentException(
"min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
shardId + " min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
);
}
this.minSeqNoToKeep = seqNo;
@@ -416,7 +418,7 @@ private void deleteStaleRemotePrimaryTerms() {
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
// First we delete all stale primary terms folders from remote store
assert readers.isEmpty() == false : "Expected non-empty readers";
assert readers.isEmpty() == false : shardId + " Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
}
@@ -438,7 +440,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th

protected void onDelete() {
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped delete translog");
logger.trace("{} skipped delete translog", shardId);
// NO-OP
return;
}
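The translog-side changes cover the other half of the pattern: asserts and thrown exceptions have no message formatter, so the shard id is concatenated directly in front of the message and relies on the shard id's toString(). A minimal, self-contained sketch of that behavior follows; StubShardId is a hypothetical stand-in for OpenSearch's ShardId, assumed here to render as "[index][shard]".

public class ShardPrefixedMessagesSketch {
    // Hypothetical stand-in for OpenSearch's ShardId class.
    static final class StubShardId {
        private final String index;
        private final int id;

        StubShardId(String index, int id) {
            this.index = index;
            this.id = id;
        }

        @Override
        public String toString() {
            return "[" + index + "][" + id + "]";
        }
    }

    public static void main(String[] args) {
        StubShardId shardId = new StubShardId("my-index", 0);

        // Exception message: the shard id is concatenated ahead of the original text,
        // so the prefix is built eagerly, but only on the failure path.
        java.util.List<String> readers = java.util.List.of();
        if (readers.isEmpty()) {
            IllegalStateException e = new IllegalStateException(shardId + " at least one reader must be recovered");
            System.out.println(e.getMessage()); // [my-index][0] at least one reader must be recovered
        }

        // Assertion message: same concatenation; the message expression is evaluated
        // only when assertions are enabled (-ea) and the condition is false.
        java.util.List<String> recovered = java.util.List.of("translog-1.tlog");
        assert recovered.isEmpty() == false : shardId + " Expected non-empty readers";
    }
}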