diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java
index 3209ccdeca672..fe30292219ea3 100644
--- a/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java
+++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/HdfsContext.java
@@ -32,6 +32,7 @@ public class HdfsContext
     private final Optional<String> tablePath;
     // true if the table already exist in the metastore, false if the table is about to be created in the current transaction
     private final Optional<Boolean> isNewTable;
+    private final Optional<Boolean> isPathValidationNeeded;
     private final Optional<String> clientInfo;
     private final Optional<Set<String>> clientTags;
     private final Optional<ConnectorSession> session;
@@ -53,6 +54,7 @@ public HdfsContext(ConnectorIdentity identity)
         this.tablePath = Optional.empty();
         this.isNewTable = Optional.empty();
         this.session = Optional.empty();
+        this.isPathValidationNeeded = Optional.empty();
     }
 
     /**
@@ -92,6 +94,22 @@ public HdfsContext(ConnectorSession session, String schemaName, String tableName
                 Optional.empty());
     }
 
+    public HdfsContext(
+            ConnectorSession session,
+            String schemaName,
+            String tableName,
+            String tablePath,
+            boolean isNewTable,
+            boolean isPathValidationNeeded)
+    {
+        this(
+                session,
+                Optional.of(requireNonNull(schemaName, "schemaName is null")),
+                Optional.of(requireNonNull(tableName, "tableName is null")),
+                Optional.of(requireNonNull(tablePath, "tablePath is null")),
+                Optional.of(isNewTable),
+                Optional.of(isPathValidationNeeded));
+    }
     public HdfsContext(
             ConnectorSession session,
             String schemaName,
@@ -104,7 +122,8 @@ public HdfsContext(
                 Optional.of(requireNonNull(schemaName, "schemaName is null")),
                 Optional.of(requireNonNull(tableName, "tableName is null")),
                 Optional.of(requireNonNull(tablePath, "tablePath is null")),
-                Optional.of(isNewTable));
+                Optional.of(isNewTable),
+                Optional.empty());
     }
 
     private HdfsContext(
@@ -113,6 +132,22 @@ private HdfsContext(
             Optional<String> tableName,
             Optional<String> tablePath,
             Optional<Boolean> isNewTable)
+    {
+        this(
+                session,
+                schemaName,
+                tableName,
+                tablePath,
+                isNewTable,
+                Optional.empty());
+    }
+    private HdfsContext(
+            ConnectorSession session,
+            Optional<String> schemaName,
+            Optional<String> tableName,
+            Optional<String> tablePath,
+            Optional<Boolean> isNewTable,
+            Optional<Boolean> isPathValidationNeeded)
     {
         this.session = Optional.of(requireNonNull(session, "session is null"));
         this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null");
@@ -124,6 +159,7 @@ private HdfsContext(
         this.clientTags = Optional.of(session.getClientTags());
         this.tablePath = requireNonNull(tablePath, "tablePath is null");
         this.isNewTable = requireNonNull(isNewTable, "isNewTable is null");
+        this.isPathValidationNeeded = requireNonNull(isPathValidationNeeded, "isPathValidationNeeded is null");
     }
 
     public ConnectorIdentity getIdentity()
@@ -176,6 +212,11 @@ public Optional<ConnectorSession> getSession()
         return session;
     }
 
+    public Optional<Boolean> getIsPathValidationNeeded()
+    {
+        return isPathValidationNeeded;
+    }
+
     @Override
     public String toString()
     {
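Note: a minimal sketch of how the new HdfsContext constructor and accessor fit together. The helper class and variable names below are illustrative and not part of the change; only the HdfsContext API is taken from the diff above.

    import com.facebook.presto.hive.HdfsContext;
    import com.facebook.presto.spi.ConnectorSession;

    class HdfsContextSketch
    {
        // Build a context carrying the new flag; true means downstream code may skip storage path checks.
        static HdfsContext contextFor(ConnectorSession session, String schema, String table, String tablePath, boolean isNewTable, boolean skipPathChecks)
        {
            return new HdfsContext(session, schema, table, tablePath, isNewTable, skipPathChecks);
        }

        // Mirrors the check used later in SemiTransactionalHiveMetastore.prepareAddPartition:
        // validation runs only when the flag is absent or false.
        static boolean shouldValidatePaths(HdfsContext context)
        {
            return !context.getIsPathValidationNeeded().orElse(false);
        }
    }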
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
index c08bae8d12481..ef0cda8a4a8b4 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java
@@ -145,4 +145,11 @@ default List<TableConstraint<String>> getTableConstraints(MetastoreContext metas
     {
         return ImmutableList.of();
     }
+
+    // Different metastore implementations may pick a different commit batch size based on the capacity of the underlying database.
+    // The default partition commit batch size is 10.
+    default int getPartitionCommitBatchSize()
+    {
+        return 10;
+    }
 }
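Note: a hedged sketch of how the new getPartitionCommitBatchSize() default is meant to be consumed. The ExampleMetastore class and the 1000 value are assumptions for illustration; only the method name and its default of 10 come from the diff above.

    import com.google.common.collect.Lists;
    import java.util.List;

    // Hypothetical implementation that overrides the interface default of 10.
    class ExampleMetastore
    {
        public int getPartitionCommitBatchSize()
        {
            return 1000; // illustrative value; each metastore picks what its backing store can absorb
        }
    }

    class PartitionCommitSketch
    {
        // PartitionAdder-style batching: split the pending partitions into chunks of the
        // size advertised by the metastore and commit one chunk at a time.
        static void commitInBatches(ExampleMetastore metastore, List<String> partitionNames)
        {
            for (List<String> batch : Lists.partition(partitionNames, metastore.getPartitionCommitBatchSize())) {
                // hand `batch` to the metastore here
            }
        }
    }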
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
index 914960d90288c..5d1985462bf44 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
@@ -115,7 +115,6 @@ public class SemiTransactionalHiveMetastore
 {
     private static final Logger log = Logger.get(SemiTransactionalHiveMetastore.class);
 
-    private static final int PARTITION_COMMIT_BATCH_SIZE = 8;
     private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TABLE = 100;
     private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TRANSACTION = 10_000;
 
@@ -823,12 +822,39 @@ public synchronized void addPartition(
             Partition partition,
             Path currentLocation,
             PartitionStatistics statistics)
+    {
+        addPartition(session, databaseName, tableName, tablePath, isNewTable, partition, currentLocation, statistics, false);
+    }
+
+    /**
+     * Add new partition metadata to the metastore
+     *
+     * @param session Connector level session
+     * @param databaseName Name of the schema
+     * @param tableName Name of the table
+     * @param tablePath Storage location of the table
+     * @param isNewTable Whether the partition belongs to a table created in the current transaction or to an existing table
+     * @param partition The new partition object to be added
+     * @param currentLocation The current storage path of the partition being added
+     * @param statistics The basic statistics and column statistics for the added partition
+     * @param isPathValidationNeeded When true, the metastore file path check is skipped; set only by the sync partition code path, which has already verified the paths
+     */
+    public synchronized void addPartition(
+            ConnectorSession session,
+            String databaseName,
+            String tableName,
+            String tablePath,
+            boolean isNewTable,
+            Partition partition,
+            Path currentLocation,
+            PartitionStatistics statistics,
+            boolean isPathValidationNeeded)
     {
         setShared();
         checkArgument(getPrestoQueryId(partition).isPresent());
         Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
         Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partition.getValues());
-        HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable);
+        HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable, isPathValidationNeeded);
         if (oldPartitionAction == null) {
             partitionActionsOfTable.put(
                     partition.getValues(),
@@ -1486,21 +1512,25 @@ private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext
 
         PartitionAdder partitionAdder = partitionAdders.computeIfAbsent(
                 partition.getSchemaTableName(),
-                ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate, PARTITION_COMMIT_BATCH_SIZE));
-
-        if (pathExists(context, hdfsEnvironment, currentPath)) {
-            if (!targetPath.equals(currentPath)) {
-                renameDirectory(
-                        context,
-                        hdfsEnvironment,
-                        currentPath,
-                        targetPath,
-                        () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
+                ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate));
+
+        // We can bypass the file storage path checks for the sync partition code path
+        // because the file paths were already verified during the early phase of the sync logic.
+        if (!context.getIsPathValidationNeeded().orElse(false)) {
+            if (pathExists(context, hdfsEnvironment, currentPath)) {
+                if (!targetPath.equals(currentPath)) {
+                    renameDirectory(
+                            context,
+                            hdfsEnvironment,
+                            currentPath,
+                            targetPath,
+                            () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
+                }
+            }
+            else {
+                cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
+                createDirectory(context, hdfsEnvironment, targetPath);
             }
-        }
-        else {
-            cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
-            createDirectory(context, hdfsEnvironment, targetPath);
         }
         String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
         partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()));
@@ -2930,13 +2960,13 @@ private static class PartitionAdder
         private List<List<String>> createdPartitionValues = new ArrayList<>();
         private List operationResults;
 
-        public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore, int batchSize)
+        public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore)
         {
             this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
             this.schemaName = schemaName;
             this.tableName = tableName;
             this.metastore = metastore;
-            this.batchSize = batchSize;
+            this.batchSize = metastore.getPartitionCommitBatchSize();
             this.partitions = new ArrayList<>(batchSize);
             this.operationResults = new ArrayList<>();
         }
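Note: a sketch of the two call shapes left by this change, assuming the types imported below; argument values are illustrative. The existing 8-argument overload keeps the old behavior by delegating with false, while the sync procedure opts in with true.

    import com.facebook.presto.hive.metastore.Partition;
    import com.facebook.presto.hive.metastore.PartitionStatistics;
    import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
    import com.facebook.presto.spi.ConnectorSession;
    import org.apache.hadoop.fs.Path;

    class AddPartitionCallSketch
    {
        // sync_partition_metadata: the file paths were already verified while listing the
        // table location, so the trailing true skips pathExists/rename/createDirectory.
        static void addFromSync(SemiTransactionalHiveMetastore metastore, ConnectorSession session,
                String schema, String table, String tablePath, Partition partition, Path location)
        {
            metastore.addPartition(session, schema, table, tablePath, false, partition, location, PartitionStatistics.empty(), true);
        }

        // Every other caller keeps the original overload and, with it, the path validation.
        static void addFromWrite(SemiTransactionalHiveMetastore metastore, ConnectorSession session,
                String schema, String table, String tablePath, boolean isNewTable, Partition partition, Path location, PartitionStatistics statistics)
        {
            metastore.addPartition(session, schema, table, tablePath, isNewTable, partition, location, statistics);
        }
    }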
diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
index be4cca8247d60..6cf88de5673ea 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/glue/GlueHiveMetastore.java
@@ -150,7 +150,24 @@ public class GlueHiveMetastore
     private static final String PUBLIC_ROLE_NAME = "public";
     private static final String DEFAULT_METASTORE_USER = "presto";
     private static final String WILDCARD_EXPRESSION = "";
+
+    // Total number of partitions processed as one large chunk; each chunk is split into smaller batches of at most BATCH_CREATE_PARTITION_MAX_PAGE_SIZE partitions.
+    // Example of how the async batches are handled for Create Partition:
+    // |--------BATCH_CREATE_PARTITION_MAX_PAGE_SIZE------------|++++++++++++++++++++++++++++++++++++++++++++
+    // | p0, p1, p2 ..................................... p99   |
+    // |---------------------------------------------------------|
+    // | p0, p1, p2 ..................................... p99   |
+    // |---------------------------------------------------------|
+    // BATCH_PARTITION_COMMIT_TOTAL_SIZE / BATCH_CREATE_PARTITION_MAX_PAGE_SIZE ..... (10k / 100 = 100 batches)
+    // |---------------------------------------------------------|++++++++++++++++++++++++++++++++++++++++++++
+    // | p0, p1, p2 ..................................... p99   |
+    // |---------------------------------------------------------|
+    // | p0, p1, p2 ..................................... p99   |
+    // |---------------------------------------------------------| .......... (100 batches)
+    // |---------------------------------------------------------|++++++++++++++++++++++++++++++++++++++++++++
+    private static final int BATCH_PARTITION_COMMIT_TOTAL_SIZE = 10000;
     private static final int BATCH_GET_PARTITION_MAX_PAGE_SIZE = 1000;
+    // Maximum number of partitions that the Glue metastore can create in a single BatchCreatePartition request
     private static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;
     private static final int AWS_GLUE_GET_PARTITIONS_MAX_RESULTS = 1000;
     private static final Comparator<Partition> PARTITION_COMPARATOR = comparing(Partition::getValues, lexicographical(String.CASE_INSENSITIVE_ORDER));
@@ -226,6 +243,14 @@ public GlueMetastoreStats getStats()
         return stats;
     }
 
+    // Glue allows at most 100 partitions per BatchCreatePartition request (BATCH_CREATE_PARTITION_MAX_PAGE_SIZE); see https://docs.aws.amazon.com/glue/latest/webapi/API_BatchCreatePartition.html
+    // The commit batch size exposed here is the larger chunk handed over by the PartitionAdder, which is then split into pages of 100 and submitted asynchronously.
+    @Override
+    public int getPartitionCommitBatchSize()
+    {
+        return BATCH_PARTITION_COMMIT_TOTAL_SIZE;
+    }
+
     @Override
     public Optional<Database> getDatabase(MetastoreContext metastoreContext, String databaseName)
     {
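Note: the arithmetic behind the diagram above, as a sketch. The two constants are taken from the diff; the loop bodies are placeholders, since the actual Glue client calls are outside this excerpt.

    import com.google.common.collect.Lists;
    import java.util.List;

    class GlueBatchSketch
    {
        static final int BATCH_PARTITION_COMMIT_TOTAL_SIZE = 10000;
        static final int BATCH_CREATE_PARTITION_MAX_PAGE_SIZE = 100;

        // Two-level split: the PartitionAdder hands Glue chunks of up to 10,000 partitions
        // (getPartitionCommitBatchSize), and each chunk is cut into pages of at most 100 for
        // the BatchCreatePartition API, i.e. up to 10,000 / 100 = 100 requests per chunk.
        static void sketch(List<String> allPartitionNames)
        {
            for (List<String> chunk : Lists.partition(allPartitionNames, BATCH_PARTITION_COMMIT_TOTAL_SIZE)) {
                for (List<String> page : Lists.partition(chunk, BATCH_CREATE_PARTITION_MAX_PAGE_SIZE)) {
                    // submit one (async) BatchCreatePartition request for `page` here
                }
            }
        }
    }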
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
index aae304ba0898c..ee99489d370a7 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/SyncPartitionMetadataProcedure.java
@@ -40,6 +40,8 @@
 import java.lang.invoke.MethodHandle;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -54,6 +56,7 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Lists.partition;
 import static java.lang.Boolean.TRUE;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
@@ -61,6 +64,8 @@ public class SyncPartitionMetadataProcedure
         implements Provider<Procedure>
 {
+    private static final int GET_PARTITION_BY_NAMES_BATCH_SIZE = 1000;
+
     public enum SyncMode
     {
         ADD, DROP, FULL
     }
@@ -114,7 +119,15 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.get().getMetastore();
         SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
 
-        Table table = metastore.getTable(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName)
+        MetastoreContext metastoreContext = new MetastoreContext(
+                session.getIdentity(),
+                session.getQueryId(),
+                session.getClientInfo(),
+                session.getSource(),
+                getMetastoreHeaders(session),
+                isUserDefinedTypeEncodingEnabled(session),
+                metastore.getColumnConverterProvider());
+        Table table = metastore.getTable(metastoreContext, schemaName, tableName)
                 .orElseThrow(() -> new TableNotFoundException(schemaTableName));
         if (table.getPartitionColumns().isEmpty()) {
             throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName);
@@ -127,17 +140,26 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, tableLocation);
-            List<String> partitionsInMetastore = metastore.getPartitionNames(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), metastore.getColumnConverterProvider()), schemaName, tableName)
+            List<String> partitionNamesInMetastore = metastore.getPartitionNames(metastoreContext, schemaName, tableName)
                     .orElseThrow(() -> new TableNotFoundException(schemaTableName));
+            ImmutableList.Builder<String> partitionsInMetastore = new ImmutableList.Builder<>();
+            for (List<String> batchPartitionNames : partition(partitionNamesInMetastore, GET_PARTITION_BY_NAMES_BATCH_SIZE)) {
+                Map<String, Optional<Partition>> partitionsOptionalMap = metastore.getPartitionsByNames(metastoreContext, schemaName, tableName, batchPartitionNames);
+                for (Map.Entry<String, Optional<Partition>> entry : partitionsOptionalMap.entrySet()) {
+                    if (entry.getValue().isPresent()) {
+                        partitionsInMetastore.add(tableLocation.toUri().relativize(new Path(entry.getValue().get().getStorage().getLocation()).toUri()).getPath());
+                    }
+                }
+            }
             List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size(), caseSensitive).stream()
                     .map(fileStatus -> fileStatus.getPath().toUri())
                     .map(uri -> tableLocation.toUri().relativize(uri).getPath())
                     .collect(toImmutableList());
 
             // partitions in file system but not in metastore
-            partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore);
+            partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore.build());
             // partitions in metastore but not in file system
-            partitionsToDrop = difference(partitionsInMetastore, partitionsInFileSystem);
+            partitionsToDrop = difference(partitionsInMetastore.build(), partitionsInFileSystem);
         }
         catch (IOException e) {
             throw new PrestoException(HIVE_FILESYSTEM_ERROR, e);
         }
@@ -211,7 +233,8 @@ private static void addPartitions(
                     false,
                     buildPartitionObject(session, table, name),
                     new Path(table.getStorage().getLocation(), name),
-                    PartitionStatistics.empty());
+                    PartitionStatistics.empty(),
+                    true);
         }
     }
 
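Note: the core of the change above is that both sides of difference() now hold table-relative directory paths derived from the partitions' actual storage locations. A small sketch of the relativize step, with an illustrative location:

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    class RelativePartitionPathSketch
    {
        // A partition stored at <tableLocation>/col_x=a/col_y=1 becomes "col_x=a/col_y=1",
        // the same form produced by listing the file system under the table location.
        static String relativePath(Path tableLocation, String partitionStorageLocation)
        {
            URI relative = tableLocation.toUri().relativize(new Path(partitionStorageLocation).toUri());
            return relative.getPath();
        }
    }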
diff --git a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java
index 108b4035cd99f..0ca528574a4e3 100644
--- a/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java
+++ b/presto-product-tests/src/main/java/com/facebook/presto/tests/hive/TestSyncPartitionMetadata.java
@@ -126,6 +126,33 @@ public void testConflictingMixedCasePartitionNames()
         assertPartitions(tableName, row("a", "1"), row("b", "2"));
     }
 
+    @Test(groups = {HIVE_PARTITIONING, SMOKE})
+    public void testAddPartitionNeedsUrlEncoding()
+    {
+        String tableName = "test_sync_partition_add_partition_url_encoding";
+        String mirrorTableName = "test_sync_partition_add_partition_url_encoding_mirror";
+
+        query("CREATE TABLE " + tableName + " (payload bigint, col_x varchar, col_y varchar) " +
+                "WITH (format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
+        query("INSERT INTO " + tableName + " VALUES (1, 'a', '1'), (2, 'b', '2')");
+
+        String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName;
+
+        query("CREATE TABLE " + mirrorTableName + " (payload bigint, col_x varchar, col_y varchar) " +
+                "WITH (external_location = '" + tableLocation + "', format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
+        query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'ADD')");
+
+        assertPartitions(tableName, row("a", "1"), row("b", "2"));
+        assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"));
+
+        query("INSERT INTO " + tableName + " VALUES (3, 'c', '3')");
+        assertPartitions(tableName, row("a", "1"), row("b", "2"), row("c", "3"));
+        assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"));
+
+        query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'ADD')");
+        assertPartitions(mirrorTableName, row("a", "1"), row("b", "2"), row("c", "3"));
+    }
+
     private static void prepare(HdfsClient hdfsClient, HdfsDataSourceWriter hdfsDataSourceWriter, String tableName)
     {
         query("DROP TABLE IF EXISTS " + tableName);
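Note: the new product test exercises ADD mode only. The procedure's SyncMode enum also supports DROP and FULL (FULL combines ADD and DROP), so a follow-up assertion in the same style could look like this (illustrative, not part of the change):

    // Illustrative: after deleting a partition directory from the warehouse, FULL would
    // drop the stale metastore entry and add any new directories in a single call.
    query("CALL system.sync_partition_metadata('default', '" + mirrorTableName + "', 'FULL')");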