Skip to content

Commit

Permalink
Issue #7274 was fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Mar 27, 2017
1 parent e7562ea commit 3208c3b
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.storage.OStorage;
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.io.IOException;
import java.util.*;

/**
Expand Down Expand Up @@ -345,7 +347,7 @@ public boolean autoRecreateIndexesAfterCrash() {
final OStorage storage = database.getStorage().getUnderlying();
if (storage instanceof OAbstractPaginatedStorage) {
OAbstractPaginatedStorage paginatedStorage = (OAbstractPaginatedStorage) storage;
return paginatedStorage.wereDataRestoredAfterOpen() && paginatedStorage.wereNonTxOperationsPerformedInPreviousOpen();
return paginatedStorage.isIndexRebuildScheduled();
}

return false;
Expand Down Expand Up @@ -531,6 +533,16 @@ private void recreateIndexes() {
newDb.getMetadata().getIndexManager().save();

rebuildCompleted = true;
final OStorage storage = newDb.getStorage().getUnderlying();

if (storage instanceof OLocalPaginatedStorage) {
final OLocalPaginatedStorage paginatedStorage = (OLocalPaginatedStorage) storage;
try {
paginatedStorage.cancelIndexRebuild();
} catch (IOException e) {
OLogManager.instance().error(this, "Storage index rebuild flag can not be canceled after index rebuild", e);
}
}

OLogManager.instance().info(this, "%d indexes were restored successfully, %d errors", ok, errors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,15 @@ public abstract class OAbstractPaginatedStorage extends OStorageAbstract

private volatile int defaultClusterId = -1;
protected volatile OAtomicOperationsManager atomicOperationsManager;
private volatile boolean wereNonTxOperationsPerformedInPreviousOpen = false;
private volatile OLowDiskSpaceInformation lowDiskSpace = null;
private volatile boolean checkpointRequest = false;
private volatile OLowDiskSpaceInformation lowDiskSpace = null;
private volatile boolean checkpointRequest = false;

private volatile Throwable dataFlushException = null;

private final int id;

private Map<String, OIndexEngine> indexEngineNameMap = new HashMap<String, OIndexEngine>();
private List<OIndexEngine> indexEngines = new ArrayList<OIndexEngine>();
private boolean wereDataRestoredAfterOpen = false;
private Map<String, OIndexEngine> indexEngineNameMap = new HashMap<String, OIndexEngine>();
private List<OIndexEngine> indexEngines = new ArrayList<OIndexEngine>();

private volatile long fullCheckpointCount;

Expand Down Expand Up @@ -502,10 +500,11 @@ public void create(final Map<String, Object> iProperties) {
// ADD THE DEFAULT CLUSTER
defaultClusterId = doAddCluster(CLUSTER_DEFAULT_NAME, null);

clearStorageDirty();
if (OGlobalConfiguration.STORAGE_MAKE_FULL_CHECKPOINT_AFTER_CREATE.getValueAsBoolean())
makeFullCheckpoint();

clearStorageDirty();

writeCache.startFuzzyCheckpoints();
postCreateSteps();

Expand Down Expand Up @@ -2526,14 +2525,6 @@ public boolean isRemote() {
return false;
}

public boolean wereDataRestoredAfterOpen() {
return wereDataRestoredAfterOpen;
}

public boolean wereNonTxOperationsPerformedInPreviousOpen() {
return wereNonTxOperationsPerformedInPreviousOpen;
}

public void reload() {
close();
open(null, null, null);
Expand Down Expand Up @@ -2868,6 +2859,16 @@ protected boolean isDirty() throws IOException {
return false;
}

public boolean isIndexRebuildScheduled() {
return false;
}

protected void scheduleIndexRebuild() throws IOException {
}

public void cancelIndexRebuild() throws IOException {
}

private ORawBuffer readRecordIfNotLatest(final OCluster cluster, final ORecordId rid, final int recordVersion)
throws ORecordNotFoundException {
checkOpeness();
Expand Down Expand Up @@ -2994,7 +2995,7 @@ private void recoverIfNeeded() throws Exception {
if (isDirty()) {
OLogManager.instance().warn(this, "Storage '" + name + "' was not closed properly. Will try to recover from write ahead log");
try {
wereDataRestoredAfterOpen = restoreFromWAL() != null;
restoreFromWAL();

if (recoverListener != null)
recoverListener.onStorageRecover();
Expand Down Expand Up @@ -3914,9 +3915,9 @@ protected OLogSequenceNumber restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog

operationList.add(operationUnitRecord);
} else if (walRecord instanceof ONonTxOperationPerformedWALRecord) {
if (!wereNonTxOperationsPerformedInPreviousOpen) {
if (!isIndexRebuildScheduled()) {
OLogManager.instance().warn(this, "Non tx operation was used during data modification we will need index rebuild.");
wereNonTxOperationsPerformedInPreviousOpen = true;
scheduleIndexRebuild();
}
} else
OLogManager.instance().warn(this, "Record %s will be skipped during data restore", walRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,24 @@
*/
public class OLocalPaginatedStorage extends OAbstractPaginatedStorage implements OFreezableStorageComponent {

private static String[] ALL_FILE_EXTENSIONS = { ".ocf", ".pls", ".pcl", ".oda", ".odh", ".otx",
".ocs", ".oef", ".oem", ".oet", ".fl", ".json", ".DS_Store", ODiskWriteAheadLog.WAL_SEGMENT_EXTENSION, ODiskWriteAheadLog.MASTER_RECORD_EXTENSION,
private static String[] ALL_FILE_EXTENSIONS = { ".ocf", ".pls", ".pcl", ".oda", ".odh", ".otx", ".ocs", ".oef", ".oem", ".oet",
".fl", ".json", ".DS_Store", ODiskWriteAheadLog.WAL_SEGMENT_EXTENSION, ODiskWriteAheadLog.MASTER_RECORD_EXTENSION,
OHashTableIndexEngine.BUCKET_FILE_EXTENSION, OHashTableIndexEngine.METADATA_FILE_EXTENSION,
OHashTableIndexEngine.TREE_FILE_EXTENSION, OHashTableIndexEngine.NULL_BUCKET_FILE_EXTENSION,
OClusterPositionMap.DEF_EXTENSION, OSBTreeIndexEngine.DATA_FILE_EXTENSION, OWOWCache.NAME_ID_MAP_EXTENSION,
OIndexRIDContainer.INDEX_FILE_EXTENSION, OSBTreeCollectionManagerShared.DEFAULT_EXTENSION,
OSBTreeIndexEngine.NULL_BUCKET_FILE_EXTENSION, O2QCache.CACHE_STATISTIC_FILE_EXTENSION };

private static final int ONE_KB = 1024;
private static final int ONE_KB = 1024;

private final int DELETE_MAX_RETRIES;
private final int DELETE_WAIT_TIME;
private final int DELETE_MAX_RETRIES;
private final int DELETE_WAIT_TIME;

private final OStorageVariableParser variableParser;
private final OPaginatedStorageDirtyFlag dirtyFlag;
private final OStorageVariableParser variableParser;
private final OPaginatedStorageDirtyFlag dirtyFlag;

private final String storagePath;
private ExecutorService checkpointExecutor;
private final String storagePath;
private ExecutorService checkpointExecutor;

private final OClosableLinkedContainer<Long, OFileClassic> files;

Expand Down Expand Up @@ -182,8 +182,9 @@ public List<String> backup(OutputStream out, Map<String, Object> options, final

final OutputStream bo = bufferSize > 0 ? new BufferedOutputStream(out, bufferSize) : out;
try {
return OZIPCompressionUtil.compressDirectory(new File(getStoragePath()).getAbsolutePath(), bo,
new String[] { ".wal", ".fl" }, iOutput, compressionLevel);
return OZIPCompressionUtil
.compressDirectory(new File(getStoragePath()).getAbsolutePath(), bo, new String[] { ".wal", ".fl" }, iOutput,
compressionLevel);
} finally {
if (bufferSize > 0) {
bo.flush();
Expand Down Expand Up @@ -362,8 +363,8 @@ protected void preCloseSteps() throws IOException {
try {
if (writeAheadLog != null) {
checkpointExecutor.shutdown();
if (!checkpointExecutor.awaitTermination(OGlobalConfiguration.WAL_FULL_CHECKPOINT_SHUTDOWN_TIMEOUT.getValueAsInteger(),
TimeUnit.SECONDS))
if (!checkpointExecutor
.awaitTermination(OGlobalConfiguration.WAL_FULL_CHECKPOINT_SHUTDOWN_TIMEOUT.getValueAsInteger(), TimeUnit.SECONDS))
throw new OStorageException("Cannot terminate full checkpoint task");
}
} catch (InterruptedException e) {
Expand Down Expand Up @@ -403,8 +404,9 @@ protected void postDeleteSteps() {
if (notDeletedFiles == 0) {
// TRY TO DELETE ALSO THE DIRECTORY IF IT'S EMPTY
if (!dbDir.delete())
OLogManager.instance().error(this, "Cannot delete storage directory with path " + dbDir.getAbsolutePath()
+ " because directory is not empty. Files: " + Arrays.toString(dbDir.listFiles()));
OLogManager.instance().error(this,
"Cannot delete storage directory with path " + dbDir.getAbsolutePath() + " because directory is not empty. Files: "
+ Arrays.toString(dbDir.listFiles()));
return;
}
} else
Expand All @@ -431,6 +433,40 @@ protected boolean isDirty() throws IOException {
return dirtyFlag.isDirty();
}

@Override
public boolean isIndexRebuildScheduled() {
checkOpeness();

stateLock.acquireReadLock();
try {
checkOpeness();

return dirtyFlag.isIndexRebuildScheduled();
} finally {
stateLock.releaseReadLock();
}
}

@Override
protected void scheduleIndexRebuild() throws IOException {
dirtyFlag.scheduleIndexRebuild();
}

@Override
public void cancelIndexRebuild() throws IOException {
checkOpeness();

stateLock.acquireReadLock();
try {
checkOpeness();

dirtyFlag.clearIndexRebuild();
} finally {
stateLock.releaseReadLock();
}

}

@Override
protected boolean isWriteAllowedDuringIncrementalBackup() {
return true;
Expand Down
Loading

0 comments on commit 3208c3b

Please sign in to comment.