Skip to content

Commit 0927c8f

Browse files
ZacBlanco authored and yingsu00 committed
[Iceberg] Enable affinity scheduling on file sections
This change moves the affinity scheduling file section size configuration from HiveClientConfig and HiveSessionProperties to HiveCommonClientConfig and HiveCommonSessionProperties so that the Iceberg connector can benefit from this scheduling strategy when tables have a small number of files but a large number of splits.
1 parent f9caa39 commit 0927c8f

File tree

14 files changed

+116
-47
lines changed

14 files changed

+116
-47
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+8
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,14 @@ Property Name Description
428428
``iceberg.target_split_size`` Overrides the target split size for all tables in a query in bytes.
429429
Set to 0 to use the value in each Iceberg table's
430430
``read.split.target-size`` property.
431+
``iceberg.affinity_scheduling_file_section_size`` When the ``node_selection_strategy`` or
432+
``hive.node-selection-strategy`` property is set to ``SOFT_AFFINITY``,
433+
this configuration property will change the size of a file chunk that
434+
is hashed to a particular node when determining which worker to
435+
assign a split to. Splits which read data from the same file within
436+
the same chunk will hash to the same node. A smaller chunk size will
437+
result in a higher probability of splits being distributed evenly across
438+
the cluster, but reduce locality.
431439
===================================================== ======================================================================
432440

433441
Caching Support

presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonClientConfig.java

+14
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class HiveCommonClientConfig
4646
private boolean readNullMaskedParquetEncryptedValueEnabled;
4747
private boolean useParquetColumnNames;
4848
private boolean zstdJniDecompressionEnabled;
49+
private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);
4950

5051
public NodeSelectionStrategy getNodeSelectionStrategy()
5152
{
@@ -284,4 +285,17 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDeco
284285
this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
285286
return this;
286287
}
288+
289+
@NotNull
290+
public DataSize getAffinitySchedulingFileSectionSize()
291+
{
292+
return affinitySchedulingFileSectionSize;
293+
}
294+
295+
@Config("hive.affinity-scheduling-file-section-size")
296+
public HiveCommonClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
297+
{
298+
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
299+
return this;
300+
}
287301
}

presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class HiveCommonSessionProperties
4444
@VisibleForTesting
4545
public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";
4646

47-
private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
47+
public static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
4848
private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
4949
private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges";
5050
private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size";
@@ -61,6 +61,7 @@ public class HiveCommonSessionProperties
6161
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
6262
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
6363
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
64+
public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
6465
private final List<PropertyMetadata<?>> sessionProperties;
6566

6667
@Inject
@@ -177,6 +178,11 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig
177178
READ_MASKED_VALUE_ENABLED,
178179
"Return null when access is denied for an encrypted parquet column",
179180
hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(),
181+
false),
182+
dataSizeSessionProperty(
183+
AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
184+
"Size of file section for affinity scheduling",
185+
hiveCommonClientConfig.getAffinitySchedulingFileSectionSize(),
180186
false));
181187
}
182188

@@ -299,4 +305,9 @@ public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, St
299305
value -> DataSize.valueOf((String) value),
300306
DataSize::toString);
301307
}
308+
309+
public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
310+
{
311+
return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
312+
}
302313
}

presto-hive-common/src/test/java/com/facebook/presto/hive/TestHiveCommonClientConfig.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
26+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
2627

2728
public class TestHiveCommonClientConfig
2829
{
@@ -47,7 +48,8 @@ public void testDefaults()
4748
.setZstdJniDecompressionEnabled(false)
4849
.setParquetBatchReaderVerificationEnabled(false)
4950
.setParquetBatchReadOptimizationEnabled(false)
50-
.setReadNullMaskedParquetEncryptedValue(false));
51+
.setReadNullMaskedParquetEncryptedValue(false)
52+
.setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)));
5153
}
5254

5355
@Test
@@ -72,6 +74,7 @@ public void testExplicitPropertyMappings()
7274
.put("hive.enable-parquet-batch-reader-verification", "true")
7375
.put("hive.parquet-batch-read-optimization-enabled", "true")
7476
.put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
77+
.put("hive.affinity-scheduling-file-section-size", "512MB")
7578
.build();
7679

7780
HiveCommonClientConfig expected = new HiveCommonClientConfig()
@@ -92,7 +95,8 @@ public void testExplicitPropertyMappings()
9295
.setZstdJniDecompressionEnabled(true)
9396
.setParquetBatchReaderVerificationEnabled(true)
9497
.setParquetBatchReadOptimizationEnabled(true)
95-
.setReadNullMaskedParquetEncryptedValue(true);
98+
.setReadNullMaskedParquetEncryptedValue(true)
99+
.setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE));
96100

