From 3de10b54fb38999fffd015468a897f3bed0e690e Mon Sep 17 00:00:00 2001
From: George Wang
Date: Sun, 21 Aug 2022 22:06:31 -0700
Subject: [PATCH 1/3] Use correct API to get partition names in batch for sync partition call

The current implementation for getting partition names takes the partition values directly, without taking the actual partition path into account. According to the [hive source code](shorturl.at/evT28), this logic may change in the future. This commit updates the partition values with the relative path acquired from the metastore APIs.
---
 .../hive/SyncPartitionMetadataProcedure.java  | 30 ++++++++++++++++---
 .../tests/hive/TestSyncPartitionMetadata.java | 27 +++++++++++++++++
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
index aae304ba0898c..9f83a64efb5dd 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
@@ -40,6 +40,8 @@
 import java.lang.invoke.MethodHandle;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -54,6 +56,7 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Lists.partition;
 import static java.lang.Boolean.TRUE;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
@@ -61,6 +64,8 @@ public class SyncPartitionMetadataProcedure
         implements Provider
 {
+    private static final int GET_PARTITION_BY_NAMES_BATCH_SIZE = 1000;
+
     public enum SyncMode
     {
         ADD, DROP, FULL
@@ -114,7 +119,15 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.get().getMetastore();
         SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
 
-        Table table = metastore.getTable(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName)
+        MetastoreContext metastoreContext = new MetastoreContext(
+                session.getIdentity(),
+                session.getQueryId(),
+                session.getClientInfo(),
+                session.getSource(),
+                getMetastoreHeaders(session),
+                isUserDefinedTypeEncodingEnabled(session),
+                metastore.getColumnConverterProvider());
+        Table table = metastore.getTable(metastoreContext, schemaName, tableName)
                 .orElseThrow(() -> new TableNotFoundException(schemaTableName));
         if (table.getPartitionColumns().isEmpty()) {
             throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName);
@@ -127,17 +140,26 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, tableLocation);
-            List partitionsInMetastore = metastore.getPartitionNames(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session),
isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName) + List partitionNamesInMetastore = metastore.getPartitionNames(metastoreContext, schemaName, tableName) .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + ImmutableList.Builder partitionsInMetastore = new ImmutableList.Builder<>(); + for (List batchPartitionNames : partition(partitionNamesInMetastore, GET_PARTITION_BY_NAMES_BATCH_SIZE)) { + Map> partitionsOptionalMap = metastore.getPartitionsByNames(metastoreContext, schemaName, tableName, batchPartitionNames); + for (Map.Entry> entry : partitionsOptionalMap.entrySet()) { + if (entry.getValue().isPresent()) { + partitionsInMetastore.add(tableLocation.toUri().relativize(new Path(entry.getValue().get().getStorage().getLocation()).toUri()).getPath()); + } + } + } List partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size(), caseSensitive).stream() .map(fileStatus -> fileStatus.getPath().toUri()) .map(uri -> tableLocation.toUri().relativize(uri).getPath()) .collect(toImmutableList()); // partitions in file system but not in metastore - partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore); + partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore.build()); // partitions in metastore but not in file system - partitionsToDrop = difference(partitionsInMetastore, partitionsInFileSystem); + partitionsToDrop = difference(partitionsInMetastore.build(), partitionsInFileSystem); } catch (IOException e) { throw new PrestoException(HIVE_FILESYSTEM_ERROR, e); diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java index 108b4035cd99f..0ca528574a4e3 100644 --- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java +++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java @@ -126,6 +126,33 @@ public void testConflictingMixedCasePartitionNames() assertPartitions(tableName, row("a", "1"), row("b", "2")); } + @Test(groups = {HIVE_PARTITIONING, SMOKE}) + public void testAddPartitionNeedsUrlEncoding() + { + String tableName = "test_sync_partition_add_partition_url_encoding"; + String mirrorTableName = "test_sync_partition_add_partition_url_encoding_mirror"; + + query("CREATE TABLE " + tableName + " (payload bigint, col_x varchar, col_y varchar) " + + "WITH (format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])"); + query("INSERT INTO " + tableName + " VALUES (1, 'a', '1'), (2, 'b', '2')"); + + String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName; + + query("CREATE TABLE " + mirrorTableName + " (payload bigint, col_x varchar, col_y varchar) " + + "WITH (external_location = '" + tableLocation + "', format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])"); + query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'ADD')"); + + assertPartitions(tableName, row("a", "1"), row("b", "2")); + assertPartitions(mirrorTableName, row("a", "1"), row("b", "2")); + + query("INSERT INTO " + tableName + " VALUES (3, 'c', '3')"); + assertPartitions(tableName, row("a", "1"), row("b", "2"), row("c", "3")); + assertPartitions(mirrorTableName, row("a", "1"), row("b", "2")); + + query("CALL system.sync_partition_metadata('default', '" + 
mirrorTableName + "', 'ADD')"); + assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"), row("c", "3")); + } + private static void prepare(HdfsClient hdfsClient, HdfsDataSourceWriter hdfsDataSourceWriter, String tableName) { query("DROP TABLE IF EXISTS " + tableName); From 08c4ec0ed9e40aaaa22d97ba9645ad435c9441b8 Mon Sep 17 00:00:00 2001 From: George Wang Date: Tue, 6 Sep 2022 18:08:36 -0700 Subject: [PATCH 2/3] Bypass file path check for sync partition --- .../com/facebook/presto/hive/HdfsContext.java | 43 +++++++++++++- .../SemiTransactionalHiveMetastore.java | 57 ++++++++++++++----- .../hive/SyncPartitionMetadataProcedure.java | 3 +- 3 files changed, 88 insertions(+), 15 deletions(-) diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java index 3209ccdeca672..fe30292219ea3 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java @@ -32,6 +32,7 @@ public class HdfsContext private final Optional tablePath; // true if the table already exist in the metastore, false if the table is about to be created in the current transaction private final Optional isNewTable; + private final Optional isPathValidationNeeded; private final Optional clientInfo; private final Optional> clientTags; private final Optional session; @@ -53,6 +54,7 @@ public HdfsContext(ConnectorIdentity identity) this.tablePath = Optional.empty(); this.isNewTable = Optional.empty(); this.session = Optional.empty(); + this.isPathValidationNeeded = Optional.empty(); } /** @@ -92,6 +94,22 @@ public HdfsContext(ConnectorSession session, String schemaName, String tableName Optional.empty()); } + public HdfsContext( + ConnectorSession session, + String schemaName, + String tableName, + String tablePath, + boolean isNewTable, + boolean isPathValidationNeeded) + { + this( + session, + Optional.of(requireNonNull(schemaName, "schemaName is null")), + Optional.of(requireNonNull(tableName, "tableName is null")), + Optional.of(requireNonNull(tablePath, "tablePath is null")), + Optional.of(isNewTable), + Optional.of(isPathValidationNeeded)); + } public HdfsContext( ConnectorSession session, String schemaName, @@ -104,7 +122,8 @@ public HdfsContext( Optional.of(requireNonNull(schemaName, "schemaName is null")), Optional.of(requireNonNull(tableName, "tableName is null")), Optional.of(requireNonNull(tablePath, "tablePath is null")), - Optional.of(isNewTable)); + Optional.of(isNewTable), + Optional.empty()); } private HdfsContext( @@ -113,6 +132,22 @@ private HdfsContext( Optional tableName, Optional tablePath, Optional isNewTable) + { + this( + session, + schemaName, + tableName, + tablePath, + isNewTable, + Optional.empty()); + } + private HdfsContext( + ConnectorSession session, + Optional schemaName, + Optional tableName, + Optional tablePath, + Optional isNewTable, + Optional isPathValidationNeeded) { this.session = Optional.of(requireNonNull(session, "session is null")); this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null"); @@ -124,6 +159,7 @@ private HdfsContext( this.clientTags = Optional.of(session.getClientTags()); this.tablePath = requireNonNull(tablePath, "tablePath is null"); this.isNewTable = requireNonNull(isNewTable, "isNewTable is null"); + this.isPathValidationNeeded = requireNonNull(isPathValidationNeeded, "isPathValidationNeeded is null"); } public ConnectorIdentity 
getIdentity()
@@ -176,6 +212,11 @@ public Optional getSession()
         return session;
     }
 
+    public Optional getIsPathValidationNeeded()
+    {
+        return isPathValidationNeeded;
+    }
+
     @Override
     public String toString()
     {
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
index 914960d90288c..385cfba19c20c 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
@@ -823,12 +823,39 @@ public synchronized void addPartition(
             Partition partition,
             Path currentLocation,
             PartitionStatistics statistics)
+    {
+        addPartition(session, databaseName, tableName, tablePath, isNewTable, partition, currentLocation, statistics, false);
+    }
+
+    /**
+     * Add new partition metadata to the metastore
+     *
+     * @param session Connector level session
+     * @param databaseName Name of the schema
+     * @param tableName Name of the table
+     * @param tablePath Storage location of the table
+     * @param isNewTable Whether the partition is added to a table created in the current transaction or to an existing table
+     * @param partition The new partition object to be added
+     * @param currentLocation The current path of the partition data being added
+     * @param statistics The basic statistics and column statistics for the added partition
+     * @param isPathValidationNeeded Controls the metastore file path check; the sync partition code path passes true, which bypasses the check because it has already verified the paths
+     */
+    public synchronized void addPartition(
+            ConnectorSession session,
+            String databaseName,
+            String tableName,
+            String tablePath,
+            boolean isNewTable,
+            Partition partition,
+            Path currentLocation,
+            PartitionStatistics statistics,
+            boolean isPathValidationNeeded)
     {
         setShared();
         checkArgument(getPrestoQueryId(partition).isPresent());
         Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
         Action oldPartitionAction = partitionActionsOfTable.get(partition.getValues());
-        HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable);
+        HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable, isPathValidationNeeded);
         if (oldPartitionAction == null) {
             partitionActionsOfTable.put(
                     partition.getValues(),
@@ -1488,19 +1515,23 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext
                 partition.getSchemaTableName(),
                 ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE));
 
-        if (pathExists(context, hdfsEnvironment, currentPath)) {
-            if (!targetPath.equals(currentPath)) {
-                renameDirectory(
-                        context,
-                        hdfsEnvironment,
-                        currentPath,
-                        targetPath,
-                        () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
+        // we can bypass the file storage path check for the sync partition code path
+        // because the file paths have already been verified during an earlier phase of the sync logic
+        if (!context.getIsPathValidationNeeded().orElse(false)) {
+            if (pathExists(context, hdfsEnvironment, currentPath)) {
+                if (!targetPath.equals(currentPath)) {
+                    renameDirectory(
+                            context,
+                            hdfsEnvironment,
+                            currentPath,
+                            targetPath,
+                            () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
+                }
+            }
+            else {
+                cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
+                createDirectory(context, hdfsEnvironment, targetPath);
             }
-        }
-        else {
-            cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
-            createDirectory(context, hdfsEnvironment, targetPath);
         }
         String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
         partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()));
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
index 9f83a64efb5dd..ee99489d370a7 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
@@ -233,7 +233,8 @@ private static void addPartitions(
                     false,
                     buildPartitionObject(session, table, name),
                     new Path(table.getStorage().getLocation(), name),
-                    PartitionStatistics.empty());
+                    PartitionStatistics.empty(),
+                    true);
         }
     }

From fa39505946c826d460ac77bbe35a3c6127b94d80 Mon Sep 17 00:00:00 2001
From: George Wang
Date: Tue, 13 Sep 2022 11:16:47 -0700
Subject: [PATCH 3/3] Add a metastore interface to control add partition commit batch size
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

According to the [HMS Repair Utility](shorturl.at/eHIXY), the batch size for adding partitions can range from 1 to 2,147,483,647. For the [Glue metastore](shorturl.at/pqDH9), write access is bounded to 100 partitions per batch.
---
 .../hive/metastore/ExtendedHiveMetastore.java |  7 ++++++
 .../SemiTransactionalHiveMetastore.java       |  7 +++---
 .../metastore/glue/GlueHiveMetastore.java     | 25 +++++++++++++++++++
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
index c08bae8d12481..ef0cda8a4a8b4 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
@@ -145,4 +145,11 @@ default List> getTableConstraints(MetastoreContext metas
     {
         return ImmutableList.of();
     }
+
+    // Different metastore implementations can choose a commit batch size that matches the capacity of the underlying store.
+    // The default partition commit batch size is set to 10.
+    default int getPartitionCommitBatchSize()
+    {
+        return 10;
+    }
 }
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
index 385cfba19c20c..5d1985462bf44 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
@@ -115,7 +115,6 @@ public class SemiTransactionalHiveMetastore
 {
     private static final Logger log = Logger.get(SemiTransactionalHiveMetastore.class);
 
-    private static final int PARTITION_COMMIT_BATCH_SIZE = 8;
     private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TABLE = 100;
     private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TRANSACTION = 10_000;
@@ -1513,7 +1512,7 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext
         PartitionAdder partitionAdder = partitionAdders.computeIfAbsent(
                 partition.getSchemaTableName(),
-                ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE));
+                ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate));
 
         // we can bypass the file storage path check for the sync partition code path
         // because the file paths have already been verified during an earlier phase of the sync logic
@@ -2961,13 +2960,13 @@ private static class PartitionAdder
         private List> createdPartitionValues = new ArrayList<>();
         private List operationResults;
 
-        public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore, int batchSize)
+        public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore)
         {
             this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
             this.schemaName = schemaName;
             this.tableName = tableName;
             this.metastore = metastore;
-            this.batchSize = batchSize;
+            this.batchSize = metastore.getPartitionCommitBatchSize();
             this.partitions = new ArrayList<>(batchSize);
             this.operationResults = new ArrayList<>();
         }
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
index be4cca8247d60..6cf88de5673ea 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
@@ -150,7 +150,24 @@ public class GlueHiveMetastore
     private static final String PUBLIC_ROLE_NAME = "public";
     private static final String DEFAULT_METASTORE_USER = "presto";
     private static final String WILDCARD_EXPRESSION = "";
+
+    // Total number of partitions processed in one commit chunk; the chunk is split into smaller
+    // batches of at most BATCH_CREATE_PARTITION_MAX_PAGE_SIZE partitions each.
+    // Example of how the asynchronous BatchCreatePartition requests are issued per commit chunk:
+    //   |------ BATCH_CREATE_PARTITION_MAX_PAGE_SIZE ------|
+    //   | p0, p1, p2 ................................. p99 |
+    //   |---------------------------------------------------|
+    //   | p0, p1, p2 ................................. p99 |
+    //   |---------------------------------------------------|
+    //   ... BATCH_PARTITION_COMMIT_TOTAL_SIZE / BATCH_CREATE_PARTITION_MAX_PAGE_SIZE
+    //       = 10000 / 100 = 100 batches in total
+    //   |---------------------------------------------------|
+    //   | p0, p1, p2 ................................. p99 |
+    //   |---------------------------------------------------|
+    private static final int BATCH_PARTITION_COMMIT_TOTAL_SIZE = 10000;
     private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
+    // maximum number of partitions Glue accepts in a single BatchCreatePartition request
     private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
     private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000;
     private static final Comparator PARTITION_COMPARATOR = comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));
@@ -226,6 +243,14 @@ public GlueMetastoreStats getStats()
         return stats;
     }
 
+    // Glue accepts at most 100 partitions per BatchCreatePartition request, so each commit chunk is further split into pages of BATCH_CREATE_PARTITION_MAX_PAGE_SIZE.
+    // Reference: https://docs.aws.amazon.com/glue/latest/webapi/API_BatchCreatePartition.html
+    @Override
+    public int getPartitionCommitBatchSize()
+    {
+        return BATCH_PARTITION_COMMIT_TOTAL_SIZE;
+    }
+
     @Override
     public Optional getDatabase(MetastoreContext metastoreContext, String databaseName)
     {
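
Taken together, the series reads partition locations from the metastore in batches of names, relativizes them against the table location, and commits new partitions in metastore-specific batch sizes. The sketch below is a minimal, self-contained illustration of the read-side idea only; the class, method names (`relativePartitionPath`, `partitionPathsInMetastore`), and the plain `Map` standing in for the metastore lookup are hypothetical and are not the connector's actual API.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SyncPartitionSketch
{
    // mirrors GET_PARTITION_BY_NAMES_BATCH_SIZE in patch 1
    private static final int GET_PARTITION_BY_NAMES_BATCH_SIZE = 1000;

    // Relativize a partition's storage location against the table location, mirroring
    // tableLocation.toUri().relativize(uri).getPath() in the patch. For a partition stored at
    // <tableLocation>/col_x=a/col_y=1 this yields "col_x=a/col_y=1", exactly as it appears on disk.
    static String relativePartitionPath(URI tableLocation, URI partitionLocation)
    {
        return tableLocation.relativize(partitionLocation).getPath();
    }

    // Resolve partition locations in fixed-size batches of names, analogous to the loop over
    // getPartitionsByNames() in patch 1. The metastore lookup is faked here with a map.
    static List<String> partitionPathsInMetastore(
            URI tableLocation,
            List<String> partitionNames,
            Map<String, URI> locationsByName)
    {
        List<String> relativePaths = new ArrayList<>();
        for (int start = 0; start < partitionNames.size(); start += GET_PARTITION_BY_NAMES_BATCH_SIZE) {
            int end = Math.min(start + GET_PARTITION_BY_NAMES_BATCH_SIZE, partitionNames.size());
            for (String name : partitionNames.subList(start, end)) {
                URI location = locationsByName.get(name);   // stand-in for one metastore batch call
                if (location != null) {
                    relativePaths.add(relativePartitionPath(tableLocation, location));
                }
            }
        }
        return relativePaths;
    }

    public static void main(String[] args)
    {
        URI table = URI.create("hdfs://warehouse/default/orders/");
        Map<String, URI> locations = new LinkedHashMap<>();
        locations.put("col_x=a/col_y=1", URI.create("hdfs://warehouse/default/orders/col_x=a/col_y=1"));
        locations.put("col_x=b/col_y=2", URI.create("hdfs://warehouse/default/orders/col_x=b/col_y=2"));

        List<String> paths = partitionPathsInMetastore(table, new ArrayList<>(locations.keySet()), locations);
        System.out.println(paths);   // [col_x=a/col_y=1, col_x=b/col_y=2]
    }
}
```

The write side follows the same chunking idea: `PartitionAdder` asks the metastore for `getPartitionCommitBatchSize()`, and the Glue implementation further splits each chunk into 100-partition `BatchCreatePartition` requests, as described in the diagram above.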