Skip to content

Commit

Permalink
Issue #9270, new way of managing of WAL buffer was introduced.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Jun 26, 2020
1 parent aabad7d commit c0b3518
Showing 1 changed file with 101 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.orientechnologies.orient.core.storage.impl.local.paginated.wal.cas;

import com.orientechnologies.common.concur.lock.OInterruptedException;
import com.orientechnologies.common.concur.lock.ScalableRWLock;
import com.orientechnologies.common.directmemory.ODirectMemoryAllocator;
import com.orientechnologies.common.directmemory.OPointer;
Expand Down Expand Up @@ -70,7 +71,8 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {
return thread;
});

writeExecutor = new OThreadPoolExecutorWithLogging(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> {
writeExecutor = new OThreadPoolExecutorWithLogging(1, 1,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> {
final Thread thread = new Thread(OStorageAbstract.storageThreadGroup, r);
thread.setDaemon(true);
thread.setName("OrientDB WAL Write Task Thread)");
Expand Down Expand Up @@ -149,7 +151,7 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {
private final AtomicInteger fileCloseQueueSize = new AtomicInteger();

private final AtomicReference<CountDownLatch> flushLatch = new AtomicReference<>(new CountDownLatch(0));
private volatile Future<?> writeFuture = null;
private volatile Future<?> latestWriteFuture = null;
//not volatile because used only inside of write thread.
private OLogSequenceNumber writtenCheckpoint = null;

Expand All @@ -161,17 +163,11 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog {

private long currentPosition = 0;

private boolean useFirstBuffer = true;

private ByteBuffer writeBuffer = null;
private OPointer writeBufferPointer = null;
private int writeBufferPageIndex = -1;

private final ByteBuffer writeBufferOne;
private final OPointer writeBufferPointerOne;

private final ByteBuffer writeBufferTwo;
private final OPointer writeBufferPointerTwo;
private final ArrayBlockingQueue<OPointer> pointersPool = new ArrayBlockingQueue<>(2);

private OLogSequenceNumber lastLSN = null;
private OLogSequenceNumber checkPointLSN = null;
Expand Down Expand Up @@ -200,7 +196,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f
boolean keepSingleWALSegment, boolean callFsync, boolean printPerformanceStatistic, int statisticPrintInterval)
throws IOException {

int bufferSize1 = bufferSize * 1024 * 1024;
final int bufferSizeInBytes = bufferSize * 1024 * 1024;

this.segmentsInterval = segmentsInterval;
this.keepSingleWALSegment = keepSingleWALSegment;
Expand Down Expand Up @@ -248,7 +244,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f

OLogManager.instance().infoNoDb(this, "Page size for WAL located in %s is set to %d bytes.", walLocation.toString(), pageSize);

this.maxCacheSize = multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize, DEFAULT_MAX_CACHE_SIZE);
this.maxCacheSize = multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize);

masterRecordPath = walLocation.resolve(storageName + MASTER_RECORD_EXTENSION);
masterRecordLSNHolder = FileChannel
Expand Down Expand Up @@ -284,22 +280,27 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f

this.commitDelay = commitDelay;

writeBufferPointerOne = allocator.allocate(bufferSize1, blockSize);
writeBufferOne = writeBufferPointerOne.getNativeByteBuffer().order(ByteOrder.nativeOrder());
for (int i = 0; i < 2; i++) {
try {
pointersPool.put(allocator.allocate(bufferSizeInBytes, blockSize));
} catch (final InterruptedException interruptedException) {
throw OException.wrapException(
new OInterruptedException("WAL initialization was interrupted"), interruptedException);
}
}

writeBufferPointerTwo = allocator.allocate(bufferSize1, blockSize);
writeBufferTwo = writeBufferPointerTwo.getNativeByteBuffer().order(ByteOrder.nativeOrder());

log(new OEmptyWALRecord());

this.recordsWriterFuture = commitExecutor.schedule(new RecordsWriter(false, false, true), commitDelay, TimeUnit.MILLISECONDS);
flush();
}

private int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, final int pageSize, final int defaultValue) {
private static int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize,
final int pageSize) {
long maxCacheSize = (long)maxPagesCacheSize * (long)pageSize;
if ((int)maxCacheSize != maxCacheSize) {
return defaultValue;
return OCASDiskWriteAheadLog.DEFAULT_MAX_CACHE_SIZE;
}
return (int)maxCacheSize;
}
Expand Down Expand Up @@ -476,24 +477,44 @@ private OLogSequenceNumber readMasterRecord(final int index) throws IOException
}

private long initSegmentSet(final boolean filterWALFiles, final Locale locale) throws IOException {
final Stream<Path> walFiles;
Stream<Path> walFiles = null;

final OModifiableLong walSize = new OModifiableLong();
if (filterWALFiles)
walFiles = Files.find(walLocation, 1,
(Path path, BasicFileAttributes attributes) -> validateName(path.getFileName().toString(), storageName, locale));
else
walFiles = Files.find(walLocation, 1,
(Path path, BasicFileAttributes attrs) -> validateSimpleName(path.getFileName().toString(), locale));

if (walFiles == null)
throw new IllegalStateException(
"Location passed in WAL does not exist, or IO error was happened. DB cannot work in durable mode in such case");

walFiles.forEach((Path path) -> {
segments.add(extractSegmentId(path.getFileName().toString()));
walSize.increment(path.toFile().length());
});
try{
if (filterWALFiles) {
//noinspection resource
walFiles =
Files.find(
walLocation,
1,
(Path path, BasicFileAttributes attributes) ->
validateName(path.getFileName().toString(), storageName, locale));
} else {
//noinspection resource
walFiles =
Files.find(
walLocation,
1,
(Path path, BasicFileAttributes attrs) ->
validateSimpleName(path.getFileName().toString(), locale));
}

if (walFiles == null) {
throw new IllegalStateException(
"Location passed in WAL does not exist, or IO error was happened. "
+ "DB cannot work in durable mode in such case");
}

walFiles.forEach((Path path) -> {
segments.add(extractSegmentId(path.getFileName().toString()));
walSize.increment(path.toFile().length());
});
} finally{
if (walFiles != null) {
walFiles.close();
}
}


return walSize.value;
}
Expand Down Expand Up @@ -645,7 +666,7 @@ record = recordCursor.getItem();
}