97101
ConfigAssertions.assertFullMapping(properties, expected);
98102
}

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java

-14
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ public class HiveClientConfig
220220
private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS);
221221
private int parquetQuickStatsMaxConcurrentCalls = 500;
222222
private int quickStatsMaxConcurrentCalls = 100;
223-
private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);
224223
private boolean legacyTimestampBucketing;
225224

226225
@Min(0)
@@ -1793,19 +1792,6 @@ public int getMaxParallelParsingConcurrency()
17931792
return this.maxParallelParsingConcurrency;
17941793
}
17951794

1796-
@NotNull
1797-
public DataSize getAffinitySchedulingFileSectionSize()
1798-
{
1799-
return affinitySchedulingFileSectionSize;
1800-
}
1801-
1802-
@Config("hive.affinity-scheduling-file-section-size")
1803-
public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySchedulingFileSectionSize)
1804-
{
1805-
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
1806-
return this;
1807-
}
1808-
18091795
@Config("hive.skip-empty-files")
18101796
@ConfigDescription("Enables skip of empty files avoiding output error")
18111797
public HiveClientConfig setSkipEmptyFilesEnabled(boolean skipEmptyFiles)

presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java

-11
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ public final class HiveSessionProperties
131131
public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout";
132132
public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout";
133133
public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";
134-
public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";
135134
public static final String SKIP_EMPTY_FILES = "skip_empty_files";
136135
public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing";
137136

@@ -639,11 +638,6 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
639638
false,
640639
value -> Duration.valueOf((String) value),
641640
Duration::toString),
642-
dataSizeSessionProperty(
643-
AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
644-
"Size of file section for affinity scheduling",
645-
hiveClientConfig.getAffinitySchedulingFileSectionSize(),
646-
false),
647641
booleanProperty(
648642
SKIP_EMPTY_FILES,
649643
"If it is required empty files will be skipped",
@@ -1126,11 +1120,6 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess
11261120
return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class);
11271121
}
11281122

1129-
public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
1130-
{
1131-
return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
1132-
}
1133-
11341123
public static boolean isSkipEmptyFilesEnabled(ConnectorSession session)
11351124
{
11361125
return session.getProperty(SKIP_EMPTY_FILES, Boolean.class);

presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@
5252

5353
import static com.facebook.airlift.concurrent.MoreFutures.failedFuture;
5454
import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
55+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
5556
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
5657
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
5758
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
58-
import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
5959
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
6060
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
6161
import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java

-4
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ public void testDefaults()
165165
.setMaxConcurrentParquetQuickStatsCalls(500)
166166
.setCteVirtualBucketCount(128)
167167
.setSkipEmptyFilesEnabled(false)
168-
.setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))
169168
.setLegacyTimestampBucketing(false));
170169
}
171170

@@ -290,7 +289,6 @@ public void testExplicitPropertyMappings()
290289
.put("hive.quick-stats.parquet.max-concurrent-calls", "399")
291290
.put("hive.quick-stats.max-concurrent-calls", "101")
292291
.put("hive.cte-virtual-bucket-count", "256")
293-
.put("hive.affinity-scheduling-file-section-size", "512MB")
294292
.put("hive.skip-empty-files", "true")
295293
.put("hive.legacy-timestamp-bucketing", "true")
296294
.build();
@@ -411,10 +409,8 @@ public void testExplicitPropertyMappings()
411409
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS))
412410
.setMaxConcurrentParquetQuickStatsCalls(399)
413411
.setMaxConcurrentQuickStatsCalls(101)
414-
.setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
415412
.setSkipEmptyFilesEnabled(true)
416413
.setCteVirtualBucketCount(256)
417-
.setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE))
418414
.setLegacyTimestampBucketing(true);
419415

420416
ConfigAssertions.assertFullMapping(properties, expected);

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL;
4343
import static com.facebook.presto.hive.CacheQuotaScope.PARTITION;
4444
import static com.facebook.presto.hive.CacheQuotaScope.TABLE;
45-
import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
45+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
4646
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
4747
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
4848
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class IcebergSplit
5151
private final List<DeleteFile> deletes;
5252
private final Optional<ChangelogSplitInfo> changelogSplitInfo;
5353
private final long dataSequenceNumber;
54+
private final long affinitySchedulingFileSectionSize;
55+
private final long affinitySchedulingFileSectionIndex;
5456

