Skip to content

Commit

Permalink
Approximate written bytes in Hive and Iceberg sorted writers
Browse files Browse the repository at this point in the history
Hive and Iceberg use the written bytes value to determine when a new file
should be started. Without this value the `target-max-file-size` config
property is ignored.

While writing, the returned value is an approximation. However, an accurate
value is returned after the writer commits.

Doing this does reduce the sort window to one file rather than all the
data the PageSink receives. Broadening the sort window while also splitting
files into the right size can be done as a follow-up.
  • Loading branch information
alexjo2144 authored and findepi committed Aug 29, 2023
1 parent 17115ed commit ceb3e8b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public class SortingFileWriter
private final AtomicLong nextFileId = new AtomicLong();
private final TypeOperators typeOperators;

private boolean flushed;
private long tempFilesWrittenBytes;

public SortingFileWriter(
TrinoFileSystem fileSystem,
Location tempFilePrefix,
Expand Down Expand Up @@ -109,7 +112,14 @@ public SortingFileWriter(
@Override
public long getWrittenBytes()
{
    if (!flushed) {
        // Until commit() runs, nothing has been written through the outputWriter yet,
        // so report the bytes accumulated in the temp files instead. This approximation
        // matters: the PageSink uses it to decide when to start a new file, so without
        // it the target-max-file-size limit would be ignored for sorted writes.
        return tempFilesWrittenBytes;
    }
    // After commit, the output writer holds the exact byte count.
    return outputWriter.getWrittenBytes();
}

@Override
Expand All @@ -130,6 +140,8 @@ public void appendRows(Page page)
@Override
public Closeable commit()
{
flushed = true;

Closeable rollbackAction = createRollbackAction(fileSystem, tempFiles);
if (!sortBuffer.isEmpty()) {
// skip temporary files entirely if the total output size is small
Expand Down Expand Up @@ -259,6 +271,7 @@ private void writeTempFile(Consumer<TempFileWriter> consumer)
consumer.accept(writer);
writer.close();
tempFiles.add(new TempFile(tempFile, writer.getWrittenBytes()));
tempFilesWrittenBytes += writer.getWrittenBytes();
}
catch (IOException | UncheckedIOException e) {
cleanupFile(fileSystem, tempFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5301,6 +5301,42 @@ public void testTargetMaxFileSize()
.forEach(row -> assertThat((Long) row.getField(0)).isBetween(1L, maxSize.toBytes() * 6));
}

@Test
public void testTargetMaxFileSizeOnSortedTable()
{
    String tableName = "test_default_max_file_size_sorted_" + randomNameSuffix();
    @Language("SQL") String createTableSql = format("CREATE TABLE %s WITH (sorted_by = ARRAY['shipdate']) AS SELECT * FROM tpch.sf1.lineitem LIMIT 100000", tableName);

    // Force all rows through a single writer so the file count reflects the
    // file-size limit rather than writer parallelism.
    Session singleWriterSession = Session.builder(getSession())
            .setSystemProperty("task_writer_count", "1")
            // task scale writers should be disabled since we want to write with a single task writer
            .setSystemProperty("task_scale_writers_enabled", "false")
            .build();

    // Baseline: with the default target file size, the data fits in a handful of files.
    assertUpdate(singleWriterSession, createTableSql, 100000);
    List<String> initialFiles = getActiveFiles(tableName);
    assertThat(initialFiles.size()).isLessThanOrEqualTo(3);
    assertUpdate(format("DROP TABLE %s", tableName));

    // Recreate the table with a very small target file size and verify the writer splits output.
    DataSize maxSize = DataSize.of(40, DataSize.Unit.KILOBYTE);
    Session smallFileSession = Session.builder(getSession())
            .setSystemProperty("task_writer_count", "1")
            // task scale writers should be disabled since we want to write with a single task writer
            .setSystemProperty("task_scale_writers_enabled", "false")
            .setCatalogSessionProperty("iceberg", "target_max_file_size", maxSize.toString())
            .build();

    assertUpdate(smallFileSession, createTableSql, 100000);
    assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES BIGINT '100000'");
    List<String> updatedFiles = getActiveFiles(tableName);
    assertThat(updatedFiles.size()).isGreaterThan(5);

    computeActual(format("SELECT file_size_in_bytes FROM \"%s$files\"", tableName))
            .getMaterializedRows()
            // as target_max_file_size is set to quite low value it can happen that created files are bigger,
            // so just to be safe we check if it is not much bigger
            .forEach(row -> assertThat((Long) row.getField(0)).isBetween(1L, maxSize.toBytes() * 20));
}

@Test
public void testDroppingIcebergAndCreatingANewTableWithTheSameNameShouldBePossible()
{
Expand Down

0 comments on commit ceb3e8b

Please sign in to comment.