Skip to content

Commit c29b385

Browse files
codoperschlussel
authored andcommitted
Support Hudi merged view file slices for partition path updates without compaction
for MOR table, just consider latest completed compaction Address review comments and fix test failures add a negative test and address other comments fix compilation post rebase reduce test artifacts
1 parent cb73e99 commit c29b385

File tree

59 files changed

+704
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+704
-3
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java

+14
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ public class HiveClientConfig
189189
private boolean verboseRuntimeStatsEnabled;
190190
private boolean useRecordPageSourceForCustomSplit = true;
191191
private boolean hudiMetadataEnabled;
192+
private String hudiTablesUseMergedView;
192193

193194
private boolean sizeBasedSplitWeightsEnabled = true;
194195
private double minimumAssignedSplitWeight = 0.05;
@@ -1615,6 +1616,19 @@ public boolean isHudiMetadataEnabled()
16151616
return this.hudiMetadataEnabled;
16161617
}
16171618

1619+
@Config("hive.hudi-tables-use-merged-view")
1620+
@ConfigDescription("For Hudi tables, a comma-separated list in the form of <schema>.<table> which should prefer to fetch the list of files from the merged file system view")
1621+
public HiveClientConfig setHudiTablesUseMergedView(String hudiTablesUseMergedView)
1622+
{
1623+
this.hudiTablesUseMergedView = hudiTablesUseMergedView;
1624+
return this;
1625+
}
1626+
1627+
public String getHudiTablesUseMergedView()
1628+
{
1629+
return this.hudiTablesUseMergedView;
1630+
}
1631+
16181632
@Config("hive.quick-stats.enabled")
16191633
@ConfigDescription("Use quick stats to resolve stats")
16201634
public HiveClientConfig setQuickStatsEnabled(boolean quickStatsEnabled)

presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java

+12
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public final class HiveSessionProperties
125125
public static final String MAX_INITIAL_SPLITS = "max_initial_splits";
126126
public static final String FILE_SPLITTABLE = "file_splittable";
127127
private static final String HUDI_METADATA_ENABLED = "hudi_metadata_enabled";
128+
private static final String HUDI_TABLES_USE_MERGED_VIEW = "hudi_tables_use_merged_view";
128129
private static final String READ_TABLE_CONSTRAINTS = "read_table_constraints";
129130
public static final String PARALLEL_PARSING_OF_PARTITION_VALUES_ENABLED = "parallel_parsing_of_partition_values_enabled";
130131
public static final String QUICK_STATS_ENABLED = "quick_stats_enabled";
@@ -609,6 +610,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
609610
"For Hudi tables prefer to fetch the list of file names, sizes and other metadata from the internal metadata table rather than storage",
610611
hiveClientConfig.isHudiMetadataEnabled(),
611612
false),
613+
stringProperty(
614+
HUDI_TABLES_USE_MERGED_VIEW,
615+
"For Hudi tables, a comma-separated list in the form of <schema>.<table> which should use merged view to read data",
616+
hiveClientConfig.getHudiTablesUseMergedView(),
617+
false),
612618
booleanProperty(
613619
PARALLEL_PARSING_OF_PARTITION_VALUES_ENABLED,
614620
"Enables parallel parsing of partition values from partition names using thread pool",
@@ -1102,6 +1108,12 @@ public static boolean isHudiMetadataEnabled(ConnectorSession session)
11021108
return session.getProperty(HUDI_METADATA_ENABLED, Boolean.class);
11031109
}
11041110

