Skip to content

Commit

Permalink
[feature](restore) support force_replace restore (#47314)
Browse files Browse the repository at this point in the history
selectdb/ccr-syncer#396

force_replace flag will only replace table with different schema
except for non-OLAP tables
  • Loading branch information
wyxxxcat authored Feb 8, 2025
1 parent 46cc249 commit e3cee61
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
public static final String PROP_FORCE_REPLACE = "force_replace";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
Expand All @@ -58,6 +59,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
private boolean isCleanTables = false;
private boolean isCleanPartitions = false;
private boolean isAtomicRestore = false;
private boolean isForceReplace = false;
private byte[] meta = null;
private byte[] jobInfo = null;

Expand Down Expand Up @@ -133,6 +135,10 @@ public boolean isAtomicRestore() {
return isAtomicRestore;
}

public boolean isForceReplace() {
return isForceReplace;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
Expand Down Expand Up @@ -219,6 +225,9 @@ public void analyzeProperties() throws AnalysisException {
// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);

// is force replace
isForceReplace = eatBooleanProperty(copiedProperties, PROP_FORCE_REPLACE, isForceReplace);

if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Unknown restore job properties: " + copiedProperties.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,14 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, repository.getId());
}

Expand Down
96 changes: 66 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE;
private static final String PROP_FORCE_REPLACE = RestoreStmt.PROP_FORCE_REPLACE;
private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__";

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
Expand Down Expand Up @@ -216,6 +217,8 @@ public enum RestoreJobState {
private boolean isCleanPartitions = false;
// Whether to restore the data into a temp table, and then replace the origin one.
private boolean isAtomicRestore = false;
// Whether to restore the table by replacing the exists but conflicted table.
private boolean isForceReplace = false;

// restore properties
@SerializedName("prop")
Expand All @@ -234,7 +237,8 @@ public RestoreJob(JobType jobType) {
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, boolean isForceReplace, Env env,
long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -253,23 +257,28 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
this.isAtomicRestore = isAtomicRestore;
if (this.isAtomicRestore) {
this.isForceReplace = isForceReplace;
}
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_COLOCATE, String.valueOf(reserveColocate));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
properties.put(PROP_FORCE_REPLACE, String.valueOf(isForceReplace));
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, boolean isForeReplace, Env env,
long repoId,
BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveColocate, reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions,
isAtomicRestore, env, repoId);
isAtomicRestore, isForeReplace, env, repoId);

this.backupMeta = backupMeta;
}
Expand Down Expand Up @@ -692,6 +701,7 @@ private void checkAndPrepareMeta() {
Table remoteTbl = backupMeta.getTable(tableName);
Preconditions.checkNotNull(remoteTbl);
Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
boolean isSchemaChanged = false;
if (localTbl != null && localTbl.getType() != TableType.OLAP) {
// table already exist, but is not OLAP
status = new Status(ErrCode.COMMON_ERROR,
Expand All @@ -715,8 +725,14 @@ private void checkAndPrepareMeta() {
List<String> intersectPartNames = Lists.newArrayList();
Status st = localOlapTbl.getIntersectPartNamesWith(remoteOlapTbl, intersectPartNames);
if (!st.ok()) {
status = st;
return;
if (isForceReplace) {
LOG.info("{}, will force replace, job: {}",
st.getErrMsg(), this);
isSchemaChanged = true;
} else {
status = st;
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
Expand All @@ -726,13 +742,20 @@ private void checkAndPrepareMeta() {
String remoteTblSignature = remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
if (!localTblSignature.equals(remoteTblSignature)) {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ alias + " already exist but with different schema");
return;
if (isForceReplace) {
LOG.info("Table {} already exists but with different schema, will force replace, "
+ "local table: {}, remote table: {}",
tableName, localTblSignature, remoteTblSignature);
isSchemaChanged = true;
} else {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ alias + " already exist but with different schema");
return;
}
}

// Table with same name and has same schema. Check partition
Expand All @@ -754,10 +777,14 @@ private void checkAndPrepareMeta() {
.getPartitionInfo().getItem(backupPartInfo.id);
if (!localItem.equals(remoteItem)) {
// Same partition name, different range
status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
+ " in table " + localTbl.getName()
+ " has different partition item with partition in repository");
return;
if (isForceReplace) {
isSchemaChanged = true;
} else {
status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
+ " in table " + localTbl.getName()
+ " has different partition item with partition in repository");
return;
}
}
}

Expand Down Expand Up @@ -844,7 +871,7 @@ private void checkAndPrepareMeta() {
// remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);

if (isAtomicRestore && localTbl != null) {
if (isAtomicRestore && localTbl != null && !isSchemaChanged) {
// bind the backends and base tablets from local tbl.
status = bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl, tabletBases);
if (!status.ok()) {
Expand Down Expand Up @@ -2439,7 +2466,6 @@ private void cancelInternal(boolean isReplay) {
}

private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
assert isAtomicRestore;
for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
String originName = jobInfo.getAliasByOriginNameIfSet(tableName);
if (Env.isStoredTableNamesLowerCase()) {
Expand All @@ -2453,13 +2479,16 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
try {
Table newTbl = db.getTableNullable(aliasName);
if (newTbl == null) {
LOG.warn("replace table from {} to {}, but the temp table is not found", aliasName, originName);
LOG.warn("replace table from {} to {}, but the temp table is not found" + " isAtomicRestore: {}",
aliasName, originName, isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table "
+ aliasName + " is not found");
}
if (newTbl.getType() != TableType.OLAP) {
LOG.warn("replace table from {} to {}, but the temp table is not OLAP, it type is {}",
aliasName, originName, newTbl.getType());
LOG.warn(
"replace table from {} to {}, but the temp table is not OLAP, it type is {}"
+ " isAtomicRestore: {}",
aliasName, originName, newTbl.getType(), isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table " + aliasName
+ " is not OLAP table, it is " + newTbl.getType());
}
Expand All @@ -2468,12 +2497,14 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
Table originTbl = db.getTableNullable(originName);
if (originTbl != null) {
if (originTbl.getType() != TableType.OLAP) {
LOG.warn("replace table from {} to {}, but the origin table is not OLAP, it type is {}",
aliasName, originName, originTbl.getType());
LOG.warn(
"replace table from {} to {}, but the origin table is not OLAP, it type is {}"
+ " isAtomicRestore: {}",
aliasName, originName, originTbl.getType(), isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the origin table "
+ originName + " is not OLAP table, it is " + originTbl.getType());
}
originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it.
originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it.
}

// replace the table.
Expand All @@ -2488,11 +2519,14 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {

// set the olap table state to normal immediately for querying
newOlapTbl.setState(OlapTableState.NORMAL);
LOG.info("atomic restore replace table {} name to {}, and set state to normal, origin table={}",
newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId());
LOG.info(
"restore with replace table {} name to {}, and set state to normal, origin table={}"
+ " isAtomicRestore: {}",
newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId(),
isAtomicRestore);
} catch (DdlException e) {
LOG.warn("atomic restore replace table {} name from {} to {}",
newOlapTbl.getId(), aliasName, originName, e);
LOG.warn("restore with replace table {} name from {} to {}, isAtomicRestore: {}",
newOlapTbl.getId(), aliasName, originName, isAtomicRestore, e);
return new Status(ErrCode.COMMON_ERROR, "replace table from " + aliasName + " to " + originName
+ " failed, reason=" + e.getMessage());
} finally {
Expand All @@ -2503,8 +2537,8 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
// The origin table is not used anymore, need to drop all its tablets.
originOlapTbl.writeLock();
try {
LOG.info("drop the origin olap table {} by atomic restore. table={}",
originOlapTbl.getName(), originOlapTbl.getId());
LOG.info("drop the origin olap table {}. table={}" + " isAtomicRestore: {}",
originOlapTbl.getName(), originOlapTbl.getId(), isAtomicRestore);
Env.getCurrentEnv().onEraseOlapTable(originOlapTbl, isReplay);
} finally {
originOlapTbl.writeUnlock();
Expand Down Expand Up @@ -2692,6 +2726,7 @@ private void readOthers(DataInput in) throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
isForceReplace = Boolean.parseBoolean(properties.get(PROP_FORCE_REPLACE));
}

@Override
Expand All @@ -2702,6 +2737,7 @@ public void gsonPostProcess() throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
isForceReplace = Boolean.parseBoolean(properties.get(PROP_FORCE_REPLACE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3012,6 +3012,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isForceReplace()) {
properties.put(RestoreStmt.PROP_FORCE_REPLACE, "true");
}

AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false, false,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ struct TRestoreSnapshotRequest {
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
17: optional bool force_replace
}

struct TRestoreSnapshotResult {
Expand Down
Loading

0 comments on commit e3cee61

Please sign in to comment.