From 88c534238447d3f0b94ae8cb4f1a527cc198d529 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Mon, 10 Jul 2023 19:04:45 +0530 Subject: [PATCH] Add shard id to remote store logs --- .../shard/RemoteStoreRefreshListener.java | 26 +++++--- .../index/translog/RemoteFsTranslog.java | 34 ++++++----- .../transfer/TranslogTransferManager.java | 60 ++++++++++++------- 3 files changed, 73 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index aaba74cd54341..e390859f5a80c 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -127,7 +127,7 @@ public RemoteStoreRefreshListener( try { remoteSegmentMetadata = this.remoteDirectory.init(); } catch (IOException e) { - logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); + logger.error("{} Exception while initialising RemoteSegmentStoreDirectory", indexShard.shardId, e); } } // initializing primary term with the primary term of latest metadata in remote store. @@ -176,7 +176,7 @@ public void afterRefresh(boolean didRefresh) { try { indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); } catch (InterruptedException | ExecutionException e) { - logger.info("Exception occurred while scheduling syncSegments", e); + logger.info("{} Exception occurred while scheduling syncSegments", indexShard.shardId, e); } } } @@ -255,15 +255,15 @@ private synchronized void syncSegments(boolean isRetry) { } } } catch (EngineException e) { - logger.warn("Exception while reading SegmentInfosSnapshot", e); + logger.warn("{} Exception while reading SegmentInfosSnapshot", indexShard.shardId, e); } } catch (IOException e) { // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); + logger.warn("{} Exception while uploading new segments to the remote segment store", indexShard.shardId, e); } } catch (Throwable t) { - logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + logger.error("{} Exception in RemoteStoreRefreshListener.afterRefresh()", indexShard.shardId, t); } finally { // Update the segment tracker with the final upload status as seen at the end updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); @@ -286,7 +286,7 @@ private void clearStaleFilesFromLocalSegmentChecksumMap(Collection local private void beforeSegmentsSync(boolean isRetry) { if (isRetry) { - logger.info("Retrying to sync the segments to remote store"); + logger.info("{} Retrying to sync the segments to remote store", indexShard.shardId); } // Start tracking total uploads started segmentTracker.incrementTotalUploadsStarted(); @@ -384,7 +384,14 @@ private boolean uploadNewSegments(Collection localSegmentsPostRefresh) t fileUploader.uploadFile(file); } catch (IOException e) { uploadSuccess.set(false); - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + logger.warn( + () -> new ParameterizedMessage( + "{} Exception while uploading file {} to the remote segment store", + indexShard.shardId, + file + ), + e + ); } }); return uploadSuccess.get(); @@ -436,7 +443,10 @@ private void updateLocalSizeMapAndTracker(Collection segmentFiles) { try { fileSize = storeDirectory.fileLength(file); } catch (IOException e) { - logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + logger.warn( + new ParameterizedMessage("{} Exception while reading the fileLength of file={}", indexShard.shardId, file), + e + ); } latestFileNameSizeOnLocalMap.put(file, fileSize); }); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 1e565b97387d1..0694005950f17 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -91,7 +91,7 @@ public RemoteFsTranslog( Checkpoint checkpoint = readCheckpoint(location); this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { - throw new IllegalStateException("at least one reader must be recovered"); + throw new IllegalStateException(shardId + " at least one reader must be recovered"); } boolean success = false; current = null; @@ -121,7 +121,7 @@ public RemoteFsTranslog( } public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + assert repository instanceof BlobStoreRepository : shardId + " repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( @@ -134,7 +134,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t } public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { - logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId()); + logger.info("{} Downloading translog files from remote", translogTransferManager.getShardId()); TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { if (Files.notExists(location)) { @@ -156,7 +156,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat location.resolve(Translog.CHECKPOINT_FILE_NAME) ); } - logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId()); + logger.info("{} Downloaded translog files from remote", translogTransferManager.getShardId()); } public static TranslogTransferManager buildTranslogTransferManager( @@ -205,7 +205,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc readers.add(reader); copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration()))); if (closed.get() == false) { - logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1); + logger.trace("{} Creating new writer for gen: [{}]", shardId, current.getGeneration() + 1); current = createWriter(current.getGeneration() + 1); } } catch (final Exception e) { @@ -241,11 +241,11 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException { // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns // downloads all the translogs from remote store and does a flush before the relocation finishes. if (primaryModeSupplier.getAsBoolean() == false) { - logger.trace("skipped uploading translog for {} {}", primaryTerm, generation); + logger.trace("{} skipped uploading translog for {} {}", shardId, primaryTerm, generation); // NO-OP return true; } - logger.trace("uploading translog for {} {}", primaryTerm, generation); + logger.trace("{} uploading translog for {} {}", shardId, primaryTerm, generation); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( primaryTerm, @@ -264,7 +264,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; minRemoteGenReferenced = getMinFileGeneration(); - logger.trace("uploaded translog for {} {} ", primaryTerm, generation); + logger.trace("{} uploaded translog for {} {} ", shardId, primaryTerm, generation); } @Override @@ -321,13 +321,13 @@ public boolean syncNeeded() { @Override public void close() throws IOException { - assert Translog.calledFromOutsideOrViaTragedyClose() - : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; + assert Translog.calledFromOutsideOrViaTragedyClose() : shardId + + "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { sync(); } finally { - logger.debug("translog closed"); + logger.debug("{} translog closed", shardId); closeFilesIfNoPendingRetentionLocks(); } } @@ -340,12 +340,14 @@ protected long getMinReferencedGen() throws IOException { minGenerationForSeqNo(minSeqNoToKeep, current, readers) ); - assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + assert minReferencedGen >= getMinFileGeneration() : shardId + + " deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; - assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of [" + assert minReferencedGen <= currentFileGeneration() : shardId + + " deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation [" + currentFileGeneration() @@ -356,7 +358,7 @@ protected long getMinReferencedGen() throws IOException { protected void setMinSeqNoToKeep(long seqNo) { if (seqNo < this.minSeqNoToKeep) { throw new IllegalArgumentException( - "min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]" + shardId + " min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]" ); } this.minSeqNoToKeep = seqNo; @@ -416,7 +418,7 @@ private void deleteStaleRemotePrimaryTerms() { // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { // First we delete all stale primary terms folders from remote store - assert readers.isEmpty() == false : "Expected non-empty readers"; + assert readers.isEmpty() == false : shardId + " Expected non-empty readers"; long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } @@ -438,7 +440,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th protected void onDelete() { if (primaryModeSupplier.getAsBoolean() == false) { - logger.trace("skipped delete translog"); + logger.trace("{} skipped delete translog", shardId); // NO-OP return; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 6da0ee5521738..23641eb722b70 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -98,7 +98,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { - logger.trace("Nothing to upload for transfer"); + logger.trace(getShardId() + " Nothing to upload for transfer"); translogTransferListener.onUploadComplete(transferSnapshot); return true; } @@ -108,7 +108,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans assert ex instanceof FileTransferException; logger.error( () -> new ParameterizedMessage( - "Exception during transfer for file {}", + getShardId() + " Exception during transfer for file {}", ((FileTransferException) ex).getFileSnapshot().getName() ), ex @@ -129,7 +129,9 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans ); try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { - Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); + Exception ex = new TimeoutException( + getShardId() + " Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete" + ); exceptionList.forEach(ex::addSuppressed); throw ex; } @@ -143,12 +145,12 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { - Exception ex = new IOException("Failed to upload " + exceptionList.size() + " files during transfer"); + Exception ex = new IOException(getShardId() + " Failed to upload " + exceptionList.size() + " files during transfer"); exceptionList.forEach(ex::addSuppressed); throw ex; } } catch (Exception ex) { - logger.error(() -> new ParameterizedMessage("Transfer failed for snapshot {}", transferSnapshot), ex); + logger.error(() -> new ParameterizedMessage("{} Transfer failed for snapshot {}", getShardId(), transferSnapshot), ex); translogTransferListener.onUploadFailed(transferSnapshot, ex); return false; } @@ -156,7 +158,8 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException { logger.info( - "Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", + "{} Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", + getShardId(), primaryTerm, generation, location @@ -196,11 +199,11 @@ public TranslogTransferMetadata readMetadata() throws IOException { IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput)); } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); + logger.error(() -> new ParameterizedMessage("{} Exception while reading metadata file: {}", getShardId(), filename), e); exceptionSetOnce.set(e); } }, e -> { - logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e); + logger.error(() -> new ParameterizedMessage("{} Exception while listing metadata files", getShardId()), e); exceptionSetOnce.set((IOException) e); }), latch @@ -210,7 +213,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); latch.await(); } catch (InterruptedException e) { - throw new IOException("Exception while reading/downloading metadafile", e); + throw new IOException(getShardId() + " Exception while reading/downloading metadafile", e); } if (exceptionSetOnce.get() != null) { @@ -295,7 +298,7 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna * @param minPrimaryTermToKeep all primary terms below this primary term are deleted. */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { - logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId); + logger.info("{} Deleting primary terms from remote store lesser than {}", getShardId(), minPrimaryTermToKeep); transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { @Override public void onResponse(Set folders) { @@ -316,7 +319,7 @@ public void onResponse(Set folders) { @Override public void onFailure(Exception e) { - logger.error("Exception occurred while getting primary terms from remote store", e); + logger.error("{} Exception occurred while getting primary terms from remote store", getShardId(), e); } }); } @@ -333,12 +336,15 @@ private void deletePrimaryTermAsync(long primaryTerm) { new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.info("Deleted primary term {} for {}", primaryTerm, shardId); + logger.info("{} Deleted primary term {}", getShardId(), primaryTerm); } @Override public void onFailure(Exception e) { - logger.error(new ParameterizedMessage("Exception occurred while deleting primary term {}", primaryTerm), e); + logger.error( + new ParameterizedMessage("{} Exception occurred while deleting primary term {}", getShardId(), primaryTerm), + e + ); } } ); @@ -349,12 +355,12 @@ public void delete() { transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.info("Deleted all remote translog data for {}", shardId); + logger.info("{} Deleted all remote translog data", getShardId()); } @Override public void onFailure(Exception e) { - logger.error("Exception occurred while cleaning translog ", e); + logger.error("{} Exception occurred while cleaning translog", getShardId(), e); } }); } @@ -370,24 +376,24 @@ public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) { public void onResponse(List blobMetadata) { List sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); if (sortedMetadataFiles.size() <= 1) { - logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); + logger.trace("{} Remote Metadata file count is {}, so skipping deletion", getShardId(), sortedMetadataFiles.size()); onCompletion.run(); return; } List metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size()); - logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete); + logger.trace("{} Deleting remote translog metadata files {}", getShardId(), metadataFilesToDelete); deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion); } @Override public void onFailure(Exception e) { - logger.error("Exception occurred while listing translog metadata files from remote store", e); + logger.error("{} Exception occurred while listing translog metadata files from remote store", getShardId(), e); onCompletion.run(); } } ); } catch (Exception e) { - logger.error("Exception occurred while listing translog metadata files from remote store", e); + logger.error("{} Exception occurred while listing translog metadata files from remote store", getShardId(), e); onCompletion.run(); } } @@ -414,7 +420,7 @@ private void deleteTranslogFilesAsync(long primaryTerm, List files, Runn @Override public void onResponse(Void unused) { fileTransferTracker.delete(files); - logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files); + logger.trace("{} Deleted translogs for primaryTerm={} files={}", getShardId(), primaryTerm, files); onCompletion.run(); } @@ -423,7 +429,8 @@ public void onFailure(Exception e) { onCompletion.run(); logger.error( () -> new ParameterizedMessage( - "Exception occurred while deleting translog for primaryTerm={} files={}", + "{} Exception occurred while deleting translog for primaryTerm={} files={}", + getShardId(), primaryTerm, files ), @@ -450,13 +457,20 @@ private void deleteMetadataFilesAsync(List files, Runnable onCompletion) @Override public void onResponse(Void unused) { onCompletion.run(); - logger.trace("Deleted remote translog metadata files {}", files); + logger.trace("{} Deleted remote translog metadata files {}", getShardId(), files); } @Override public void onFailure(Exception e) { onCompletion.run(); - logger.error(new ParameterizedMessage("Exception occurred while deleting remote translog metadata files {}", files), e); + logger.error( + new ParameterizedMessage( + "{} Exception occurred while deleting remote translog metadata files {}", + getShardId(), + files + ), + e + ); } }); } catch (Exception e) {