1111+
public static String getHudiTablesUseMergedView(ConnectorSession session)
1112+
{
1113+
String hudiTablesUseMergedView = session.getProperty(HUDI_TABLES_USE_MERGED_VIEW, String.class);
1114+
return hudiTablesUseMergedView == null ? "" : hudiTablesUseMergedView;
1115+
}
1116+
11051117
public static boolean isReadTableConstraints(ConnectorSession session)
11061118
{
11071119
return session.getProperty(READ_TABLE_CONSTRAINTS, Boolean.class);

presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java

+40-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.facebook.presto.hive.metastore.Table;
2121
import com.facebook.presto.hive.util.HiveFileIterator;
2222
import com.facebook.presto.spi.ConnectorSession;
23+
import com.google.common.base.Splitter;
2324
import org.apache.hadoop.conf.Configuration;
2425
import org.apache.hadoop.fs.BlockLocation;
2526
import org.apache.hadoop.fs.FileStatus;
@@ -30,32 +31,44 @@
3031
import org.apache.hudi.common.engine.HoodieEngineContext;
3132
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
3233
import org.apache.hudi.common.fs.FSUtils;
34+
import org.apache.hudi.common.model.FileSlice;
3335
import org.apache.hudi.common.model.HoodieBaseFile;
36+
import org.apache.hudi.common.model.HoodieTableType;
3437
import org.apache.hudi.common.table.HoodieTableMetaClient;
38+
import org.apache.hudi.common.table.timeline.HoodieInstant;
39+
import org.apache.hudi.common.table.timeline.HoodieTimeline;
3540
import org.apache.hudi.common.table.view.FileSystemViewManager;
3641
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
42+
import org.apache.hudi.common.util.Option;
3743

3844
import java.io.IOException;
3945
import java.util.Iterator;
4046
import java.util.Optional;
47+
import java.util.stream.Stream;
4148

4249
import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
50+
import static com.facebook.presto.hive.HiveSessionProperties.getHudiTablesUseMergedView;
4351
import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataEnabled;
4452
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;
53+
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
4554

4655
public class HudiDirectoryLister
4756
implements DirectoryLister
4857
{
4958
private static final Logger log = Logger.get(HudiDirectoryLister.class);
59+
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
5060

5161
private final HoodieTableFileSystemView fileSystemView;
5262
private final HoodieTableMetaClient metaClient;
5363
private final boolean metadataEnabled;
64+
private final String latestInstant;
65+
private final boolean shouldUseMergedView;
5466

5567
public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table table)
5668
{
5769
log.info("Using Hudi Directory Lister.");
5870
this.metadataEnabled = isHudiMetadataEnabled(session);
71+
this.shouldUseMergedView = SPLITTER.splitToList(getHudiTablesUseMergedView(session)).contains(table.getSchemaTableName().toString());
5972
Configuration actualConfig = ((CachingJobConf) conf).getConfig();
6073
/*
6174
WrapperJobConf acts as a wrapper on top of the actual Configuration object. If `hive.copy-on-first-write-configuration-enabled`
@@ -68,6 +81,16 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
6881
.setConf(actualConfig)
6982
.setBasePath(table.getStorage().getLocation())
7083
.build();
84+
this.latestInstant = metaClient.getActiveTimeline()
85+
.getCommitsAndCompactionTimeline()
86+
.filterCompletedInstants()
87+
.filter(instant -> MERGE_ON_READ.equals(metaClient.getTableType()) && instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
88+
.lastInstant()
89+
.map(HoodieInstant::getTimestamp).orElseGet(() -> metaClient.getActiveTimeline()
90+
.getCommitsTimeline()
91+
.filterCompletedInstants()
92+
.lastInstant()
93+
.map(HoodieInstant::getTimestamp).orElseThrow(() -> new RuntimeException("No active instant found")));
7194
HoodieEngineContext engineContext = new HoodieLocalEngineContext(actualConfig);
7295
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
7396
.enable(metadataEnabled)
@@ -96,7 +119,10 @@ public Iterator<HiveFileInfo> list(
96119
fileSystemView,
97120
metadataEnabled ? Optional.empty() : Optional.of(fileSystem.listStatus(p)),
98121
table.getStorage().getLocation(),
99-
p),
122+
p,
123+
metaClient.getTableType(),
124+
latestInstant,
125+
shouldUseMergedView),
100126
namenodeStats,
101127
hiveDirectoryContext.getNestedDirectoryPolicy(),
102128
hiveDirectoryContext.isSkipEmptyFilesEnabled());
@@ -111,15 +137,26 @@ public HudiFileInfoIterator(
111137
HoodieTableFileSystemView fileSystemView,
112138
Optional<FileStatus[]> fileStatuses,
113139
String tablePath,
114-
Path directory)
140+
Path directory,
141+
HoodieTableType tableType,
142+
String latestInstant,
143+
boolean shouldUseMergedView)
115144
{
116145
String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
117146
if (fileStatuses.isPresent()) {
118147
fileSystemView.addFilesToView(fileStatuses.get());
119148
this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
120149
}
121150
else {
122-
this.hoodieBaseFileIterator = fileSystemView.getLatestBaseFiles(partition).iterator();
151+
if (shouldUseMergedView) {
152+
Stream<FileSlice> fileSlices = MERGE_ON_READ.equals(tableType) ?
153+
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partition, latestInstant) :
154+
fileSystemView.getLatestFileSlicesBeforeOrOn(partition, latestInstant, false);
155+
this.hoodieBaseFileIterator = fileSlices.map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get).iterator();
156+
}
157+
else {
158+
this.hoodieBaseFileIterator = fileSystemView.getLatestBaseFiles(partition).iterator();
159+
}
123160
}
124161
}
125162

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public void testDefaults()
149149
.setUseRecordPageSourceForCustomSplit(true)
150150
.setFileSplittable(true)
151151
.setHudiMetadataEnabled(false)
152+
.setHudiTablesUseMergedView(null)
152153
.setThriftProtocol(Protocol.BINARY)
153154
.setThriftBufferSize(new DataSize(128, BYTE))
154155
.setCopyOnFirstWriteConfigurationEnabled(true)
@@ -274,6 +275,7 @@ public void testExplicitPropertyMappings()
274275
.put("hive.use-record-page-source-for-custom-split", "false")
275276
.put("hive.file-splittable", "false")
276277
.put("hive.hudi-metadata-enabled", "true")
278+
.put("hive.hudi-tables-use-merged-view", "default.user")
277279
.put("hive.internal-communication.thrift-transport-protocol", "COMPACT")
278280
.put("hive.internal-communication.thrift-transport-buffer-size", "256B")
279281
.put("hive.copy-on-first-write-configuration-enabled", "false")
@@ -395,6 +397,7 @@ public void testExplicitPropertyMappings()
395397
.setUseRecordPageSourceForCustomSplit(false)
396398
.setFileSplittable(false)
397399
.setHudiMetadataEnabled(true)
400+
.setHudiTablesUseMergedView("default.user")
398401
.setThriftProtocol(Protocol.COMPACT)
399402
.setThriftBufferSize(new DataSize(256, BYTE))
400403
.setCopyOnFirstWriteConfigurationEnabled(false)

presto-hive/src/test/java/com/facebook/presto/hive/TestHudiDirectoryLister.java

+121
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
1919
import com.facebook.presto.hive.metastore.Storage;
2020
import com.facebook.presto.hive.metastore.Table;
21+
import com.facebook.presto.spi.ConnectorSession;
2122
import com.facebook.presto.spi.security.ConnectorIdentity;
23+
import com.facebook.presto.testing.TestingConnectorSession;
2224
import com.google.common.collect.ImmutableList;
2325
import com.google.common.collect.ImmutableMap;
2426
import org.apache.hadoop.conf.Configuration;
@@ -33,15 +35,19 @@
3335
import java.io.IOException;
3436
import java.net.URI;
3537
import java.util.Iterator;
38+
import java.util.List;
3639
import java.util.Optional;
3740

3841
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
3942
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
4043
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
44+
import static com.facebook.presto.hive.HiveTestUtils.TEST_CLIENT_TAGS;
45+
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
4146
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
4247
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
4348
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
4449
import static org.testng.Assert.assertEquals;
50+
import static org.testng.Assert.assertFalse;
4551
import static org.testng.Assert.assertThrows;
4652
import static org.testng.Assert.assertTrue;
4753

@@ -107,6 +113,121 @@ private Table getMockTable()
107113
Optional.empty());
108114
}
109115

116+
private Table getMockMORTableWithPartition()
117+
{
118+
return new Table(
119+
"schema",
120+
"hudi_mor_part_update",
121+
"user",
122+
EXTERNAL_TABLE,
123+
new Storage(fromHiveStorageFormat(PARQUET),
124+
getTableBasePath("hudi_mor_part_update"),
125+
Optional.of(new HiveBucketProperty(
126+
ImmutableList.of(),
127+
1,
128+
ImmutableList.of(),
129+
HIVE_COMPATIBLE,
130+
Optional.empty())),
131+
false,
132+
ImmutableMap.of(),
133+
ImmutableMap.of()),
134+
ImmutableList.of(),
135+
ImmutableList.of(),
136+
ImmutableMap.of(),
137+
Optional.empty(),
138+
Optional.empty());
139+
}
140+
141+
@Test
142+
public void testDirectoryListerForMORTableWithPartitionUpdates()
143+
throws IOException
144+
{
145+
Table mockTable = getMockMORTableWithPartition();
146+
Configuration hadoopConf = getHadoopConfWithCopyOnFirstWriteDisabled();
147+
try {
148+
ConnectorSession session = new TestingConnectorSession(
149+
getAllSessionProperties(
150+
new HiveClientConfig()
151+
.setHudiMetadataEnabled(true)
152+
.setHudiTablesUseMergedView(mockTable.getSchemaTableName().toString()),
153+
new HiveCommonClientConfig()),
154+
TEST_CLIENT_TAGS);
155+
HudiDirectoryLister directoryLister = new HudiDirectoryLister(hadoopConf, session, mockTable);
156+
HoodieTableMetaClient metaClient = directoryLister.getMetaClient();
157+
assertEquals(metaClient.getBasePath(), mockTable.getStorage().getLocation());
158+
Path path = new Path(mockTable.getStorage().getLocation());
159+
ExtendedFileSystem fs = (ExtendedFileSystem) path.getFileSystem(hadoopConf);
160+
Iterator<HiveFileInfo> fileInfoIterator = directoryLister.list(fs, mockTable, path, Optional.empty(), new NamenodeStats(), new HiveDirectoryContext(
161+
IGNORED,
162+
false,
163+
false,
164+
new ConnectorIdentity("test", Optional.empty(), Optional.empty()),
165+
ImmutableMap.of(),
166+
new RuntimeStats()));
167+
while (fileInfoIterator.hasNext()) {
168+
HiveFileInfo fileInfo = fileInfoIterator.next();
169+
String fileName = fileInfo.getFileName();
170+
// expected to have the latest base file in p1 and p2 partitions
171+
assertTrue(fileName.startsWith("daf69bc6-01c8-4b86-b9ef-d9c036aa5cdc-0") || fileName.startsWith("bb70e1e1-8310-4ebe-8a9c-c955f8e72830-0"));
172+
// not expected to have the older version of the base file in p1
173+
assertFalse(fileName.startsWith("c0bbff31-67b3-4660-99ba-d388b8bb8c3c-0_0-32-192"));
174+
}
175+
}
176+
finally {
177+
hadoopConf = null;
178+
}
179+
}
180+
181+
@Test
182+
public void testDirectoryListerForMORTableWithoutTableNames()
183+
throws IOException
184+
{
185+
Table mockTable = getMockMORTableWithPartition();
186+
Configuration hadoopConf = getHadoopConfWithCopyOnFirstWriteDisabled();
187+
try {
188+
ConnectorSession session = new TestingConnectorSession(
189+
getAllSessionProperties(
190+
new HiveClientConfig()
191+
.setHudiMetadataEnabled(true),
192+
new HiveCommonClientConfig()),
193+
TEST_CLIENT_TAGS);
194+
HudiDirectoryLister directoryLister = new HudiDirectoryLister(hadoopConf, session, mockTable);
195+
HoodieTableMetaClient metaClient = directoryLister.getMetaClient();
196+
assertEquals(metaClient.getBasePath(), mockTable.getStorage().getLocation());
197+
Path path = new Path(mockTable.getStorage().getLocation(), "p1");
198+
ExtendedFileSystem fs = (ExtendedFileSystem) path.getFileSystem(hadoopConf);
199+
Iterator<HiveFileInfo> fileInfoIterator = directoryLister.list(fs, mockTable, path, Optional.empty(), new NamenodeStats(), new HiveDirectoryContext(
200+
IGNORED,
201+
false,
202+
false,
203+
new ConnectorIdentity("test", Optional.empty(), Optional.empty()),
204+
ImmutableMap.of(),
205+
new RuntimeStats()));
206+
String partition1FileId = "daf69bc6-01c8-4b86-b9ef-d9c036aa5cdc-0";
207+
// expected to have the latest base file in p1 as well as older version because the table is not configured to use merged view
208+
List<HiveFileInfo> fileInfoList = ImmutableList.copyOf(fileInfoIterator);
209+
assertEquals(fileInfoList.size(), 1);
210+
assertTrue(fileInfoList.get(0).getFileName().startsWith(partition1FileId));
211+
212+
Path path2 = new Path(mockTable.getStorage().getLocation(), "p2");
213+
fileInfoIterator = directoryLister.list(fs, mockTable, path2, Optional.empty(), new NamenodeStats(), new HiveDirectoryContext(
214+
IGNORED,
215+
false,
216+
false,
217+
new ConnectorIdentity("test", Optional.empty(), Optional.empty()),
218+
ImmutableMap.of(),
219+
new RuntimeStats()));
220+
String partition2FileId = "bb70e1e1-8310-4ebe-8a9c-c955f8e72830-0";
221+
// expected to have only the latest base file in p2
222+
List<HiveFileInfo> fileInfoList2 = ImmutableList.copyOf(fileInfoIterator);
223+
assertEquals(fileInfoList2.size(), 1);
224+
assertTrue(fileInfoList2.get(0).getFileName().startsWith(partition2FileId));
225+
}
226+
finally {
227+
hadoopConf = null;
228+
}
229+
}
230+
110231
@Test
111232
public void testDirectoryListerForHudiTable()
112233
throws IOException
Binary file not shown.

0 commit comments

Comments
 (0)