Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](restore) support force_replace restore #47314

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
public static final String PROP_FORCE_REPLACE = "force_replace";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
Expand All @@ -58,6 +59,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
private boolean isCleanTables = false;
private boolean isCleanPartitions = false;
private boolean isAtomicRestore = false;
private boolean isForceReplace = false;
private byte[] meta = null;
private byte[] jobInfo = null;

Expand Down Expand Up @@ -133,6 +135,10 @@ public boolean isAtomicRestore() {
return isAtomicRestore;
}

public boolean isForceReplace() {
return isForceReplace;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
Expand Down Expand Up @@ -219,6 +225,9 @@ public void analyzeProperties() throws AnalysisException {
// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);

// is force replace
isForceReplace = eatBooleanProperty(copiedProperties, PROP_FORCE_REPLACE, isForceReplace);

if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Unknown restore job properties: " + copiedProperties.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,14 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, repository.getId());
}

Expand Down
96 changes: 66 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_CLEAN_TABLES = RestoreStmt.PROP_CLEAN_TABLES;
private static final String PROP_CLEAN_PARTITIONS = RestoreStmt.PROP_CLEAN_PARTITIONS;
private static final String PROP_ATOMIC_RESTORE = RestoreStmt.PROP_ATOMIC_RESTORE;
private static final String PROP_FORCE_REPLACE = RestoreStmt.PROP_FORCE_REPLACE;
private static final String ATOMIC_RESTORE_TABLE_PREFIX = "__doris_atomic_restore_prefix__";

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);
Expand Down Expand Up @@ -215,6 +216,8 @@ public enum RestoreJobState {
private boolean isCleanPartitions = false;
// Whether to restore the data into a temp table, and then replace the origin one.
private boolean isAtomicRestore = false;
// Whether to restore the table by replacing the exists but conflicted table.
private boolean isForceReplace = false;

// restore properties
@SerializedName("prop")
Expand All @@ -233,7 +236,8 @@ public RestoreJob(JobType jobType) {
public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId) {
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, boolean isForceReplace, Env env,
long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -252,23 +256,28 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
this.isCleanTables = isCleanTables;
this.isCleanPartitions = isCleanPartitions;
this.isAtomicRestore = isAtomicRestore;
if (this.isAtomicRestore) {
this.isForceReplace = isForceReplace;
}
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_COLOCATE, String.valueOf(reserveColocate));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_TABLES, String.valueOf(isCleanTables));
properties.put(PROP_CLEAN_PARTITIONS, String.valueOf(isCleanPartitions));
properties.put(PROP_ATOMIC_RESTORE, String.valueOf(isAtomicRestore));
properties.put(PROP_FORCE_REPLACE, String.valueOf(isForceReplace));
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveColocate, boolean reserveDynamicPartitionEnable, boolean isBeingSynced,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, Env env, long repoId,
boolean isCleanTables, boolean isCleanPartitions, boolean isAtomicRestore, boolean isForeReplace, Env env,
long repoId,
BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveColocate, reserveDynamicPartitionEnable, isBeingSynced, isCleanTables, isCleanPartitions,
isAtomicRestore, env, repoId);
isAtomicRestore, isForeReplace, env, repoId);

