Skip to content

Commit

Permalink
Issue #9270 . Ensure that file writes are finished on close.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Jul 14, 2020
1 parent 0a28a1a commit 120df39
Showing 1 changed file with 45 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {
private final AtomicReference<WrittenUpTo> writtenUpTo = new AtomicReference<>();
private long segmentId = -1;

private volatile ScheduledFuture<?> recordsWriterFuture;
private final ScheduledFuture<?> recordsWriterFuture;

private final Path masterRecordPath;

Expand Down Expand Up @@ -187,8 +187,6 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {
private final int fsyncInterval;
private volatile long segmentAdditionTs;

private final int commitDelay;

private long currentPosition = 0;

private boolean useFirstBuffer = true;
Expand Down Expand Up @@ -222,8 +220,6 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {

private long reportTs = -1;

private volatile boolean stopWrite = false;

public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, final Path walPath, final int maxPagesCacheSize,
final int bufferSize, long segmentsInterval, final long maxSegmentSize, final int commitDelay, final boolean filterWALFiles,
final Locale locale, final long walSizeHardLimit, final long freeSpaceLimit, final int fsyncInterval, boolean allowDirectIO,
Expand Down Expand Up @@ -279,7 +275,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f
OLogManager.instance().infoNoDb(this, "Page size for WAL located in %s is set to %d bytes.", walLocation.toString(), pageSize);

this.maxCacheSize =
multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize, DEFAULT_MAX_CACHE_SIZE);
multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize);

masterRecordPath = walLocation.resolve(storageName + MASTER_RECORD_EXTENSION);
masterRecordLSNHolder = FileChannel
Expand Down Expand Up @@ -313,8 +309,6 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f

writtenUpTo.set(new WrittenUpTo(new OLogSequenceNumber(currentSegment, 0), 0));

this.commitDelay = commitDelay;

writeBufferPointerOne = allocator.allocate(bufferSize1, blockSize);
writeBufferOne = writeBufferPointerOne.getNativeByteBuffer().order(ByteOrder.nativeOrder());

Expand All @@ -324,15 +318,16 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f
log(new OEmptyWALRecord());

this.recordsWriterFuture = commitExecutor
.schedule(new RecordsWriter(false, false, true), commitDelay, TimeUnit.MILLISECONDS);
.scheduleWithFixedDelay(new RecordsWriter(false, false),
commitDelay, commitDelay, TimeUnit.MILLISECONDS);
flush();
}

private int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, final int pageSize,
final int defaultValue) {
private static int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize,
final int pageSize) {
long maxCacheSize = (long) maxPagesCacheSize * (long) pageSize;
if ((int) maxCacheSize != maxCacheSize) {
return defaultValue;
return OCASDiskWriteAheadLog.DEFAULT_MAX_CACHE_SIZE;
}
return (int) maxCacheSize;
}
Expand Down Expand Up @@ -513,24 +508,37 @@ private long initSegmentSet(final boolean filterWALFiles, final Locale locale) t

final OModifiableLong walSize = new OModifiableLong();
if (filterWALFiles) {
walFiles = Files.find(walLocation, 1,
(Path path, BasicFileAttributes attributes) -> validateName(path.getFileName().toString(),
storageName, locale));
//noinspection resource
walFiles =
Files.find(
walLocation,
1,
(Path path, BasicFileAttributes attributes) ->
validateName(path.getFileName().toString(), storageName, locale));
} else {
walFiles = Files.find(walLocation, 1,
(Path path, BasicFileAttributes attrs) -> validateSimpleName(
path.getFileName().toString(), locale));
//noinspection resource
walFiles =
Files.find(
walLocation,
1,
(Path path, BasicFileAttributes attrs) ->
validateSimpleName(path.getFileName().toString(), locale));
}

if (walFiles == null) {
throw new IllegalStateException(
"Location passed in WAL does not exist, or IO error was happened. DB cannot work in durable mode in such case");
}

walFiles.forEach((Path path) -> {
segments.add(extractSegmentId(path.getFileName().toString()));
walSize.increment(path.toFile().length());
});
try {
walFiles.forEach(
(Path path) -> {
segments.add(extractSegmentId(path.getFileName().toString()));
walSize.increment(path.toFile().length());
});
} finally {
walFiles.close();
}

return walSize.value;
}
Expand Down Expand Up @@ -1402,14 +1410,17 @@ public void close(final boolean flush) throws IOException {
doFlush(true);
}

stopWrite = true;
if (!recordsWriterFuture.cancel(false) && !recordsWriterFuture.isDone()) {
throw new OStorageException(
"Can not cancel background " + "WAL task which writes records to the disk");
}

if (recordsWriterFuture != null) {
try {
recordsWriterFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw OException.wrapException(new OStorageException("Error during writing of WAL records in storage " + storageName), e);
}
try {
recordsWriterFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw OException.wrapException(
new OStorageException("Error during writing of WAL records in storage " + storageName),
e);
}

if (writeFuture != null) {
Expand Down Expand Up @@ -1531,7 +1542,7 @@ public void addSegmentOverflowListener(final OSegmentOverflowListener listener)
}

private void doFlush(final boolean forceSync) {
final Future<?> future = commitExecutor.submit(new RecordsWriter(forceSync, true, false));
final Future<?> future = commitExecutor.submit(new RecordsWriter(forceSync, true));
try {
future.get();
} catch (final Exception e) {
Expand Down Expand Up @@ -1783,21 +1794,18 @@ private String getSegmentName(final long segment) {
}

private final class RecordsWriter implements Runnable {

private final boolean forceSync;
private final boolean fullWrite;
private final boolean reschedule;

private RecordsWriter(final boolean forceSync, final boolean fullWrite, boolean reschedule) {

private RecordsWriter(final boolean forceSync, final boolean fullWrite) {
this.forceSync = forceSync;
this.fullWrite = fullWrite;
this.reschedule = reschedule;
}

@Override
public void run() {
if (stopWrite) {
return;
}
try {
if (printPerformanceStatistic) {
printReport();
Expand Down Expand Up @@ -2045,7 +2053,7 @@ assert record != null;
while (counter < cqSize) {
final OPair<Long, OWALFile> pair = fileCloseQueue.poll();
if (pair != null) {
@SuppressWarnings("resource") final OWALFile file = pair.value;
final OWALFile file = pair.value;

assert file.position() % pageSize == 0;

Expand Down Expand Up @@ -2099,10 +2107,6 @@ assert record != null;
} catch (final RuntimeException | Error e) {
OLogManager.instance().errorNoDb(this, "Error during WAL writing", e);
throw e;
} finally {
if (reschedule && !stopWrite) {
recordsWriterFuture = commitExecutor.schedule(this, commitDelay, TimeUnit.MILLISECONDS);
}
}
}

Expand Down

0 comments on commit 120df39

Please sign in to comment.