From 120df399256db8e8dfae4f0791f5c84b7438f635 Mon Sep 17 00:00:00 2001 From: laa Date: Tue, 14 Jul 2020 17:53:59 +0300 Subject: [PATCH] Issue #9270 . Ensure that file writes are finished on close. --- .../wal/cas/OCASDiskWriteAheadLog.java | 86 ++++++++++--------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java index 7e10889a8f5..79b2e6b34e0 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java @@ -157,7 +157,7 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { private final AtomicReference writtenUpTo = new AtomicReference<>(); private long segmentId = -1; - private volatile ScheduledFuture recordsWriterFuture; + private final ScheduledFuture recordsWriterFuture; private final Path masterRecordPath; @@ -187,8 +187,6 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { private final int fsyncInterval; private volatile long segmentAdditionTs; - private final int commitDelay; - private long currentPosition = 0; private boolean useFirstBuffer = true; @@ -222,8 +220,6 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { private long reportTs = -1; - private volatile boolean stopWrite = false; - public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, final Path walPath, final int maxPagesCacheSize, final int bufferSize, long segmentsInterval, final long maxSegmentSize, final int commitDelay, final boolean filterWALFiles, final Locale locale, final long walSizeHardLimit, final long freeSpaceLimit, final int fsyncInterval, boolean allowDirectIO, @@ -279,7 +275,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f OLogManager.instance().infoNoDb(this, "Page size for WAL located in %s is set to %d bytes.", walLocation.toString(), pageSize); this.maxCacheSize = - multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize, DEFAULT_MAX_CACHE_SIZE); + multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize); masterRecordPath = walLocation.resolve(storageName + MASTER_RECORD_EXTENSION); masterRecordLSNHolder = FileChannel @@ -313,8 +309,6 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f writtenUpTo.set(new WrittenUpTo(new OLogSequenceNumber(currentSegment, 0), 0)); - this.commitDelay = commitDelay; - writeBufferPointerOne = allocator.allocate(bufferSize1, blockSize); writeBufferOne = writeBufferPointerOne.getNativeByteBuffer().order(ByteOrder.nativeOrder()); @@ -324,15 +318,16 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f log(new OEmptyWALRecord()); this.recordsWriterFuture = commitExecutor - .schedule(new RecordsWriter(false, false, true), commitDelay, TimeUnit.MILLISECONDS); + .scheduleWithFixedDelay(new RecordsWriter(false, false), + commitDelay, commitDelay, TimeUnit.MILLISECONDS); flush(); } - private int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, final int pageSize, - final int defaultValue) { + private static int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, + final int pageSize) { long maxCacheSize = (long) maxPagesCacheSize * (long) pageSize; if ((int) maxCacheSize != maxCacheSize) { - return defaultValue; + return OCASDiskWriteAheadLog.DEFAULT_MAX_CACHE_SIZE; } return (int) maxCacheSize; } @@ -513,13 +508,21 @@ private long initSegmentSet(final boolean filterWALFiles, final Locale locale) t final OModifiableLong walSize = new OModifiableLong(); if (filterWALFiles) { - walFiles = Files.find(walLocation, 1, - (Path path, BasicFileAttributes attributes) -> validateName(path.getFileName().toString(), - storageName, locale)); + //noinspection resource + walFiles = + Files.find( + walLocation, + 1, + (Path path, BasicFileAttributes attributes) -> + validateName(path.getFileName().toString(), storageName, locale)); } else { - walFiles = Files.find(walLocation, 1, - (Path path, BasicFileAttributes attrs) -> validateSimpleName( - path.getFileName().toString(), locale)); + //noinspection resource + walFiles = + Files.find( + walLocation, + 1, + (Path path, BasicFileAttributes attrs) -> + validateSimpleName(path.getFileName().toString(), locale)); } if (walFiles == null) { @@ -527,10 +530,15 @@ private long initSegmentSet(final boolean filterWALFiles, final Locale locale) t "Location passed in WAL does not exist, or IO error was happened. DB cannot work in durable mode in such case"); } - walFiles.forEach((Path path) -> { - segments.add(extractSegmentId(path.getFileName().toString())); - walSize.increment(path.toFile().length()); - }); + try { + walFiles.forEach( + (Path path) -> { + segments.add(extractSegmentId(path.getFileName().toString())); + walSize.increment(path.toFile().length()); + }); + } finally { + walFiles.close(); + } return walSize.value; } @@ -1402,14 +1410,17 @@ public void close(final boolean flush) throws IOException { doFlush(true); } - stopWrite = true; + if (!recordsWriterFuture.cancel(false) && !recordsWriterFuture.isDone()) { + throw new OStorageException( + "Can not cancel background " + "WAL task which writes records to the disk"); + } - if (recordsWriterFuture != null) { - try { - recordsWriterFuture.get(); - } catch (InterruptedException | ExecutionException e) { - throw OException.wrapException(new OStorageException("Error during writing of WAL records in storage " + storageName), e); - } + try { + recordsWriterFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw OException.wrapException( + new OStorageException("Error during writing of WAL records in storage " + storageName), + e); } if (writeFuture != null) { @@ -1531,7 +1542,7 @@ public void addSegmentOverflowListener(final OSegmentOverflowListener listener) } private void doFlush(final boolean forceSync) { - final Future future = commitExecutor.submit(new RecordsWriter(forceSync, true, false)); + final Future future = commitExecutor.submit(new RecordsWriter(forceSync, true)); try { future.get(); } catch (final Exception e) { @@ -1783,21 +1794,18 @@ private String getSegmentName(final long segment) { } private final class RecordsWriter implements Runnable { + private final boolean forceSync; private final boolean fullWrite; - private final boolean reschedule; - private RecordsWriter(final boolean forceSync, final boolean fullWrite, boolean reschedule) { + + private RecordsWriter(final boolean forceSync, final boolean fullWrite) { this.forceSync = forceSync; this.fullWrite = fullWrite; - this.reschedule = reschedule; } @Override public void run() { - if (stopWrite) { - return; - } try { if (printPerformanceStatistic) { printReport(); @@ -2045,7 +2053,7 @@ assert record != null; while (counter < cqSize) { final OPair pair = fileCloseQueue.poll(); if (pair != null) { - @SuppressWarnings("resource") final OWALFile file = pair.value; + final OWALFile file = pair.value; assert file.position() % pageSize == 0; @@ -2099,10 +2107,6 @@ assert record != null; } catch (final RuntimeException | Error e) { OLogManager.instance().errorNoDb(this, "Error during WAL writing", e); throw e; - } finally { - if (reschedule && !stopWrite) { - recordsWriterFuture = commitExecutor.schedule(this, commitDelay, TimeUnit.MILLISECONDS); - } } }