Skip to content

Commit 0bea2bd

Browse files
authored
Cleanup Iceberg Manifest list files on Purge (#1039)
1 parent 9dd47d5 commit 0bea2bd

File tree

2 files changed

+47
-8
lines changed

2 files changed

+47
-8
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ public void testTableCleanup() throws IOException {
167167
taskEntity2 -> taskEntity2.getTaskType())
168168
.returns(
169169
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
170-
tableIdentifier, List.of(statisticsFile.path())),
170+
tableIdentifier,
171+
List.of(snapshot.manifestListLocation(), statisticsFile.path())),
171172
entity ->
172173
entity.readData(
173174
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
@@ -224,7 +225,7 @@ public void close() {
224225
.getOrCreateMetaStoreManager(realmContext)
225226
.loadTasks(callContext.getPolarisCallContext(), "test", 5)
226227
.getEntities())
227-
.hasSize(1);
228+
.hasSize(2);
228229
}
229230

230231
@Test
@@ -285,15 +286,41 @@ public void close() {
285286
.getOrCreateMetaStoreManager(realmContext)
286287
.loadTasks(callContext.getPolarisCallContext(), "test", 5)
287288
.getEntities())
288-
.hasSize(2)
289+
.hasSize(4)
289290
.satisfiesExactly(
290291
taskEntity ->
291292
assertThat(taskEntity)
292293
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
293294
.extracting(TaskEntity::of)
294295
.returns(
295-
AsyncTaskType.MANIFEST_FILE_CLEANUP,
296+
AsyncTaskType.METADATA_FILE_BATCH_CLEANUP,
296297
taskEntity1 -> taskEntity1.getTaskType())
298+
.returns(
299+
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
300+
tableIdentifier, List.of(snapshot.manifestListLocation())),
301+
entity ->
302+
entity.readData(
303+
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
304+
taskEntity ->
305+
assertThat(taskEntity)
306+
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
307+
.extracting(TaskEntity::of)
308+
.returns(
309+
AsyncTaskType.METADATA_FILE_BATCH_CLEANUP,
310+
taskEntity2 -> taskEntity2.getTaskType())
311+
.returns(
312+
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
313+
tableIdentifier, List.of(snapshot.manifestListLocation())),
314+
entity ->
315+
entity.readData(
316+
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
317+
taskEntity ->
318+
assertThat(taskEntity)
319+
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
320+
.extracting(TaskEntity::of)
321+
.returns(
322+
AsyncTaskType.MANIFEST_FILE_CLEANUP,
323+
taskEntity3 -> taskEntity3.getTaskType())
297324
.returns(
298325
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
299326
tableIdentifier,
@@ -307,7 +334,7 @@ public void close() {
307334
.extracting(TaskEntity::of)
308335
.returns(
309336
AsyncTaskType.MANIFEST_FILE_CLEANUP,
310-
taskEntity2 -> taskEntity2.getTaskType())
337+
taskEntity4 -> taskEntity4.getTaskType())
311338
.returns(
312339
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
313340
tableIdentifier,
@@ -413,7 +440,11 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
413440
.returns(
414441
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
415442
tableIdentifier,
416-
List.of(statisticsFile1.path(), statisticsFile2.path())),
443+
List.of(
444+
snapshot.manifestListLocation(),
445+
snapshot2.manifestListLocation(),
446+
statisticsFile1.path(),
447+
statisticsFile2.path())),
417448
entity ->
418449
entity.readData(
419450
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
@@ -569,7 +600,11 @@ public void testTableCleanupMultipleMetadata() throws IOException {
569600
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
570601
tableIdentifier,
571602
List.of(
572-
firstMetadataFile, statisticsFile1.path(), statisticsFile2.path())),
603+
firstMetadataFile,
604+
snapshot.manifestListLocation(),
605+
snapshot2.manifestListLocation(),
606+
statisticsFile1.path(),
607+
statisticsFile2.path())),
573608
entity ->
574609
entity.readData(
575610
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));

service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.stream.Collectors;
2626
import java.util.stream.Stream;
2727
import org.apache.iceberg.ManifestFile;
28+
import org.apache.iceberg.Snapshot;
2829
import org.apache.iceberg.StatisticsFile;
2930
import org.apache.iceberg.TableMetadata;
3031
import org.apache.iceberg.TableMetadataParser;
@@ -243,7 +244,10 @@ private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, i
243244
List<List<String>> result = new ArrayList<>();
244245
List<String> metadataFiles =
245246
Stream.concat(
246-
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
247+
Stream.concat(
248+
tableMetadata.previousFiles().stream()
249+
.map(TableMetadata.MetadataLogEntry::file),
250+
tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
247251
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
248252
.toList();
249253

0 commit comments

Comments
 (0)