diff --git a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java index b69f4f8b2de..24518013d51 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java +++ b/core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/wal/cas/OCASDiskWriteAheadLog.java @@ -1,5 +1,6 @@ package com.orientechnologies.orient.core.storage.impl.local.paginated.wal.cas; +import com.orientechnologies.common.concur.lock.OInterruptedException; import com.orientechnologies.common.concur.lock.ScalableRWLock; import com.orientechnologies.common.directmemory.ODirectMemoryAllocator; import com.orientechnologies.common.directmemory.OPointer; @@ -70,7 +71,8 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { return thread; }); - writeExecutor = new OThreadPoolExecutorWithLogging(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> { + writeExecutor = new OThreadPoolExecutorWithLogging(1, 1, + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), r -> { final Thread thread = new Thread(OStorageAbstract.storageThreadGroup, r); thread.setDaemon(true); thread.setName("OrientDB WAL Write Task Thread)"); @@ -149,7 +151,7 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { private final AtomicInteger fileCloseQueueSize = new AtomicInteger(); private final AtomicReference flushLatch = new AtomicReference<>(new CountDownLatch(0)); - private volatile Future writeFuture = null; + private volatile Future latestWriteFuture = null; //not volatile because used only inside of write thread. private OLogSequenceNumber writtenCheckpoint = null; @@ -161,17 +163,11 @@ public final class OCASDiskWriteAheadLog implements OWriteAheadLog { private long currentPosition = 0; - private boolean useFirstBuffer = true; - private ByteBuffer writeBuffer = null; private OPointer writeBufferPointer = null; private int writeBufferPageIndex = -1; - private final ByteBuffer writeBufferOne; - private final OPointer writeBufferPointerOne; - - private final ByteBuffer writeBufferTwo; - private final OPointer writeBufferPointerTwo; + private final ArrayBlockingQueue pointersPool = new ArrayBlockingQueue<>(2); private OLogSequenceNumber lastLSN = null; private OLogSequenceNumber checkPointLSN = null; @@ -200,7 +196,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f boolean keepSingleWALSegment, boolean callFsync, boolean printPerformanceStatistic, int statisticPrintInterval) throws IOException { - int bufferSize1 = bufferSize * 1024 * 1024; + final int bufferSizeInBytes = bufferSize * 1024 * 1024; this.segmentsInterval = segmentsInterval; this.keepSingleWALSegment = keepSingleWALSegment; @@ -248,7 +244,7 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f OLogManager.instance().infoNoDb(this, "Page size for WAL located in %s is set to %d bytes.", walLocation.toString(), pageSize); - this.maxCacheSize = multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize, DEFAULT_MAX_CACHE_SIZE); + this.maxCacheSize = multiplyIntsWithOverflowDefault(maxPagesCacheSize, pageSize); masterRecordPath = walLocation.resolve(storageName + MASTER_RECORD_EXTENSION); masterRecordLSNHolder = FileChannel @@ -284,11 +280,15 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f this.commitDelay = commitDelay; - writeBufferPointerOne = allocator.allocate(bufferSize1, blockSize); - writeBufferOne = writeBufferPointerOne.getNativeByteBuffer().order(ByteOrder.nativeOrder()); + for (int i = 0; i < 2; i++) { + try { + pointersPool.put(allocator.allocate(bufferSizeInBytes, blockSize)); + } catch (final InterruptedException interruptedException) { + throw OException.wrapException( + new OInterruptedException("WAL initialization was interrupted"), interruptedException); + } + } - writeBufferPointerTwo = allocator.allocate(bufferSize1, blockSize); - writeBufferTwo = writeBufferPointerTwo.getNativeByteBuffer().order(ByteOrder.nativeOrder()); log(new OEmptyWALRecord()); @@ -296,10 +296,11 @@ public OCASDiskWriteAheadLog(final String storageName, final Path storagePath, f flush(); } - private int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, final int pageSize, final int defaultValue) { + private static int multiplyIntsWithOverflowDefault(final int maxPagesCacheSize, + final int pageSize) { long maxCacheSize = (long)maxPagesCacheSize * (long)pageSize; if ((int)maxCacheSize != maxCacheSize) { - return defaultValue; + return OCASDiskWriteAheadLog.DEFAULT_MAX_CACHE_SIZE; } return (int)maxCacheSize; } @@ -476,24 +477,44 @@ private OLogSequenceNumber readMasterRecord(final int index) throws IOException } private long initSegmentSet(final boolean filterWALFiles, final Locale locale) throws IOException { - final Stream walFiles; + Stream walFiles = null; final OModifiableLong walSize = new OModifiableLong(); - if (filterWALFiles) - walFiles = Files.find(walLocation, 1, - (Path path, BasicFileAttributes attributes) -> validateName(path.getFileName().toString(), storageName, locale)); - else - walFiles = Files.find(walLocation, 1, - (Path path, BasicFileAttributes attrs) -> validateSimpleName(path.getFileName().toString(), locale)); - - if (walFiles == null) - throw new IllegalStateException( - "Location passed in WAL does not exist, or IO error was happened. DB cannot work in durable mode in such case"); - - walFiles.forEach((Path path) -> { - segments.add(extractSegmentId(path.getFileName().toString())); - walSize.increment(path.toFile().length()); - }); + try{ + if (filterWALFiles) { + //noinspection resource + walFiles = + Files.find( + walLocation, + 1, + (Path path, BasicFileAttributes attributes) -> + validateName(path.getFileName().toString(), storageName, locale)); + } else { + //noinspection resource + walFiles = + Files.find( + walLocation, + 1, + (Path path, BasicFileAttributes attrs) -> + validateSimpleName(path.getFileName().toString(), locale)); + } + + if (walFiles == null) { + throw new IllegalStateException( + "Location passed in WAL does not exist, or IO error was happened. " + + "DB cannot work in durable mode in such case"); + } + + walFiles.forEach((Path path) -> { + segments.add(extractSegmentId(path.getFileName().toString())); + walSize.increment(path.toFile().length()); + }); + } finally{ + if (walFiles != null) { + walFiles.close(); + } + } + return walSize.value; } @@ -645,7 +666,7 @@ record = recordCursor.getItem(); } private void waitTillWriteWillBeFinished() { - final Future wf = writeFuture; + final Future wf = latestWriteFuture; if (wf != null) { try { wf.get(); @@ -1375,9 +1396,9 @@ public void close(final boolean flush) throws IOException { } } - if (writeFuture != null) { + if (latestWriteFuture != null) { try { - writeFuture.get(); + latestWriteFuture.get(); } catch (InterruptedException | ExecutionException e) { throw OException.wrapException(new OStorageException("Error during writing of WAL records in storage " + storageName), e); } @@ -1393,8 +1414,8 @@ record = records.poll(); } try { - if (writeFuture != null) { - writeFuture.get(); + if (latestWriteFuture != null) { + latestWriteFuture.get(); } } catch (final InterruptedException e) { @@ -1425,8 +1446,18 @@ record = records.poll(); segments.clear(); fileCloseQueue.clear(); - allocator.deallocate(writeBufferPointerOne); - allocator.deallocate(writeBufferPointerTwo); + for (int i = 0; i < 2; i++) { + try { + final OPointer pointer = pointersPool.poll(1, TimeUnit.MINUTES); + if (pointer == null) { + throw new OStorageException("Can not clear memory allocated by WAL buffer"); + } + allocator.deallocate(pointer); + } catch (InterruptedException interruptedException) { + throw OException.wrapException(new OInterruptedException("WAL close was interrupted"), + interruptedException); + } + } if (writeBufferPointer != null) { writeBufferPointer = null; @@ -1822,7 +1853,7 @@ assert record != null; if (segmentId != lsn.getSegment()) { if (walFile != null) { if (writeBufferPointer != null) { - writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN); + writeBuffer(walFile, writeBuffer, writeBufferPointer, lastLSN, checkPointLSN); } writeBufferPointer = null; @@ -1833,8 +1864,8 @@ assert record != null; lastLSN = null; try { - if (writeFuture != null) { - writeFuture.get(); + if (latestWriteFuture != null) { + latestWriteFuture.get(); } } catch (final InterruptedException e) { OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e); @@ -1872,20 +1903,24 @@ assert record != null; if (writeBuffer == null || writeBuffer.remaining() == 0) { if (writeBufferPointer != null) { assert writeBuffer != null; - writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN); + writeBuffer(walFile, writeBuffer, writeBufferPointer, + lastLSN, checkPointLSN); } - if (useFirstBuffer) { - writeBufferPointer = writeBufferPointerOne; - writeBuffer = writeBufferOne; - } else { - writeBufferPointer = writeBufferPointerTwo; - writeBuffer = writeBufferTwo; + try { + writeBufferPointer = pointersPool.take(); + } catch (InterruptedException interruptedException) { + throw + OException.wrapException( + new OInterruptedException( + "Writing of data into the WAL was interrupted"), + interruptedException); } + writeBuffer = writeBufferPointer.getNativeByteBuffer(); + + writeBuffer.limit((writeBuffer.capacity() / pageSize) * pageSize); - writeBuffer.limit(writeBuffer.capacity()); writeBuffer.rewind(); - useFirstBuffer = !useFirstBuffer; writeBufferPageIndex = -1; @@ -1960,7 +1995,7 @@ assert record != null; } if ((makeFSync || fullWrite) && writeBufferPointer != null) { - writeBuffer(walFile, writeBuffer, lastLSN, checkPointLSN); + writeBuffer(walFile, writeBuffer, writeBufferPointer, lastLSN, checkPointLSN); writeBufferPointer = null; writeBuffer = null; @@ -1983,8 +2018,8 @@ assert record != null; if (makeFSync) { try { try { - if (writeFuture != null) { - writeFuture.get(); + if (latestWriteFuture != null) { + latestWriteFuture.get(); } } catch (final InterruptedException e) { OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e); @@ -1992,7 +2027,7 @@ assert record != null; assert walFile.position() == currentPosition; - writeFuture = writeExecutor.submit((Callable) () -> { + latestWriteFuture = writeExecutor.submit((Callable) () -> { try { long startTs = 0; if (printPerformanceStatistic) { @@ -2006,7 +2041,6 @@ assert record != null; while (counter < cqSize) { final OPair pair = fileCloseQueue.poll(); if (pair != null) { - @SuppressWarnings("resource") final OWALFile file = pair.value; assert file.position() % pageSize == 0; @@ -2068,7 +2102,10 @@ assert record != null; } } - private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLogSequenceNumber lastLSN, + private void writeBuffer(final OWALFile file, + final ByteBuffer buffer, + final OPointer bufferPointer, + final OLogSequenceNumber lastLSN, final OLogSequenceNumber checkpointLSN) throws IOException { if (buffer.position() <= OCASWALPage.RECORDS_OFFSET) { @@ -2111,23 +2148,12 @@ private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLo final int limit = maxPage * pageSize; buffer.limit(limit); - try { - if (writeFuture != null) { - writeFuture.get(); - } - } catch (final InterruptedException e) { - OLogManager.instance().errorNoDb(this, "WAL write was interrupted", e); - } catch (final Exception e) { - OLogManager.instance().errorNoDb(this, "Error during WAL write", e); - throw OException.wrapException(new OStorageException("Error during WAL data write"), e); - } - assert file.position() == currentPosition; currentPosition += buffer.limit(); final long expectedPosition = currentPosition; - writeFuture = writeExecutor.submit((Callable) () -> { + latestWriteFuture = writeExecutor.submit((Callable) () -> { try { long startTs = 0; if (printPerformanceStatistic) { @@ -2175,6 +2201,11 @@ private void writeBuffer(final OWALFile file, final ByteBuffer buffer, final OLo //noinspection NonAtomicOperationOnVolatileField bytesWrittenTime += (endTs - startTs); } + + final boolean added = pointersPool.offer(writeBufferPointer); + if (!added) { + throw new OStorageException("Can not return byte buffer back to the pool."); + } } catch (final IOException e) { OLogManager.instance().errorNoDb(this, "Error during WAL data write", e); throw e;