private void waitTillWriteWillBeFinished() {
final Future<?> wf = writeFuture;
final Future<?> wf = latestWriteFuture;
if (wf != null) {
try {
wf.get();
Expand Down Expand Up @@ -1375,9 +1396,9 @@ public void close(final boolean flush) throws IOException {
}
}

if (writeFuture != null) {
if (latestWriteFuture != null) {
try {
writeFuture.get();
latestWriteFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw OException.wrapException(new OStorageException("Error during writing of WAL records in storage " + storageName), e);
}
Expand All @@ -1393,8 +1414,8 @@ record = records.poll();
}

try {
if (writeFuture != null) {
writeFuture.get();
if (latestWriteFuture != null) {
latestWriteFuture.get();
}

} catch (final InterruptedException e) {
Expand Down Expand Up @@ -1425,8 +1446,18 @@ record = records.poll();
segments.clear();
fileCloseQueue.clear();

allocator.deallocate(writeBufferPointerOne);
allocator.deallocate(writeBufferPointerTwo);
for (int i = 0; i < 2; i++) {
try {
final OPointer pointer = pointersPool.poll(1, TimeUnit.MINUTES);
if (pointer == null) {
throw new OStorageException("Can not clear memory allocated by WAL buffer");
}
allocator.deallocate(pointer);
} catch (InterruptedException interruptedException) {
throw OException.wrapException(new OInterruptedException("WAL close was interrupted"),
interruptedException);
}
}

if (writeBufferPointer != null) {
writeBufferPointer = null;
Expand Down Expand Up @@ -1822,7 +1853,7 @@ assert record != null;
if (segmentId != lsn.getSegment()) {
if (walFile != null) {
if (writeBufferPointer != null) {
writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN);
writeBuffer(walFile, writeBuffer, writeBufferPointer, lastLSN, checkPointLSN);
}

writeBufferPointer = null;
Expand All @@ -1833,8 +1864,8 @@ assert record != null;
lastLSN = null;

try {
if (writeFuture != null) {
writeFuture.get();
if (latestWriteFuture != null) {
latestWriteFuture.get();
}
} catch (final InterruptedException e) {
OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e);
Expand Down Expand Up @@ -1872,20 +1903,24 @@ assert record != null;
if (writeBuffer == null || writeBuffer.remaining() == 0) {
if (writeBufferPointer != null) {
assert writeBuffer != null;
writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN);
writeBuffer(walFile, writeBuffer, writeBufferPointer,
lastLSN, checkPointLSN);
}

if (useFirstBuffer) {
writeBufferPointer = writeBufferPointerOne;
writeBuffer = writeBufferOne;
} else {
writeBufferPointer = writeBufferPointerTwo;
writeBuffer = writeBufferTwo;
try {
writeBufferPointer = pointersPool.take();
} catch (InterruptedException interruptedException) {
throw
OException.wrapException(
new OInterruptedException(
"Writing of data into the WAL was interrupted"),
interruptedException);
}
writeBuffer = writeBufferPointer.getNativeByteBuffer();

writeBuffer.limit((writeBuffer.capacity() / pageSize) * pageSize);

writeBuffer.limit(writeBuffer.capacity());
writeBuffer.rewind();
useFirstBuffer = !useFirstBuffer;

writeBufferPageIndex = -1;

Expand Down Expand Up @@ -1960,7 +1995,7 @@ assert record != null;
}

if ((makeFSync || fullWrite) && writeBufferPointer != null) {
writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN);
writeBuffer(walFile, writeBuffer, writeBufferPointer, lastLSN, checkPointLSN);

writeBufferPointer = null;
writeBuffer = null;
Expand All @@ -1983,16 +2018,16 @@ assert record != null;
if (makeFSync) {
try {
try {
if (writeFuture != null) {
writeFuture.get();
if (latestWriteFuture != null) {
latestWriteFuture.get();
}
} catch (final InterruptedException e) {
OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e);
}

assert walFile.position() == currentPosition;

writeFuture = writeExecutor.submit((Callable<?>) () -> {
latestWriteFuture = writeExecutor.submit((Callable<?>) () -> {
try {
long startTs = 0;
if (printPerformanceStatistic) {
Expand All @@ -2006,7 +2041,6 @@ assert record != null;
while (counter < cqSize) {
final OPair<Long, OWALFile> pair = fileCloseQueue.poll();
if (pair != null) {
@SuppressWarnings("resource")
final OWALFile file = pair.value;

assert file.position() % pageSize == 0;
Expand Down Expand Up @@ -2068,7 +2102,10 @@ assert record != null;
}
}

private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLogSequenceNumber lastLSN,
private void writeBuffer(final OWALFile file,
final ByteBuffer buffer,
final OPointer bufferPointer,
final OLogSequenceNumber lastLSN,
final OLogSequenceNumber checkpointLSN) throws IOException {

if (buffer.position() <= OCASWALPage.RECORDS_OFFSET) {
Expand Down Expand Up @@ -2111,23 +2148,12 @@ private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLo
final int limit = maxPage * pageSize;
buffer.limit(limit);

try {
if (writeFuture != null) {
writeFuture.get();
}
} catch (final InterruptedException e) {
OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e);
} catch (final Exception e) {
OLogManager.instance().errorNoDb(this, "Error during WAL write", e);
throw OException.wrapException(new OStorageException("Error during WAL data write"), e);
}

assert file.position() == currentPosition;
currentPosition += buffer.limit();

final long expectedPosition = currentPosition;

writeFuture = writeExecutor.submit((Callable<?>) () -> {
latestWriteFuture = writeExecutor.submit((Callable<?>) () -> {
try {
long startTs = 0;
if (printPerformanceStatistic) {
Expand Down Expand Up @@ -2175,6 +2201,11 @@ private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLo
//noinspection NonAtomicOperationOnVolatileField
bytesWrittenTime += (endTs - startTs);
}

final boolean added = pointersPool.offer(writeBufferPointer);
if (!added) {
throw new OStorageException("Can not return byte buffer back to the pool.");
}
} catch (final IOException e) {
OLogManager.instance().errorNoDb(this, "Error during WAL data write", e);
throw e;
Expand Down

0 comments on commit c0b3518

Please sign in to comment.