
Commit 4e0ea30
This feature adds an initial phase of support for constraint optimization to the PrestoDB optimizer in response to issue prestodb#16413. In particular, it adds support for constraint optimizations involving primary and unique key constraints. This support takes the form of a general-purpose optimization framework that associates logical properties with the result table produced by each plan node; optimization rules then exploit these properties to generate more efficient plans. The logical properties initially derive from unique key constraints defined on database tables, and they are further augmented and refined by the grouping, limiting, predicate application, and other operations that plan nodes perform. The feature adds several new optimization rules that exploit logical properties to discover and remove redundant query operations.

Future work will extend this framework with constraint optimizations involving logical properties derived from referential integrity constraints, functional dependencies, order dependencies, and other types of constraints. This work exploits Hive 3.1.2 catalog capabilities that allow the definition of informational constraints on Hive tables; however, the feature can extend to any data source that provides enforced or informational table constraints. Please see the design and test strategy documents attached to issue prestodb#16413 for additional details.
simmend committed Aug 13, 2021
1 parent ccfb095 commit 4e0ea30
Showing 99 changed files with 7,252 additions and 118 deletions.
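
Before the diff, a concrete illustration of the optimization the commit message describes: a DISTINCT (or a grouped aggregation used as one) over columns that already contain a unique key of their input cannot remove any rows, so a rule can drop it. This is a minimal, self-contained sketch; every name in it is hypothetical and nothing here is taken from the commit's actual rule implementations.

import java.util.List;
import java.util.Set;

// Hypothetical sketch of the key-based reasoning described above: if some
// known unique key of the input is contained in the grouping columns, the
// rows are already distinct on those columns and the DISTINCT is redundant.
final class RedundantDistinctSketch
{
    static boolean isDistinctRedundant(Set<String> groupingColumns, List<Set<String>> uniqueKeys)
    {
        // Rows that are distinct on a key K are distinct on any superset of K.
        return uniqueKeys.stream().anyMatch(groupingColumns::containsAll);
    }

    public static void main(String[] args)
    {
        // orderkey is a primary key, so "SELECT DISTINCT orderkey, custkey
        // FROM orders" needs no aggregation at all.
        System.out.println(isDistinctRedundant(
                Set.of("orderkey", "custkey"),
                List.of(Set.of("orderkey"))));  // prints: true
    }
}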
@@ -155,7 +155,8 @@ public PlanNode visitFilter(FilterNode node, Void context)
                 oldTableScanNode.getOutputVariables(),
                 oldTableScanNode.getAssignments(),
                 oldTableScanNode.getCurrentConstraint(),
-                oldTableScanNode.getEnforcedConstraint());
+                oldTableScanNode.getEnforcedConstraint(),
+                oldTableScanNode.getTableConstraints());
 
         return new FilterNode(idAllocator.getNextId(), newTableScanNode, node.getPredicate());
     }
@@ -171,7 +171,8 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan)
                 ImmutableList.copyOf(assignments.keySet()),
                 assignments.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, (e) -> (ColumnHandle) (e.getValue()))),
                 tableScanNode.getCurrentConstraint(),
-                tableScanNode.getEnforcedConstraint()));
+                tableScanNode.getEnforcedConstraint(),
+                tableScanNode.getTableConstraints()));
     }
 
     @Override
@@ -108,4 +108,14 @@ public void testMismatchSchemaTable()
 
         super.testMismatchSchemaTable();
     }
+
+    @Override
+    public void testTableConstraints()
+    {
+        if (getHiveVersionMajor() < 3) {
+            throw new SkipException("Table constraints support is in Hive 3 and above. Disabling it for lower versions");
+        }
+
+        super.testTableConstraints();
+    }
 }
@@ -19,6 +19,7 @@
 import com.facebook.presto.hive.HiveType;
 import com.facebook.presto.hive.MetastoreClientConfig;
 import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.TableConstraint;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.RoleGrant;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
@@ -86,11 +87,13 @@ public enum MetastoreCacheScope
     }
 
     protected final ExtendedHiveMetastore delegate;
+
     private final LoadingCache<KeyAndContext<String>, Optional<Database>> databaseCache;
     private final LoadingCache<KeyAndContext<String>, List<String>> databaseNamesCache;
     private final LoadingCache<KeyAndContext<HiveTableName>, Optional<Table>> tableCache;
     private final LoadingCache<KeyAndContext<String>, Optional<List<String>>> tableNamesCache;
     private final LoadingCache<KeyAndContext<HiveTableName>, PartitionStatistics> tableStatisticsCache;
+    private final LoadingCache<KeyAndContext<HiveTableName>, List<TableConstraint<String>>> tableConstraintsCache;
     private final LoadingCache<KeyAndContext<HivePartitionName>, PartitionStatistics> partitionStatisticsCache;
     private final LoadingCache<KeyAndContext<String>, Optional<List<String>>> viewNamesCache;
     private final LoadingCache<KeyAndContext<HivePartitionName>, Optional<Partition>> partitionCache;
@@ -99,7 +102,6 @@ public enum MetastoreCacheScope
     private final LoadingCache<KeyAndContext<UserTableKey>, Set<HivePrivilegeInfo>> tablePrivilegesCache;
     private final LoadingCache<KeyAndContext<String>, Set<String>> rolesCache;
     private final LoadingCache<KeyAndContext<PrestoPrincipal>, Set<RoleGrant>> roleGrantsCache;
-
     private final boolean metastoreImpersonationEnabled;
     private final boolean partitionVersioningEnabled;
     private final double partitionCacheValidationPercentage;
@@ -245,6 +247,9 @@ public Map<KeyAndContext<HivePartitionName>, PartitionStatistics> loadAll(Iterab
         tableCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
                 .build(asyncReloading(CacheLoader.from(this::loadTable), executor));
 
+        tableConstraintsCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
+                .build(asyncReloading(CacheLoader.from(this::loadTableConstraints), executor));
+
         viewNamesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
                 .build(asyncReloading(CacheLoader.from(this::loadAllViews), executor));
 
@@ -289,6 +294,7 @@ public void flushCache()
         partitionNamesCache.invalidateAll();
         databaseCache.invalidateAll();
         tableCache.invalidateAll();
+        tableConstraintsCache.invalidateAll();
         partitionCache.invalidateAll();
         partitionFilterCache.invalidateAll();
         tablePrivilegesCache.invalidateAll();
@@ -348,6 +354,12 @@ public Optional<Table> getTable(MetastoreContext metastoreContext, String databa
         return get(tableCache, getCachingKey(metastoreContext, hiveTableName(databaseName, tableName)));
     }
 
+    @Override
+    public List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        return get(tableConstraintsCache, getCachingKey(metastoreContext, hiveTableName(databaseName, tableName)));
+    }
+
     @Override
     public Set<ColumnStatisticType> getSupportedColumnStatistics(MetastoreContext metastoreContext, Type type)
     {
@@ -359,6 +371,11 @@ private Optional<Table> loadTable(KeyAndContext<HiveTableName> hiveTableName)
         return delegate.getTable(hiveTableName.getContext(), hiveTableName.getKey().getDatabaseName(), hiveTableName.getKey().getTableName());
     }
 
+    private List<TableConstraint<String>> loadTableConstraints(KeyAndContext<HiveTableName> hiveTableName)
+    {
+        return delegate.getTableConstraints(hiveTableName.getContext(), hiveTableName.getKey().getDatabaseName(), hiveTableName.getKey().getTableName());
+    }
+
     @Override
     public PartitionStatistics getTableStatistics(MetastoreContext metastoreContext, String databaseName, String tableName)
     {
@@ -593,6 +610,10 @@ protected void invalidateTable(String databaseName, String tableName)
                 .filter(hiveTableNameKey -> hiveTableNameKey.getKey().equals(hiveTableName))
                 .forEach(tableCache::invalidate);
 
+        tableConstraintsCache.asMap().keySet().stream()
+                .filter(hiveTableNameKey -> hiveTableNameKey.getKey().equals(hiveTableName))
+                .forEach(tableConstraintsCache::invalidate);
+
         tableNamesCache.asMap().keySet().stream()
                 .filter(tableNameKey -> tableNameKey.getKey().equals(databaseName))
                 .forEach(tableNamesCache::invalidate);
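
The tableConstraintsCache additions above follow this file's existing Guava LoadingCache pattern. The following standalone sketch shows that pattern with simplified names; none of this code is from the commit, and loadFromMetastore stands in for the delegate call:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.google.common.cache.CacheLoader.asyncReloading;

// Values load through the delegate on first access and refresh asynchronously,
// so a hot query path rarely waits on the metastore; invalidateTable mirrors
// the per-table invalidation added above.
public class ConstraintCacheSketch
{
    private final Executor refreshExecutor = Executors.newFixedThreadPool(2);

    private final LoadingCache<String, List<String>> tableConstraintsCache = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .maximumSize(10_000)
            .build(asyncReloading(CacheLoader.from(this::loadFromMetastore), refreshExecutor));

    private List<String> loadFromMetastore(String table)
    {
        return ImmutableList.of();  // stand-in for delegate.getTableConstraints(...)
    }

    public List<String> getTableConstraints(String table)
    {
        return tableConstraintsCache.getUnchecked(table);
    }

    public void invalidateTable(String table)
    {
        tableConstraintsCache.invalidate(table);
    }
}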
@@ -16,10 +16,12 @@
 import com.facebook.presto.common.predicate.Domain;
 import com.facebook.presto.common.type.Type;
 import com.facebook.presto.hive.HiveType;
+import com.facebook.presto.spi.TableConstraint;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.RoleGrant;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -114,4 +116,9 @@ List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
     void revokeTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal grantee, Set<HivePrivilegeInfo> privileges);
 
     Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal);
+
+    default List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String schemaName, String tableName)
+    {
+        return Collections.emptyList();
+    }
 }
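
A note on the default body just added: it is what allows the interface to grow without breaking the many existing ExtendedHiveMetastore implementations. A toy illustration of the pattern, with invented names:

import java.util.Collections;
import java.util.List;

// Existing implementations compile unchanged and report no constraints;
// a metastore that has constraint metadata simply overrides the default.
interface MetastoreSketch
{
    default List<String> getTableConstraints(String schema, String table)
    {
        return Collections.emptyList();
    }
}

class LegacyMetastoreSketch implements MetastoreSketch
{
    // inherits the empty default; no code change required
}

class ConstraintAwareMetastoreSketch implements MetastoreSketch
{
    @Override
    public List<String> getTableConstraints(String schema, String table)
    {
        return List.of("pk_" + table);  // would consult real metadata
    }
}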
@@ -20,6 +20,7 @@
 import com.facebook.presto.hive.HiveType;
 import com.facebook.presto.hive.MetastoreClientConfig;
 import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.TableConstraint;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.RoleGrant;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
@@ -64,6 +65,7 @@ public class RecordingHiveMetastore
 
     private final Cache<String, Optional<Database>> databaseCache;
     private final Cache<HiveTableName, Optional<Table>> tableCache;
+    private final Cache<HiveTableName, List<TableConstraint<String>>> tableConstraintsCache;
     private final Cache<String, Set<ColumnStatisticType>> supportedColumnStatisticsCache;
     private final Cache<HiveTableName, PartitionStatistics> tableStatisticsCache;
     private final Cache<Set<HivePartitionName>, Map<String, PartitionStatistics>> partitionStatisticsCache;
@@ -87,6 +89,7 @@ public RecordingHiveMetastore(@ForRecordingHiveMetastore ExtendedHiveMetastore d
 
         databaseCache = createCache(metastoreClientConfig);
         tableCache = createCache(metastoreClientConfig);
+        tableConstraintsCache = createCache(metastoreClientConfig);
         supportedColumnStatisticsCache = createCache(metastoreClientConfig);
         tableStatisticsCache = createCache(metastoreClientConfig);
         partitionStatisticsCache = createCache(metastoreClientConfig);
@@ -114,6 +117,7 @@ void loadRecording()
         allRoles = recording.getAllRoles();
         databaseCache.putAll(toMap(recording.getDatabases()));
         tableCache.putAll(toMap(recording.getTables()));
+        tableConstraintsCache.putAll(toMap(recording.getTableConstraints()));
         supportedColumnStatisticsCache.putAll(toMap(recording.getSupportedColumnStatistics()));
         tableStatisticsCache.putAll(toMap(recording.getTableStatistics()));
         partitionStatisticsCache.putAll(toMap(recording.getPartitionStatistics()));
@@ -152,6 +156,7 @@ public void writeRecording()
                 allRoles,
                 toPairs(databaseCache),
                 toPairs(tableCache),
+                toPairs(tableConstraintsCache),
                 toPairs(supportedColumnStatisticsCache),
                 toPairs(tableStatisticsCache),
                 toPairs(partitionStatisticsCache),
@@ -205,6 +210,11 @@ public Optional<Table> getTable(MetastoreContext metastoreContext, String databa
         return loadValue(tableCache, hiveTableName(databaseName, tableName), () -> delegate.getTable(metastoreContext, databaseName, tableName));
     }
 
+    public List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        return loadValue(tableConstraintsCache, hiveTableName(databaseName, tableName), () -> delegate.getTableConstraints(metastoreContext, databaseName, tableName));
+    }
+
     @Override
     public Set<ColumnStatisticType> getSupportedColumnStatistics(MetastoreContext metastoreContext, Type type)
     {
@@ -501,6 +511,7 @@ public static class Recording
         private final Optional<Set<String>> allRoles;
         private final List<Pair<String, Optional<Database>>> databases;
         private final List<Pair<HiveTableName, Optional<Table>>> tables;
+        private final List<Pair<HiveTableName, List<TableConstraint<String>>>> tableConstraints;
         private final List<Pair<String, Set<ColumnStatisticType>>> supportedColumnStatistics;
         private final List<Pair<HiveTableName, PartitionStatistics>> tableStatistics;
         private final List<Pair<Set<HivePartitionName>, Map<String, PartitionStatistics>>> partitionStatistics;
@@ -519,6 +530,7 @@ public Recording(
                 @JsonProperty("allRoles") Optional<Set<String>> allRoles,
                 @JsonProperty("databases") List<Pair<String, Optional<Database>>> databases,
                 @JsonProperty("tables") List<Pair<HiveTableName, Optional<Table>>> tables,
+                @JsonProperty("tableConstraints") List<Pair<HiveTableName, List<TableConstraint<String>>>> tableConstraints,
                 @JsonProperty("supportedColumnStatistics") List<Pair<String, Set<ColumnStatisticType>>> supportedColumnStatistics,
                 @JsonProperty("tableStatistics") List<Pair<HiveTableName, PartitionStatistics>> tableStatistics,
                 @JsonProperty("partitionStatistics") List<Pair<Set<HivePartitionName>, Map<String, PartitionStatistics>>> partitionStatistics,
@@ -535,6 +547,7 @@ public Recording(
             this.allRoles = allRoles;
             this.databases = databases;
             this.tables = tables;
+            this.tableConstraints = tableConstraints;
             this.supportedColumnStatistics = supportedColumnStatistics;
             this.tableStatistics = tableStatistics;
             this.partitionStatistics = partitionStatistics;
@@ -572,6 +585,12 @@ public List<Pair<HiveTableName, Optional<Table>>> getTables()
             return tables;
         }
 
+        @JsonProperty
+        public List<Pair<HiveTableName, List<TableConstraint<String>>>> getTableConstraints()
+        {
+            return tableConstraints;
+        }
+
         @JsonProperty
         public List<Pair<String, Set<ColumnStatisticType>>> getSupportedColumnStatistics()
         {
@@ -27,6 +27,7 @@
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.StandardErrorCode;
+import com.facebook.presto.spi.TableConstraint;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.PrincipalType;
@@ -51,6 +52,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -180,6 +182,16 @@ public synchronized Optional<Table> getTable(MetastoreContext metastoreContext,
         }
     }
 
+    public synchronized List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        checkReadable();
+        Action<TableAndMore> tableAction = tableActions.get(new SchemaTableName(databaseName, tableName));
+        if (tableAction == null) {
+            return delegate.getTableConstraints(metastoreContext, databaseName, tableName);
+        }
+        return Collections.emptyList();
+    }
+
     public synchronized Set<ColumnStatisticType> getSupportedColumnStatistics(MetastoreContext metastoreContext, Type type)
     {
         return delegate.getSupportedColumnStatistics(metastoreContext, type);
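
The guard in this new getTableConstraints is worth spelling out: when the current transaction holds a pending, uncommitted action for the table (a create, drop, or alter), the delegate's constraint metadata may be stale or absent, so the method conservatively reports no constraints. A minimal restatement of that decision, with invented names:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Conservative metadata lookup: trust the metastore only when this
// transaction has no uncommitted action for the table; otherwise report
// nothing rather than constraints that may no longer hold.
final class PendingActionGuardSketch
{
    static <K> List<String> constraintsFor(K table, Map<K, ?> pendingTableActions, Supplier<List<String>> delegateLookup)
    {
        return pendingTableActions.containsKey(table)
                ? Collections.emptyList()
                : delegateLookup.get();
    }
}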
@@ -30,8 +30,10 @@
 import com.facebook.presto.hive.metastore.PrincipalPrivileges;
 import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.PrimaryKeyConstraint;
 import com.facebook.presto.spi.SchemaNotFoundException;
 import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.TableConstraint;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.RoleGrant;
@@ -41,6 +43,7 @@
 
 import javax.inject.Inject;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -97,6 +100,18 @@ public Optional<Table> getTable(MetastoreContext metastoreContext, String databa
         });
     }
 
+    @Override
+    public List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        List<TableConstraint<String>> constraints = new ArrayList<>();
+        Optional<PrimaryKeyConstraint<String>> primaryKey = delegate.getPrimaryKey(metastoreContext, databaseName, tableName);
+        if (primaryKey.isPresent()) {
+            constraints.add(primaryKey.get());
+        }
+        constraints.addAll(delegate.getUniqueConstraints(metastoreContext, databaseName, tableName));
+        return constraints;
+    }
+
     @Override
     public Set<ColumnStatisticType> getSupportedColumnStatistics(MetastoreContext metastoreContext, Type type)
     {
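
The method above assembles the list imperatively with ArrayList. For comparison, an equivalent shape in the immutable style common in this code base would look like the following sketch (an illustration only, not a proposed change to the commit):

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

// Combine an optional primary key with any unique constraints into one
// immutable list, keeping the primary key first as in the method above.
final class ConstraintAssemblySketch
{
    static <T> List<T> combine(Optional<T> primaryKey, List<T> uniqueConstraints)
    {
        ImmutableList.Builder<T> constraints = ImmutableList.builder();
        primaryKey.ifPresent(constraints::add);
        constraints.addAll(uniqueConstraints);
        return constraints.build();
    }
}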
@@ -22,8 +22,10 @@
 import com.facebook.presto.hive.metastore.PartitionStatistics;
 import com.facebook.presto.hive.metastore.PartitionWithStatistics;
 import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.PrimaryKeyConstraint;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.TableNotFoundException;
+import com.facebook.presto.spi.UniqueConstraint;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.security.RoleGrant;
 import com.facebook.presto.spi.statistics.ColumnStatisticType;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -133,4 +136,14 @@ default Optional<List<FieldSchema>> getFields(MetastoreContext metastoreContext,
 
         return Optional.of(table.get().getSd().getCols());
     }
+
+    default Optional<PrimaryKeyConstraint<String>> getPrimaryKey(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        return Optional.empty();
+    }
+
+    default List<UniqueConstraint<String>> getUniqueConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
+    {
+        return Collections.emptyList();
+    }
 }
@@ -19,16 +19,19 @@
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
 import org.apache.thrift.TException;
 
 import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public interface HiveMetastoreClient
         extends Closeable
@@ -146,4 +149,10 @@ List<RolePrincipalGrant> listRoleGrants(String name, PrincipalType principalType
 
     void setUGI(String userName)
             throws TException;
+
+    Optional<PrimaryKeysResponse> getPrimaryKey(String dbName, String tableName)
+            throws TException;
+
+    Optional<UniqueConstraintsResponse> getUniqueConstraints(String catName, String dbName, String tableName)
+            throws TException;
 }
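
These two new client methods map onto constraint RPCs that the Hive 3.x Thrift metastore exposes. A hedged sketch of one possible implementation follows; PrimaryKeysRequest and get_primary_keys come from the Hive 3.1 Thrift API (signatures assumed), while the surrounding class and its wiring are invented for illustration. A client backed by an older metastore would simply return Optional.empty(), since these RPCs do not exist before Hive 3:

import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.thrift.TException;

import java.util.Optional;

// Sketch only: forward the new interface method to the Hive 3.x Thrift RPC.
class Hive3ClientSketch
{
    private final ThriftHiveMetastore.Client client;

    Hive3ClientSketch(ThriftHiveMetastore.Client client)
    {
        this.client = client;
    }

    public Optional<PrimaryKeysResponse> getPrimaryKey(String dbName, String tableName)
            throws TException
    {
        return Optional.of(client.get_primary_keys(new PrimaryKeysRequest(dbName, tableName)));
    }
}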
(The remaining files of this 99-file commit are not shown here.)