5557
@JsonCreator
5658
public IcebergSplit(
@@ -66,7 +68,8 @@ public IcebergSplit(
6668
@JsonProperty("splitWeight") SplitWeight splitWeight,
6769
@JsonProperty("deletes") List<DeleteFile> deletes,
6870
@JsonProperty("changelogSplitInfo") Optional<ChangelogSplitInfo> changelogSplitInfo,
69-
@JsonProperty("dataSequenceNumber") long dataSequenceNumber)
71+
@JsonProperty("dataSequenceNumber") long dataSequenceNumber,
72+
@JsonProperty("affinitySchedulingSectionSize") long affinitySchedulingFileSectionSize)
7073
{
7174
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
7275
this.path = requireNonNull(path, "path is null");
@@ -82,6 +85,8 @@ public IcebergSplit(
8285
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
8386
this.changelogSplitInfo = requireNonNull(changelogSplitInfo, "changelogSplitInfo is null");
8487
this.dataSequenceNumber = dataSequenceNumber;
88+
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
89+
this.affinitySchedulingFileSectionIndex = start / affinitySchedulingFileSectionSize;
8590
}
8691

8792
@JsonProperty
@@ -143,7 +148,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
143148
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
144149
{
145150
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
146-
return nodeProvider.get(path);
151+
return nodeProvider.get(path + "#" + affinitySchedulingFileSectionIndex);
147152
}
148153
return addresses;
149154
}
@@ -173,6 +178,12 @@ public long getDataSequenceNumber()
173178
return dataSequenceNumber;
174179
}
175180

181+
@JsonProperty
182+
public long getAffinitySchedulingFileSectionSize()
183+
{
184+
return affinitySchedulingFileSectionSize;
185+
}
186+
176187
@Override
177188
public Object getInfo()
178189
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Optional;
3838
import java.util.concurrent.CompletableFuture;
3939

40+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
4041
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
4142
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
4243
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -60,6 +61,7 @@ public class IcebergSplitSource
6061
private final double minimumAssignedSplitWeight;
6162
private final long targetSplitSize;
6263
private final NodeSelectionStrategy nodeSelectionStrategy;
64+
private final long affinitySchedulingFileSectionSize;
6365

6466
private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;
6567

@@ -73,11 +75,12 @@ public IcebergSplitSource(
7375
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
7476
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
7577
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
78+
this.affinitySchedulingFileSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
7679
this.fileScanTaskIterator = closer.register(
7780
splitFiles(
7881
closer.register(tableScan.planFiles()),
7982
targetSplitSize)
80-
.iterator());
83+
.iterator());
8184
}
8285

8386
@Override
@@ -139,6 +142,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
139142
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
140143
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
141144
Optional.empty(),
142-
getDataSequenceNumber(task.file()));
145+
getDataSequenceNumber(task.file()),
146+
affinitySchedulingFileSectionSize);
143147
}
144148
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Optional;
4848
import java.util.concurrent.CompletableFuture;
4949

50+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
5051
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
5152
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
5253
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
@@ -71,6 +72,7 @@ public class ChangelogSplitSource
7172
private final long targetSplitSize;
7273
private final List<IcebergColumnHandle> columnHandles;
7374
private final NodeSelectionStrategy nodeSelectionStrategy;
75+
private final long affinitySchedulingSectionSize;
7476

7577
public ChangelogSplitSource(
7678
ConnectorSession session,
@@ -86,6 +88,7 @@ public ChangelogSplitSource(
8688
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
8789
this.fileScanTaskIterable = closer.register(tableScan.planFiles());
8890
this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator());
91+
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
8992
}
9093

9194
@Override
@@ -153,6 +156,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
153156
changeTask.changeOrdinal(),
154157
changeTask.commitSnapshotId(),
155158
columnHandles)),
156-
getDataSequenceNumber(task.file()));
159+
getDataSequenceNumber(task.file()),
160+
affinitySchedulingSectionSize);
157161
}
158162
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Optional;
3737
import java.util.concurrent.CompletableFuture;
3838

39+
import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
3940
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
4041
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
4142
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
@@ -52,6 +53,7 @@ public class EqualityDeletesSplitSource
5253
{
5354
private final ConnectorSession session;
5455
private final Map<Integer, PartitionSpec> specById;
56+
private final long affinitySchedulingSectionSize;
5557
private CloseableIterator<DeleteFile> deleteFiles;
5658

5759
public EqualityDeletesSplitSource(
@@ -64,6 +66,7 @@ public EqualityDeletesSplitSource(
6466
requireNonNull(deleteFiles, "deleteFiles is null");
6567
this.specById = table.specs();
6668
this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> fromIcebergFileContent(deleteFile.content()) == EQUALITY_DELETES).iterator();
69+
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
6770
}
6871

6972
@Override
@@ -121,6 +124,7 @@ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile)
121124
SplitWeight.standard(),
122125
ImmutableList.of(),
123126
Optional.empty(),
124-
IcebergUtil.getDataSequenceNumber(deleteFile));
127+
IcebergUtil.getDataSequenceNumber(deleteFile),
128+
affinitySchedulingSectionSize);
125129
}
126130
}

0 commit comments

Comments
 (0)