Skip to content

Commit

Permalink
[BugFix] Fix snapshot was deleted by mistake after restore from it (#…
Browse files Browse the repository at this point in the history
…56098)

Signed-off-by: srlch <linzichao@starrocks.com>
(cherry picked from commit 2eb60cf)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java
#	fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java
  • Loading branch information
srlch authored and mergify[bot] committed Feb 24, 2025
1 parent 4687af4 commit 31c09e0
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class ClusterSnapshotCheckpointScheduler extends FrontendDaemon {

private final CheckpointController feController;
private final CheckpointController starMgrController;
// Information collected from the cluster snapshot used to restore this FE at startup (may be null)
private final RestoredSnapshotInfo restoredSnapshotInfo;

private boolean firstRun;

Expand All @@ -41,6 +43,7 @@ public ClusterSnapshotCheckpointScheduler(CheckpointController feController,
this.feController = feController;
this.starMgrController = starMgrController;
this.firstRun = true;
this.restoredSnapshotInfo = RestoreClusterSnapshotMgr.getRestoredSnapshotInfo();
}

@Override
Expand All @@ -56,7 +59,8 @@ protected void runAfterCatalogReady() {

// Skip the first run after the scheduler starts; only reset the snapshot job states
if (firstRun) {
GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().resetAutomatedJobsStateForTheFirstRun();
GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.resetSnapshotJobsStateAfterRestarted(restoredSnapshotInfo);
firstRun = false;
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ public enum ClusterSnapshotJobState {
private ClusterSnapshotJobState state;
@SerializedName(value = "errMsg")
private String errMsg;
@SerializedName(value = "detailInfo")
private String detailInfo;

/**
 * Creates a snapshot job that starts in the INITIALIZING state with empty
 * error and detail messages.
 *
 * @param id                job id, forwarded to the wrapped ClusterSnapshot
 * @param snapshotName      snapshot name, forwarded to the wrapped ClusterSnapshot
 * @param storageVolumeName storage volume name, forwarded to the wrapped ClusterSnapshot
 * @param createdTimeMs     creation time in milliseconds, forwarded to the wrapped ClusterSnapshot
 */
public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTimeMs) {
    // Message fields start out as empty strings rather than null.
    this.detailInfo = "";
    this.errMsg = "";
    this.state = ClusterSnapshotJobState.INITIALIZING;
    // NOTE(review): the meaning of the trailing -1, 0, 0 is defined by ClusterSnapshot, not visible here.
    this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTimeMs, -1, 0, 0);
}

public void setState(ClusterSnapshotJobState state) {
Expand Down Expand Up @@ -118,6 +121,10 @@ public boolean isUnFinishedState() {
state == ClusterSnapshotJobState.UPLOADING;
}

/**
 * @return true when the job is currently in the INITIALIZING state
 */
public boolean isInitializing() {
    return ClusterSnapshotJobState.INITIALIZING == state;
}

/**
 * @return true when the job is currently in the ERROR state
 */
public boolean isError() {
    return ClusterSnapshotJobState.ERROR == state;
}
Expand All @@ -138,6 +145,10 @@ public boolean isFinalState() {
return state == ClusterSnapshotJobState.DELETED || state == ClusterSnapshotJobState.ERROR;
}

/**
 * Sets the detail text of this job; {@code getInfo()} reports it through {@code setDetail_info}.
 *
 * @param detailInfo detail text to store
 */
public void setDetailInfo(String detailInfo) {
this.detailInfo = detailInfo;
}

public void logJob() {
ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setSnapshotJob(this);
Expand All @@ -151,7 +162,7 @@ public TClusterSnapshotJobsItem getInfo() {
item.setCreated_time(getCreatedTimeMs() / 1000);
item.setFinished_time(getFinishedTimeMs() / 1000);
item.setState(state.name());
item.setDetail_info("");
item.setDetail_info(detailInfo);
item.setError_message(errMsg);
return item;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

Expand Down Expand Up @@ -224,21 +225,49 @@ public NavigableMap<Long, ClusterSnapshotJob> getAutomatedSnapshotJobs() {
return automatedSnapshotJobs;
}

public void resetAutomatedJobsStateForTheFirstRun() {
public void resetSnapshotJobsStateAfterRestarted(RestoredSnapshotInfo restoredSnapshotInfo) {
setLastJobFinishedAfterRestored(restoredSnapshotInfo);
resetLastUnFinishedAutomatedSnapshotJob();
clearFinishedAutomatedClusterSnapshotExceptLastFinished();
clearFinishedAutomatedClusterSnapshotExceptLast();
}

public void clearFinishedAutomatedClusterSnapshotExceptLastFinished() {
/**
 * Marks the newest automated snapshot job as FINISHED when it is the snapshot the
 * cluster was restored from and it is still in the INITIALIZING state.
 *
 * Does nothing when {@code restoredSnapshotInfo} is null, when it carries no snapshot
 * name, when there is no automated snapshot job, or when the newest job does not match.
 *
 * @param restoredSnapshotInfo snapshot name and journal ids collected at restore time, may be null
 */
public void setLastJobFinishedAfterRestored(RestoredSnapshotInfo restoredSnapshotInfo) {
    if (restoredSnapshotInfo == null) {
        return;
    }

    final String snapshotName = restoredSnapshotInfo.getSnapshotName();
    final long restoredFeJournalId = restoredSnapshotInfo.getFeJournalId();
    final long restoredStarMgrJournalId = restoredSnapshotInfo.getStarMgrJournalId();
    if (snapshotName == null) {
        return;
    }

    final Entry<Long, ClusterSnapshotJob> newest = automatedSnapshotJobs.lastEntry();
    if (newest == null) {
        return;
    }

    final ClusterSnapshotJob lastJob = newest.getValue();
    // The last snapshot may still be in the init state, because the checkpoint stored in the
    // snapshot does not include the edit log for the state transition after
    // ClusterSnapshotJobState.INITIALIZING.
    final boolean restoredFromLastJob = lastJob.getSnapshotName().equals(snapshotName) && lastJob.isInitializing();
    if (!restoredFromLastJob) {
        return;
    }

    lastJob.setJournalIds(restoredFeJournalId, restoredStarMgrJournalId);
    lastJob.setState(ClusterSnapshotJobState.FINISHED);
    lastJob.setDetailInfo("Finished time was reset after cluster restored");
    lastJob.logJob();
}

/**
 * Looks up the last finished automated snapshot job and, if there is one, passes its
 * snapshot name to {@code clearFinishedAutomatedClusterSnapshot}. Does nothing otherwise.
 */
public void clearFinishedAutomatedClusterSnapshotExceptLast() {
    final ClusterSnapshotJob newestFinished = getLastFinishedAutomatedClusterSnapshotJob();
    if (newestFinished == null) {
        return;
    }
    final String snapshotNameToKeep = newestFinished.getSnapshotName();
    clearFinishedAutomatedClusterSnapshot(snapshotNameToKeep);
}

public void resetLastUnFinishedAutomatedSnapshotJob() {
if (!automatedSnapshotJobs.isEmpty()) {
ClusterSnapshotJob job = automatedSnapshotJobs.lastEntry().getValue();
Entry<Long, ClusterSnapshotJob> entry = automatedSnapshotJobs.lastEntry();
if (entry != null) {
ClusterSnapshotJob job = entry.getValue();
if (job.isUnFinishedState()) {
job.setErrMsg("Snapshot job has been failed because of FE restart or leader change");
job.setState(ClusterSnapshotJobState.ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.starrocks.fs.HdfsUtil;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.journal.bdbje.BDBEnvironment;
import com.starrocks.persist.Storage;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.NodeMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.staros.StarMgrServer;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.Frontend;
Expand All @@ -32,6 +34,7 @@
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand All @@ -43,14 +46,23 @@ public class RestoreClusterSnapshotMgr {
private ClusterSnapshotConfig config;
private boolean oldStartWithIncompleteMeta;
private boolean oldResetElectionGroup;
private RestoredSnapshotInfo restoredSnapshotInfo;

/**
 * Loads the cluster snapshot config from the given YAML file, downloads the snapshot
 * and then updates the startup config.
 *
 * @param clusterSnapshotYamlFile path of the cluster snapshot YAML config file
 * @throws UserException if loading the config or downloading the snapshot fails
 * @throws IOException   if the downloaded image cannot be read while collecting snapshot info
 */
// Merge conflict resolved: this branch still uses UserException (as finishRestoring() and
// updateFrontends() do); IOException is required because downloadSnapshot() now collects
// the image journal ids after the download.
private RestoreClusterSnapshotMgr(String clusterSnapshotYamlFile) throws UserException, IOException {
    config = ClusterSnapshotConfig.load(clusterSnapshotYamlFile);
    downloadSnapshot();
    updateConfig();
}

<<<<<<< HEAD
public static void init(String clusterSnapshotYamlFile, String[] args) throws UserException {
=======
public static void init(String clusterSnapshotYamlFile, String[] args) throws StarRocksException, IOException {
>>>>>>> 2eb60cf22 ([BugFix] Fix snapshot was deleted by mistake after restore from it (#56098))
for (String arg : args) {
if (arg.equalsIgnoreCase("-cluster_snapshot")) {
LOG.info("FE start to restore from a cluster snapshot (-cluster_snapshot)");
Expand Down Expand Up @@ -95,6 +107,14 @@ public static void finishRestoring() throws UserException {
}
}

/**
 * @return the snapshot info held by the current manager instance, or null when there is
 *         no instance (the held info itself may also be null)
 */
public static RestoredSnapshotInfo getRestoredSnapshotInfo() {
    // Read the static field once so the null check and the dereference use the same object.
    final RestoreClusterSnapshotMgr mgr = instance;
    return mgr != null ? mgr.restoredSnapshotInfo : null;
}

private void updateConfig() {
// Save the old config
oldStartWithIncompleteMeta = Config.start_with_incomplete_meta;
Expand All @@ -111,7 +131,11 @@ private void rollbackConfig() {
Config.bdbje_reset_election_group = oldResetElectionGroup;
}

<<<<<<< HEAD
private void downloadSnapshot() throws UserException {
=======
private void downloadSnapshot() throws StarRocksException, IOException {
>>>>>>> 2eb60cf22 ([BugFix] Fix snapshot was deleted by mistake after restore from it (#56098))
ClusterSnapshotConfig.ClusterSnapshot clusterSnapshot = config.getClusterSnapshot();
if (clusterSnapshot == null) {
return;
Expand All @@ -131,6 +155,32 @@ private void downloadSnapshot() throws UserException {

LOG.info("Download cluster snapshot {} to local dir {}", snapshotImagePath, localImagePath);
HdfsUtil.copyToLocal(snapshotImagePath, localImagePath, clusterSnapshot.getStorageVolume().getProperties());
collectSnapshotInfoAfterDownload(snapshotImagePath, localImagePath);
}

/**
 * Reads the FE and StarMgr image journal ids from the local image directory and stores
 * them, together with the snapshot name, in {@code restoredSnapshotInfo}.
 *
 * The snapshot name is the last path segment of {@code snapshotImagePath} after trailing
 * slashes are removed. When the trimmed path contains no '/', nothing is stored.
 *
 * @param snapshotImagePath remote path the snapshot image was downloaded from
 * @param localImagePath    local directory the image was downloaded to
 * @throws IOException declared for the Storage calls used to read the image journal ids
 */
private void collectSnapshotInfoAfterDownload(String snapshotImagePath, String localImagePath) throws IOException {
    final Storage feStorage = new Storage(localImagePath);
    final Storage starMgrStorage = new Storage(localImagePath + StarMgrServer.IMAGE_SUBDIR);
    // get image version
    final long feJournalId = feStorage.getImageJournalId();
    final long starMgrJournalId = starMgrStorage.getImageJournalId();

    LOG.info("Download cluster snapshot successfully with FE image version: {}, StarMgr image version: {}",
            feJournalId, starMgrJournalId);

    // Drop trailing slashes so the last path segment is the snapshot name.
    final String trimmedPath = snapshotImagePath.replaceAll("/+$", "");
    final int nameStart = trimmedPath.lastIndexOf('/') + 1;
    if (nameStart == 0) {
        // No '/' left in the path: no snapshot name can be derived.
        return;
    }

    final String snapshotName = trimmedPath.substring(nameStart);
    restoredSnapshotInfo = new RestoredSnapshotInfo(snapshotName, feJournalId, starMgrJournalId);
}

private void updateFrontends() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

/**
 * Holds the information parsed from the cluster snapshot when the FE starts in restore mode.
 *
 * Instances are immutable: every field is assigned once in the constructor.
 */
public class RestoredSnapshotInfo {
    // Name of the snapshot; the constructor stores it as given, so it may be null.
    private final String snapshotName;
    // Journal id of the FE image.
    private final long feJournalId;
    // Journal id of the StarMgr image.
    private final long starMgrJournalId;

    /**
     * @param snapshotName     name of the restored snapshot (stored as given, may be null)
     * @param feJournalId      journal id of the FE image
     * @param starMgrJournalId journal id of the StarMgr image
     */
    public RestoredSnapshotInfo(String snapshotName, long feJournalId, long starMgrJournalId) {
        this.snapshotName = snapshotName;
        this.feJournalId = feJournalId;
        this.starMgrJournalId = starMgrJournalId;
    }

    /** @return the snapshot name passed to the constructor */
    public String getSnapshotName() {
        return snapshotName;
    }

    /** @return the FE journal id passed to the constructor */
    public long getFeJournalId() {
        return feJournalId;
    }

    /** @return the StarMgr journal id passed to the constructor */
    public long getStarMgrJournalId() {
        return starMgrJournalId;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_ENDPOINT;
import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_REGION;
Expand All @@ -70,7 +71,7 @@ public class ClusterSnapshotTest {
private ClusterSnapshotMgr clusterSnapshotMgr = new ClusterSnapshotMgr();
private boolean initSv = false;

private long nextId = 0;
private AtomicLong nextId = new AtomicLong(0);

@BeforeClass
public static void beforeClass() throws Exception {
Expand Down Expand Up @@ -109,8 +110,8 @@ public ClusterSnapshotMgr getClusterSnapshotMgr() {

@Mock
public long getNextId() {
nextId = nextId + 1;
return nextId;
long id = nextId.incrementAndGet();
return id;
}
};

Expand Down Expand Up @@ -378,4 +379,28 @@ public MaterializedViewHandler getRollupHandler() {
Assert.assertTrue(localClusterSnapshotMgr.isTableSafeToDeleteTablet(11));
localClusterSnapshotMgr.setAutomatedSnapshotOff();
}

/**
 * Checks that setLastJobFinishedAfterRestored() moves the newest automated job, which is
 * still INITIALIZING, to FINISHED and stores the journal ids from the restored snapshot info.
 */
@Test
public void testResetStateAfterRestore() {
    new MockUp<RunMode>() {
        @Mock
        public boolean isSharedDataMode() {
            return true;
        }
    };

    ClusterSnapshotMgr localClusterSnapshotMgr = new ClusterSnapshotMgr();
    localClusterSnapshotMgr.setAutomatedSnapshotOn(storageVolumeName);

    ClusterSnapshotJob job1 = localClusterSnapshotMgr.createAutomatedSnapshotJob();
    job1.setState(ClusterSnapshotJobState.FINISHED);
    // job2 is the newest job and is left in its initial state on purpose.
    ClusterSnapshotJob job2 = localClusterSnapshotMgr.createAutomatedSnapshotJob();
    RestoredSnapshotInfo restoredSnapshotInfo = new RestoredSnapshotInfo(job2.getSnapshotName(), 666L, 6666L);
    localClusterSnapshotMgr.setLastJobFinishedAfterRestored(restoredSnapshotInfo);

    // assertEquals reports expected and actual values on failure, unlike assertTrue(a == b).
    Assert.assertEquals(666L, job2.getFeJournalId());
    Assert.assertEquals(6666L, job2.getStarMgrJournalId());
    Assert.assertTrue(job2.isFinished());
    localClusterSnapshotMgr.setAutomatedSnapshotOff();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@

package com.starrocks.lake.snapshot;

<<<<<<< HEAD
import com.starrocks.common.UserException;
=======
import com.starrocks.common.StarRocksException;
import com.starrocks.fs.hdfs.HdfsFsManager;
import com.starrocks.persist.Storage;
>>>>>>> 2eb60cf22 ([BugFix] Fix snapshot was deleted by mistake after restore from it (#56098))
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Mock;
import mockit.MockUp;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class RestoreClusterSnapshotMgrTest {
Expand All @@ -44,8 +53,26 @@ public void testDownloadSnapshotFailed() throws Exception {

@Test
public void testNormal() throws Exception {
RestoreClusterSnapshotMgr.init("src/test/resources/conf/cluster_snapshot2.yaml",
new MockUp<Storage>() {
@Mock
public long getImageJournalId() {
return 10L;
}
};

new MockUp<HdfsFsManager>() {
@Mock
public void copyToLocal(String srcPath, String destPath, Map<String, String> properties) {
return;
} // IOException
};

RestoreClusterSnapshotMgr.init("src/test/resources/conf/cluster_snapshot.yaml",
new String[] { "-cluster_snapshot" });
Assert.assertTrue(RestoreClusterSnapshotMgr.getRestoredSnapshotInfo().getSnapshotName()
.equals("automated_cluster_snapshot_1704038400000"));
Assert.assertTrue(RestoreClusterSnapshotMgr.getRestoredSnapshotInfo().getFeJournalId() == 10L);
Assert.assertTrue(RestoreClusterSnapshotMgr.getRestoredSnapshotInfo().getStarMgrJournalId() == 10L);
Assert.assertTrue(RestoreClusterSnapshotMgr.isRestoring());

for (ClusterSnapshotConfig.StorageVolume sv : RestoreClusterSnapshotMgr.getConfig().getStorageVolumes()) {
Expand Down
Loading

0 comments on commit 31c09e0

Please sign in to comment.