this.backupMeta = backupMeta;
}
Expand Down Expand Up @@ -691,6 +700,7 @@ private void checkAndPrepareMeta() {
Table remoteTbl = backupMeta.getTable(tableName);
Preconditions.checkNotNull(remoteTbl);
Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
boolean isSchemaChanged = false;
if (localTbl != null && localTbl.getType() != TableType.OLAP) {
// table already exist, but is not OLAP
status = new Status(ErrCode.COMMON_ERROR,
Expand All @@ -714,8 +724,14 @@ private void checkAndPrepareMeta() {
List<String> intersectPartNames = Lists.newArrayList();
Status st = localOlapTbl.getIntersectPartNamesWith(remoteOlapTbl, intersectPartNames);
if (!st.ok()) {
status = st;
return;
if (isForceReplace) {
LOG.info("{}, will force replace, job: {}",
st.getErrMsg(), this);
isSchemaChanged = true;
} else {
status = st;
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
Expand All @@ -725,13 +741,20 @@ private void checkAndPrepareMeta() {
String remoteTblSignature = remoteOlapTbl.getSignature(
BackupHandler.SIGNATURE_VERSION, intersectPartNames);
if (!localTblSignature.equals(remoteTblSignature)) {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ alias + " already exist but with different schema");
return;
if (isForceReplace) {
LOG.info("Table {} already exists but with different schema, will force replace, "
+ "local table: {}, remote table: {}",
tableName, localTblSignature, remoteTblSignature);
isSchemaChanged = true;
} else {
String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
LOG.warn("Table {} already exists but with different schema, "
+ "local table: {}, remote table: {}",
alias, localTblSignature, remoteTblSignature);
status = new Status(ErrCode.COMMON_ERROR, "Table "
+ alias + " already exist but with different schema");
return;
}
}

// Table with same name and has same schema. Check partition
Expand All @@ -753,10 +776,14 @@ private void checkAndPrepareMeta() {
.getPartitionInfo().getItem(backupPartInfo.id);
if (!localItem.equals(remoteItem)) {
// Same partition name, different range
status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
+ " in table " + localTbl.getName()
+ " has different partition item with partition in repository");
return;
if (isForceReplace) {
isSchemaChanged = true;
} else {
status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
+ " in table " + localTbl.getName()
+ " has different partition item with partition in repository");
return;
}
}
}

Expand Down Expand Up @@ -843,7 +870,7 @@ private void checkAndPrepareMeta() {
// remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);

if (isAtomicRestore && localTbl != null) {
if (isAtomicRestore && localTbl != null && !isSchemaChanged) {
// bind the backends and base tablets from local tbl.
status = bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl, tabletBases);
if (!status.ok()) {
Expand Down Expand Up @@ -2428,7 +2455,6 @@ private void cancelInternal(boolean isReplay) {
}

private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
assert isAtomicRestore;
for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
String originName = jobInfo.getAliasByOriginNameIfSet(tableName);
if (Env.isStoredTableNamesLowerCase()) {
Expand All @@ -2442,13 +2468,16 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
try {
Table newTbl = db.getTableNullable(aliasName);
if (newTbl == null) {
LOG.warn("replace table from {} to {}, but the temp table is not found", aliasName, originName);
LOG.warn("replace table from {} to {}, but the temp table is not found" + " isAtomicRestore: {}",
aliasName, originName, isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table "
+ aliasName + " is not found");
}
if (newTbl.getType() != TableType.OLAP) {
LOG.warn("replace table from {} to {}, but the temp table is not OLAP, it type is {}",
aliasName, originName, newTbl.getType());
LOG.warn(
"replace table from {} to {}, but the temp table is not OLAP, it type is {}"
+ " isAtomicRestore: {}",
aliasName, originName, newTbl.getType(), isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the temp table " + aliasName
+ " is not OLAP table, it is " + newTbl.getType());
}
Expand All @@ -2457,12 +2486,14 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
Table originTbl = db.getTableNullable(originName);
if (originTbl != null) {
if (originTbl.getType() != TableType.OLAP) {
LOG.warn("replace table from {} to {}, but the origin table is not OLAP, it type is {}",
aliasName, originName, originTbl.getType());
LOG.warn(
"replace table from {} to {}, but the origin table is not OLAP, it type is {}"
+ " isAtomicRestore: {}",
aliasName, originName, originTbl.getType(), isAtomicRestore);
return new Status(ErrCode.COMMON_ERROR, "replace table failed, the origin table "
+ originName + " is not OLAP table, it is " + originTbl.getType());
}
originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it.
originOlapTbl = (OlapTable) originTbl; // save the origin olap table, then drop it.
}

// replace the table.
Expand All @@ -2477,11 +2508,14 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {

// set the olap table state to normal immediately for querying
newOlapTbl.setState(OlapTableState.NORMAL);
LOG.info("atomic restore replace table {} name to {}, and set state to normal, origin table={}",
newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId());
LOG.info(
"restore with replace table {} name to {}, and set state to normal, origin table={}"
+ " isAtomicRestore: {}",
newOlapTbl.getId(), originName, originOlapTbl == null ? -1L : originOlapTbl.getId(),
isAtomicRestore);
} catch (DdlException e) {
LOG.warn("atomic restore replace table {} name from {} to {}",
newOlapTbl.getId(), aliasName, originName, e);
LOG.warn("restore with replace table {} name from {} to {}, isAtomicRestore: {}",
newOlapTbl.getId(), aliasName, originName, isAtomicRestore, e);
return new Status(ErrCode.COMMON_ERROR, "replace table from " + aliasName + " to " + originName
+ " failed, reason=" + e.getMessage());
} finally {
Expand All @@ -2492,8 +2526,8 @@ private Status atomicReplaceOlapTables(Database db, boolean isReplay) {
// The origin table is not used anymore, need to drop all its tablets.
originOlapTbl.writeLock();
try {
LOG.info("drop the origin olap table {} by atomic restore. table={}",
originOlapTbl.getName(), originOlapTbl.getId());
LOG.info("drop the origin olap table {}. table={}" + " isAtomicRestore: {}",
originOlapTbl.getName(), originOlapTbl.getId(), isAtomicRestore);
Env.getCurrentEnv().onEraseOlapTable(originOlapTbl, isReplay);
} finally {
originOlapTbl.writeUnlock();
Expand Down Expand Up @@ -2681,6 +2715,7 @@ private void readOthers(DataInput in) throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
isForceReplace = Boolean.parseBoolean(properties.get(PROP_FORCE_REPLACE));
}

@Override
Expand All @@ -2691,6 +2726,7 @@ public void gsonPostProcess() throws IOException {
isCleanTables = Boolean.parseBoolean(properties.get(PROP_CLEAN_TABLES));
isCleanPartitions = Boolean.parseBoolean(properties.get(PROP_CLEAN_PARTITIONS));
isAtomicRestore = Boolean.parseBoolean(properties.get(PROP_ATOMIC_RESTORE));
isForceReplace = Boolean.parseBoolean(properties.get(PROP_FORCE_REPLACE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3005,6 +3005,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isForceReplace()) {
properties.put(RestoreStmt.PROP_FORCE_REPLACE, "true");
}

AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false, false,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ struct TRestoreSnapshotRequest {
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
17: optional bool force_replace
}

struct TRestoreSnapshotResult {
Expand Down
Loading
Loading