Improve call performance for sync_partition_metadata utility #18384

Merged · 3 commits · Jan 7, 2023
Changes from all commits
@@ -32,6 +32,7 @@ public class HdfsContext
private final Optional<String> tablePath;
// false if the table already exists in the metastore, true if the table is about to be created in the current transaction
private final Optional<Boolean> isNewTable;
private final Optional<Boolean> isPathValidationNeeded;
private final Optional<String> clientInfo;
private final Optional<Set<String>> clientTags;
private final Optional<ConnectorSession> session;
@@ -53,6 +54,7 @@ public HdfsContext(ConnectorIdentity identity)
this.tablePath = Optional.empty();
this.isNewTable = Optional.empty();
this.session = Optional.empty();
this.isPathValidationNeeded = Optional.empty();
}

/**
@@ -92,6 +94,22 @@ public HdfsContext(ConnectorSession session, String schemaName, String tableName
Optional.empty());
}

public HdfsContext(
ConnectorSession session,
String schemaName,
String tableName,
String tablePath,
boolean isNewTable,
boolean isPathValidationNeeded)
{
this(
session,
Optional.of(requireNonNull(schemaName, "schemaName is null")),
Optional.of(requireNonNull(tableName, "tableName is null")),
Optional.of(requireNonNull(tablePath, "tablePath is null")),
Optional.of(isNewTable),
Optional.of(isPathValidationNeeded));
}

public HdfsContext(
ConnectorSession session,
String schemaName,
@@ -104,7 +122,8 @@ public HdfsContext(
Optional.of(requireNonNull(schemaName, "schemaName is null")),
Optional.of(requireNonNull(tableName, "tableName is null")),
Optional.of(requireNonNull(tablePath, "tablePath is null")),
Optional.of(isNewTable));
Optional.of(isNewTable),
Optional.empty());
}

private HdfsContext(
@@ -113,6 +132,22 @@ private HdfsContext(
Optional<String> tableName,
Optional<String> tablePath,
Optional<Boolean> isNewTable)
{
this(
session,
schemaName,
tableName,
tablePath,
isNewTable,
Optional.empty());
}

private HdfsContext(
ConnectorSession session,
Optional<String> schemaName,
Optional<String> tableName,
Optional<String> tablePath,
Optional<Boolean> isNewTable,
Optional<Boolean> isPathValidationNeeded)
{
this.session = Optional.of(requireNonNull(session, "session is null"));
this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null");
@@ -124,6 +159,7 @@ private HdfsContext(
this.clientTags = Optional.of(session.getClientTags());
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.isNewTable = requireNonNull(isNewTable, "isNewTable is null");
this.isPathValidationNeeded = requireNonNull(isPathValidationNeeded, "isPathValidationNeeded is null");
}

public ConnectorIdentity getIdentity()
@@ -176,6 +212,11 @@ public Optional<ConnectorSession> getSession()
return session;
}

public Optional<Boolean> getIsPathValidationNeeded()
{
return isPathValidationNeeded;
}

@Override
public String toString()
{
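For orientation, here is a minimal sketch, not part of the diff, of how the new constructor and getter fit together; the session variable and the literal values are illustrative:

// The sync code path passes true for isPathValidationNeeded, signalling that
// the partition paths were already verified and the commit-time check can be skipped.
HdfsContext context = new HdfsContext(
        session,                  // ConnectorSession, assumed in scope
        "web",                    // schema name (illustrative)
        "page_views",             // table name (illustrative)
        "/warehouse/page_views",  // table storage path (illustrative)
        false,                    // isNewTable: the table already exists
        true);                    // isPathValidationNeeded: skip the path check
// An empty Optional means the flag was never set, so validation runs as before.
boolean skipPathCheck = context.getIsPathValidationNeeded().orElse(false);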
@@ -145,4 +145,11 @@ default List<TableConstraint<String>> getTableConstraints(MetastoreContext metas
{
return ImmutableList.of();
}

// Different metastore implementations may override this commit batch size
// based on the capacity of the underlying database. The default is 10 partitions per commit.
default int getPartitionCommitBatchSize()
{
return 10;
}
}
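As a sketch of how an implementation can tune this knob (the class below is hypothetical; only getPartitionCommitBatchSize comes from the interface above):

// Hypothetical metastore whose backing database tolerates larger commits.
public class LargeBatchHiveMetastore
        implements ExtendedHiveMetastore
{
    @Override
    public int getPartitionCommitBatchSize()
    {
        // override the default of 10 with a size suited to the backing store
        return 100;
    }

    // remaining ExtendedHiveMetastore methods elided
}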
@@ -115,7 +115,6 @@
public class SemiTransactionalHiveMetastore
{
private static final Logger log = Logger.get(SemiTransactionalHiveMetastore.class);
private static final int PARTITION_COMMIT_BATCH_SIZE = 8;
private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TABLE = 100;
private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TRANSACTION = 10_000;

Expand Down Expand Up @@ -823,12 +822,39 @@ public synchronized void addPartition(
Partition partition,
Path currentLocation,
PartitionStatistics statistics)
{
addPartition(session, databaseName, tableName, tablePath, isNewTable, partition, currentLocation, statistics, false);
}

/**
* Adds new partition metadata to the metastore.
*
* @param session connector-level session
* @param databaseName name of the schema
* @param tableName name of the table
* @param tablePath storage location of the table
* @param isNewTable whether the table is being created in the current transaction, rather than already existing in the metastore
* @param partition the new partition object to be added
* @param currentLocation the storage path under which the partition is added to the table
* @param statistics the basic statistics and column statistics for the added partition
* @param isPathValidationNeeded when true, the commit-time metastore file path check is skipped; only the sync partition code path passes true, because it has already verified the paths
*/
public synchronized void addPartition(
ConnectorSession session,
String databaseName,
String tableName,
String tablePath,
boolean isNewTable,
Partition partition,
Path currentLocation,
PartitionStatistics statistics,
boolean isPathValidationNeeded)
{
setShared();
checkArgument(getPrestoQueryId(partition).isPresent());
Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partition.getValues());
HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable);
HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable, isPathValidationNeeded);
if (oldPartitionAction == null) {
partitionActionsOfTable.put(
partition.getValues(),
@@ -1486,21 +1512,25 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext

PartitionAdder partitionAdder = partitionAdders.computeIfAbsent(
partition.getSchemaTableName(),
ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE));

if (pathExists(context, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameDirectory(
context,
hdfsEnvironment,
currentPath,
targetPath,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate));

// The file storage path check can be bypassed on the sync partition code path,
// because the file paths were already verified during an earlier phase of the sync logic.
if (!context.getIsPathValidationNeeded().orElse(false)) {
if (pathExists(context, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameDirectory(
context,
hdfsEnvironment,
currentPath,
targetPath,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
createDirectory(context, hdfsEnvironment, targetPath);
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
createDirectory(context, hdfsEnvironment, targetPath);
}
String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()));
@@ -2930,13 +2960,13 @@ private static class PartitionAdder
private List<List<String>> createdPartitionValues = new ArrayList<>();
private List<MetastoreOperationResult> operationResults;

public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore, int batchSize)
public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore)
{
this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
this.schemaName = schemaName;
this.tableName = tableName;
this.metastore = metastore;
this.batchSize = batchSize;
this.batchSize = metastore.getPartitionCommitBatchSize();
this.partitions = new ArrayList<>(batchSize);
this.operationResults = new ArrayList<>();
}
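With this change the flush granularity comes from the metastore itself (10 by default, 10000 for Glue) instead of the old fixed constant of 8. A simplified sketch of the batching pattern, assuming an ExtendedHiveMetastore.addPartitions method along these lines:

// Simplified sketch: buffered partitions are committed in groups of batchSize,
// one metastore round trip per group (Guava's Lists.partition does the chunking).
for (List<PartitionWithStatistics> batch : Lists.partition(partitions, batchSize)) {
    operationResults.addAll(metastore.addPartitions(metastoreContext, schemaName, tableName, batch));
}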
@@ -150,7 +150,24 @@ public class GlueHiveMetastore
private static final String PUBLIC_ROLE_NAME = "public";
private static final String DEFAULT_METASTORE_USER = "presto";
private static final String WILDCARD_EXPRESSION = "";

// BATCH_PARTITION_COMMIT_TOTAL_SIZE is the total number of partitions accepted in one big batch,
// which is split into smaller pages of at most BATCH_CREATE_PARTITION_MAX_PAGE_SIZE partitions each.
// Example diagram of how the asynchronous pages are handled for BatchCreatePartition:
// |--------BATCH_CREATE_PARTITION_MAX_PAGE_SIZE------------|
// | p0, p1, p2 ..................................... p99   |
// |--------------------------------------------------------|
// | p0, p1, p2 ..................................... p99   |
// |--------------------------------------------------------|
// ...
// | p0, p1, p2 ..................................... p99   |
// |--------------------------------------------------------|
// BATCH_PARTITION_COMMIT_TOTAL_SIZE / BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 10000 / 100 = 100 pages
private static final int BATCH_PARTITION_COMMIT_TOTAL_SIZE = 10000;
private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
// maximum number of partitions that the Glue metastore can create in a single BatchCreatePartition request
private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000;
private static final Comparator<Partition> PARTITION_COMPARATOR = comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));
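To make the diagram's arithmetic concrete, an illustrative fragment of how one commit batch pages out to Glue; commitBatch, buildRequest, and futures are hypothetical names, while batchCreatePartitionAsync is the AWS Glue SDK call:

// 10000 partitions per commit batch / 100 partitions per request
// = 100 BatchCreatePartition calls, which can be issued asynchronously.
for (List<PartitionWithStatistics> page : Lists.partition(commitBatch, BATCH_CREATE_PARTITION_MAX_PAGE_SIZE)) {
    futures.add(glueClient.batchCreatePartitionAsync(buildRequest(page)));
}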
@@ -226,6 +243,14 @@ public GlueMetastoreStats getStats()
return stats;
}

// Glue caps each BatchCreatePartition request at 100 partitions
// (see https://docs.aws.amazon.com/glue/latest/webapi/API_BatchCreatePartition.html),
// so a larger commit batch is accepted here and split into pages of at most 100 partitions internally.
@Override
public int getPartitionCommitBatchSize()
{
return BATCH_PARTITION_COMMIT_TOTAL_SIZE;
}

@Override
public Optional<Database> getDatabase(MetastoreContext metastoreContext, String databaseName)
{
@@ -40,6 +40,8 @@
import java.lang.invoke.MethodHandle;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -54,13 +56,16 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Lists.partition;
import static java.lang.Boolean.TRUE;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class SyncPartitionMetadataProcedure
implements Provider<Procedure>
{
private static final int GET_PARTITION_BY_NAMES_BATCH_SIZE = 1000;

public enum SyncMode
{
ADD, DROP, FULL
@@ -114,7 +119,15 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.get().getMetastore();
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);

Table table = metastore.getTable(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName)
MetastoreContext metastoreContext = new MetastoreContext(
session.getIdentity(),
session.getQueryId(),
session.getClientInfo(),
session.getSource(),
getMetastoreHeaders(session),
isUserDefinedTypeEncodingEnabled(session),
metastore.getColumnConverterProvider());
Table table = metastore.getTable(metastoreContext, schemaName, tableName)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
if (table.getPartitionColumns().isEmpty()) {
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName);
@@ -127,17 +140,26 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName

try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, tableLocation);
List<String> partitionsInMetastore = metastore.getPartitionNames(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName)
List<String> partitionNamesInMetastore = metastore.getPartitionNames(metastoreContext, schemaName, tableName)
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
ImmutableList.Builder<String> partitionsInMetastore = new ImmutableList.Builder<>();
for (List<String> batchPartitionNames : partition(partitionNamesInMetastore, GET_PARTITION_BY_NAMES_BATCH_SIZE)) {
Map<String, Optional<Partition>> partitionsOptionalMap = metastore.getPartitionsByNames(metastoreContext, schemaName, tableName, batchPartitionNames);
for (Map.Entry<String, Optional<Partition>> entry : partitionsOptionalMap.entrySet()) {
if (entry.getValue().isPresent()) {
partitionsInMetastore.add(tableLocation.toUri().relativize(new Path(entry.getValue().get().getStorage().getLocation()).toUri()).getPath());
}
}
}
List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size(), caseSensitive).stream()
.map(fileStatus -> fileStatus.getPath().toUri())
.map(uri -> tableLocation.toUri().relativize(uri).getPath())
.collect(toImmutableList());

// partitions in file system but not in metastore
partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore);
partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore.build());
// partitions in metastore but not in file system
partitionsToDrop = difference(partitionsInMetastore, partitionsInFileSystem);
partitionsToDrop = difference(partitionsInMetastore.build(), partitionsInFileSystem);
}
catch (IOException e) {
throw new PrestoException(HIVE_FILESYSTEM_ERROR, e);
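The comparison above hinges on normalizing both sides to paths relative to the table location before taking set differences. An illustrative, self-contained sketch of that normalization with java.net.URI (the locations are made up):

import java.net.URI;

// Metastore partition locations and file system directories are both reduced
// to table-relative paths, so the two sides can be compared by set difference.
URI tableLocation = URI.create("hdfs://ns/warehouse/page_views/");
URI partitionLocation = URI.create("hdfs://ns/warehouse/page_views/col_x=a/col_y=1");
String relativePath = tableLocation.relativize(partitionLocation).getPath();
// relativePath is "col_x=a/col_y=1", the same form produced by listDirectory above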
@@ -211,7 +233,8 @@ private static void addPartitions(
false,
buildPartitionObject(session, table, name),
new Path(table.getStorage().getLocation(), name),
PartitionStatistics.empty());
PartitionStatistics.empty(),
true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,33 @@ public void testConflictingMixedCasePartitionNames()
assertPartitions(tableName, row("a", "1"), row("b", "2"));
}

@Test(groups = {HIVE_PARTITIONING, SMOKE})
public void testAddPartitionNeedsUrlEncoding()
{
String tableName = "test_sync_partition_add_partition_url_encoding";
String mirrorTableName = "test_sync_partition_add_partition_url_encoding_mirror";

query("CREATE TABLE " + tableName + " (payload bigint, col_x varchar, col_y varchar) " +
"WITH (format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
query("INSERT INTO " + tableName + " VALUES (1, 'a', '1'), (2, 'b', '2')");

String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName;

query("CREATE TABLE " + mirrorTableName + " (payload bigint, col_x varchar, col_y varchar) " +
"WITH (external_location = '" + tableLocation + "', format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'ADD')");

assertPartitions(tableName, row("a", "1"), row("b", "2"));
assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"));

query("INSERT INTO " + tableName + " VALUES (3, 'c', '3')");
assertPartitions(tableName, row("a", "1"), row("b", "2"), row("c", "3"));
assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"));

query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'ADD')");
assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"), row("c", "3"));
}

private static void prepare(HdfsClient hdfsClient, HdfsDataSourceWriter hdfsDataSourceWriter, String tableName)
{
query("DROP TABLE IF EXISTS " + tableName);