Skip to content

Commit

Permalink
Issue #7558 was fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Jul 17, 2017
1 parent 39edd7d commit ba6c02a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.orientechnologies.orient.core.exception;

import com.orientechnologies.common.exception.OSystemException;
import com.orientechnologies.orient.core.storage.cache.OWriteCache;

/**
* This exception is thrown if it is impossible to read data from file which contains state of 2Q cache.
*
* @see com.orientechnologies.orient.core.storage.cache.local.twoq.O2QCache#loadCacheState(OWriteCache)
*/
public class OLoadCacheStateException extends OSystemException {
public OLoadCacheStateException(OSystemException exception) {
super(exception);
}

public OLoadCacheStateException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.orientechnologies.common.util.OPair;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.exception.OAllCacheEntriesAreUsedException;
import com.orientechnologies.orient.core.exception.OLoadCacheStateException;
import com.orientechnologies.orient.core.exception.OReadCacheException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.storage.cache.*;
Expand Down Expand Up @@ -678,9 +679,11 @@ public void loadCacheState(final OWriteCache writeCache) {
}

}
} catch (OLoadCacheStateException lcse) {
OLogManager.instance().warn(this, "Cannot restore state of cache for storage placed under " + writeCache.getRootDirectory());
} catch (Exception e) {
OLogManager.instance()
.warn(this, "Cannot restore state of cache for storage placed under %s", writeCache.getRootDirectory(), e);
throw OException.wrapException(
new OStorageException("Cannot restore state of cache for storage placed under " + writeCache.getRootDirectory()), e);
} finally {
cacheLock.releaseWriteLock();
}
Expand Down Expand Up @@ -724,34 +727,49 @@ private void restoreQueue(OWriteCache writeCache, LRUList queue, DataInputStream
*/
private void restoreQueueWithoutPageLoad(OWriteCache writeCache, LRUList queue, DataInputStream dataInputStream)
throws IOException {
// used only for statistics, and there is passed merely as stub
final OModifiableBoolean cacheHit = new OModifiableBoolean();

int internalFileId = dataInputStream.readInt();
while (internalFileId >= 0) {
final long pageIndex = dataInputStream.readLong();
try {
final long fileId = writeCache.externalFileId(internalFileId);
final OCacheEntry cacheEntry = new OCacheEntry(fileId, pageIndex, null, false);
//this set is only needed to rollback changes in case of IO Exception
final Set<PageKey> addedPages = new HashSet<PageKey>();

try {
int internalFileId = dataInputStream.readInt();

while (internalFileId >= 0) {
final long pageIndex = dataInputStream.readLong();
try {
final long fileId = writeCache.externalFileId(internalFileId);
if (get(fileId, pageIndex, true) == null && !pinnedPages.containsKey(new PinnedPage(fileId, pageIndex))) {
final OCacheEntry cacheEntry = new OCacheEntry(fileId, pageIndex, null, false);

Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<Long>();
Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<Long>();

Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
}
}

queue.putToMRU(cacheEntry);
pages.add(cacheEntry.getPageIndex());

addedPages.add(new PageKey(fileId, pageIndex));
removeColdPagesWithCacheLock();
}
} finally {
internalFileId = dataInputStream.readInt();
}
}
} catch (IOException e) {
for (PageKey pageKey : addedPages) {
queue.remove(pageKey.fileId, pageKey.pageIndex);

queue.putToMRU(cacheEntry);
pages.add(cacheEntry.getPageIndex());

//truncate tail of the queue if queue will exceed limit is set by configuration
removeColdPagesWithCacheLock();
} finally {
internalFileId = dataInputStream.readInt();
final Set<Long> pages = filePages.get(pageKey.fileId);
pages.remove(pageKey.pageIndex);
}

throw OException.wrapException(new OLoadCacheStateException("Can not restore state of cache from file"), e);
}
}

Expand All @@ -777,40 +795,53 @@ private void restoreQueueWithPageLoad(OWriteCache writeCache, LRUList queue, Dat
// then to put data into the queue to restore position of entries in LRU list.
final TreeSet<PageKey> filePositions = new TreeSet<PageKey>();

int internalFileId = dataInputStream.readInt();
while (internalFileId >= 0) {
final long pageIndex = dataInputStream.readLong();
try {
final long fileId = writeCache.externalFileId(internalFileId);
try {
int internalFileId = dataInputStream.readInt();

while (internalFileId >= 0) {
final long pageIndex = dataInputStream.readLong();
try {
final long fileId = writeCache.externalFileId(internalFileId);

//we replace only pages which are not loaded yet
if (queue.get(fileId, pageIndex) == null) {
filePositions.add(new PageKey(fileId, pageIndex));
//we replace only pages which are not loaded yet
if (get(fileId, pageIndex, true) == null && !pinnedPages.containsKey(new PinnedPage(fileId, pageIndex))) {
filePositions.add(new PageKey(fileId, pageIndex));

//we put placeholder to the queue, later we will replace it with real data
//it is done to prevent cases when disk cache size will exceed limits
//set by configuration
final OCacheEntry cacheEntry = new OCacheEntry(fileId, pageIndex, null, false);
queue.putToMRU(cacheEntry);
//we put placeholder to the queue, later we will replace it with real data
//it is done to prevent cases when disk cache size will exceed limits
//set by configuration
final OCacheEntry cacheEntry = new OCacheEntry(fileId, pageIndex, null, false);
queue.putToMRU(cacheEntry);

Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<Long>();
Set<Long> pages = filePages.get(fileId);
if (pages == null) {
pages = new HashSet<Long>();

Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
Set<Long> op = filePages.putIfAbsent(fileId, pages);
if (op != null) {
pages = op;
}
}
}

pages.add(pageIndex);
pages.add(pageIndex);

//remove part of the queue if queue size is bigger than allowed
removeColdPagesWithCacheLock();
//remove part of the queue if queue size is bigger than allowed
removeColdPagesWithCacheLock();
}
} finally {
internalFileId = dataInputStream.readInt();
}
} finally {
internalFileId = dataInputStream.readInt();
}
} catch (IOException e) {
for (PageKey pageKey : filePositions) {
//clear all place holders
queue.remove(pageKey.fileId, pageKey.pageIndex);

final Set<Long> pages = filePages.get(pageKey.fileId);
pages.remove(pageKey.pageIndex);
}

throw OException.wrapException(new OLoadCacheStateException("Can not restore state of cache from file"), e);
}

//second step: load pages sorted by position in a file and replace placeholders by real data
Expand Down

0 comments on commit ba6c02a

Please sign in to comment.