Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wyxxxcat committed Jan 23, 2025
1 parent c9cac8f commit 6bdc110
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
public static final String PROP_CLEAN_TABLES = "clean_tables";
public static final String PROP_CLEAN_PARTITIONS = "clean_partitions";
public static final String PROP_ATOMIC_RESTORE = "atomic_restore";
public static final String PROP_FORCE_REPLACE = "force_replace";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
Expand All @@ -58,6 +59,7 @@ public class RestoreStmt extends AbstractBackupStmt implements NotFallbackInPars
private boolean isCleanTables = false;
private boolean isCleanPartitions = false;
private boolean isAtomicRestore = false;
private boolean isForceReplace = false;
private byte[] meta = null;
private byte[] jobInfo = null;

Expand Down Expand Up @@ -133,6 +135,10 @@ public boolean isAtomicRestore() {
return isAtomicRestore;
}

public boolean isForceReplace() {
return isForceReplace;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
Expand Down Expand Up @@ -219,6 +225,9 @@ public void analyzeProperties() throws AnalysisException {
// is atomic restore
isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore);

// is force replace
isForceReplace = eatBooleanProperty(copiedProperties, PROP_FORCE_REPLACE, isForceReplace);

if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Unknown restore job properties: " + copiedProperties.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,14 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanTables(), stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveColocate(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(), stmt.isCleanTables(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(),
stmt.isCleanPartitions(), stmt.isAtomicRestore(), stmt.isForceReplace(),
env, repository.getId());
}

Expand Down
140 changes: 94 additions & 46 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3005,6 +3005,9 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isForceReplace()) {
properties.put(RestoreStmt.PROP_FORCE_REPLACE, "true");
}

AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ boolean await(long timeout, TimeUnit unit) {
db.unregisterTable(expectedRestoreTbl.getName());

job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false,
new ReplicaAllocation((short) 3), 100000, -1, false, false, false, false, false, false, false, false,
env, repo.getId());

List<Table> tbls = Lists.newArrayList();
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ struct TRestoreSnapshotRequest {
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
17: optional bool force_replace
}

struct TRestoreSnapshotResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_backup_restore_force_replace_diff_column", "backup_restore") {
String suiteName = "test_backup_restore_force_replace_diff_column"
String dbName = "${suiteName}_db_0"
String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
String snapshotName = "${suiteName}_snapshot_" + System.currentTimeMillis()
String tableNamePrefix = "${suiteName}_tables"
String tableName = "${tableNamePrefix}_0"

def syncer = getSyncer()
syncer.createS3Repository(repoName)
sql "DROP DATABASE IF EXISTS ${dbName}"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"

sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT SUM DEFAULT "0"
)
PARTITION BY RANGE(`id`)
(
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName}
TO `${repoName}`
ON (
${tableName}
)
"""

syncer.waitSnapshotFinish(dbName)

def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
assertTrue(snapshot != null)

sql "DROP TABLE ${dbName}.${tableName}"

sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT DEFAULT "0",
`desc` VARCHAR(20) DEFAULT ""
)
PARTITION BY RANGE(`id`)
(
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
RESTORE SNAPSHOT ${dbName}.${snapshotName}
FROM `${repoName}`
PROPERTIES
(
"backup_timestamp" = "${snapshot}",
"reserve_replica" = "true",
"atomic_restore" = "true",
"force_replace" = "true"
)
"""

syncer.waitAllRestoreFinish(dbName)

sql "sync"
def desc_res = sql "desc ${dbName}.${tableName}"
assertEquals(desc_res.size(), 2)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_backup_restore_force_replace_diff_part_type", "backup_restore") {
String suiteName = "test_backup_restore_force_replace_diff_part_type"
String dbName = "${suiteName}_db_0"
String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
String snapshotName = "${suiteName}_snapshot_" + System.currentTimeMillis()
String tableNamePrefix = "${suiteName}_tables"
String tableName = "${tableNamePrefix}_0"

def syncer = getSyncer()
syncer.createS3Repository(repoName)
sql "DROP DATABASE IF EXISTS ${dbName}"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"

sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT SUM DEFAULT "0"
)
AGGREGATE KEY(`id`)
PARTITION BY LIST(`id`)
(
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName}
TO `${repoName}`
ON (
${tableName}
)
"""

syncer.waitSnapshotFinish(dbName)

def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
assertTrue(snapshot != null)

sql "DROP TABLE ${dbName}.${tableName}"

sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT DEFAULT "0"
)
AGGREGATE KEY(`id`, `count`)
PARTITION BY RANGE(`id`)
(
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
RESTORE SNAPSHOT ${dbName}.${snapshotName}
FROM `${repoName}`
PROPERTIES
(
"backup_timestamp" = "${snapshot}",
"reserve_replica" = "true",
"atomic_restore" = "true",
"force_replace" = "true"
)
"""

syncer.waitAllRestoreFinish(dbName)

sql "sync"
def show_table = sql "show create table ${dbName}.${tableName}"
assertTrue(show_table[0][1].contains("PARTITION BY LIST (`id`)"))
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_backup_restore_force_replace_diff_part_val", "backup_restore") {
String suiteName = "test_backup_restore_force_replace_diff_part_val"
String dbName = "${suiteName}_db_0"
String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
String snapshotName = "${suiteName}_snapshot_" + System.currentTimeMillis()
String tableNamePrefix = "${suiteName}_tables"
String tableName = "${tableNamePrefix}_0"

def syncer = getSyncer()
syncer.createS3Repository(repoName)
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"

sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT SUM DEFAULT "0"
)
AGGREGATE KEY(`id`)
PARTITION BY RANGE(`id`)
(
PARTITION p0 VALUES LESS THAN ("10")
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
BACKUP SNAPSHOT ${dbName}.${snapshotName}
TO `${repoName}`
ON (
${tableName}
)
"""

syncer.waitSnapshotFinish(dbName)

def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
assertTrue(snapshot != null)

sql "DROP TABLE ${dbName}.${tableName}"

sql """
CREATE TABLE ${dbName}.${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT DEFAULT "0"
)
AGGREGATE KEY(`id`, `count`)
PARTITION BY RANGE(`id`, `count`)
(
PARTITION p0 VALUES LESS THAN ("100")
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""

sql """
RESTORE SNAPSHOT ${dbName}.${snapshotName}
FROM `${repoName}`
PROPERTIES
(
"backup_timestamp" = "${snapshot}",
"reserve_replica" = "true",
"atomic_restore" = "true",
"force_replace" = "true"
)
"""

syncer.waitAllRestoreFinish(dbName)

sql "sync"
def show_partitions = sql_return_maparray "SHOW PARTITIONS FROM ${dbName}.${tableName} where PartitionName = 'p0'"

logger.info(show_partitions.Range)

assertTrue(show_partitions.Range.contains("[types: [LARGEINT]; keys: [-170141183460469231731687303715884105728]; ..types: [LARGEINT]; keys: [10]; )"))
}

0 comments on commit 6bdc110

Please sign in to comment.