Commit
Address imjalpreet's comments
ZacBlanco committed Jan 27, 2025
1 parent a85aa9d commit 52c43b4
Showing 6 changed files with 107 additions and 78 deletions.
Changed file 1 of 6

@@ -30,7 +30,7 @@ public ConnectorPageSourceWithRowPositions(
Optional<Long> startRowPosition,
Optional<Long> endRowPosition)
{
this.delegate = requireNonNull(delegate, "connectorPageSource is null");
this.delegate = requireNonNull(delegate, "delegate is null");
this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null");
this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null");
}
Changed file 2 of 6

@@ -94,6 +94,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.view.View;

@@ -124,7 +125,7 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBEG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
@@ -531,49 +532,8 @@ private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session,

ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
PartitionSpec partitionSpec = icebergTable.specs().get(task.getPartitionSpecId());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
switch (task.getContent()) {
case POSITION_DELETES:
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
break;
case DATA:
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
}
}
commitTasks.forEach(task -> handleTask(task, icebergTable, rowDelta, writtenFiles, referencedDataFiles));

rowDelta.validateDataFilesExist(referencedDataFiles.build());
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
rowDelta.validateNoConflictingDataFiles();
@@ -592,14 +552,69 @@ private Optional<ConnectorOutputMetadata> finishWrite(ConnectorSession session,
}
catch (ValidationException e) {
log.error(e, "ValidationException in finishWrite");
throw new PrestoException(ICEBEG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + writableTableHandle.getTableName(), e);
throw new PrestoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + writableTableHandle.getTableName(), e);
}

return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
.map(CommitTaskData::getPath)
.collect(toImmutableList())));
}

private void handleTask(CommitTaskData task, Table icebergTable, RowDelta rowDelta, ImmutableSet.Builder<String> writtenFiles, ImmutableSet.Builder<String> referencedDataFiles)
{
PartitionSpec partitionSpec = icebergTable.specs().get(task.getPartitionSpecId());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
switch (task.getContent()) {
case POSITION_DELETES:
handleFinishPositionDeletes(task, partitionSpec, partitionColumnTypes, rowDelta, writtenFiles, referencedDataFiles);
break;
case DATA:
handleFinishData(task, icebergTable, partitionSpec, partitionColumnTypes, rowDelta, writtenFiles, referencedDataFiles);
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
}
}

private void handleFinishPositionDeletes(CommitTaskData task, PartitionSpec partitionSpec, Type[] partitionColumnTypes, RowDelta rowDelta, ImmutableSet.Builder<String> writtenFiles, ImmutableSet.Builder<String> referencedDataFiles)
{
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
}

private void handleFinishData(CommitTaskData task, Table icebergTable, PartitionSpec partitionSpec, Type[] partitionColumnTypes, RowDelta rowDelta, ImmutableSet.Builder<String> writtenFiles, ImmutableSet.Builder<String> referencedDataFiles)
{
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -686,14 +701,14 @@ protected ImmutableMap<String, Object> createViewMetadataProperties(View view)

public static Schema toIcebergSchema(List<ColumnMetadata> columns)
{
List<Types.NestedField> icebergColumns = new ArrayList<>();
List<NestedField> icebergColumns = new ArrayList<>();
for (ColumnMetadata column : columns) {
if (!column.isHidden()) {
int index = icebergColumns.size();
Type type = toIcebergType(column.getType());
Types.NestedField field = column.isNullable()
? Types.NestedField.optional(index, column.getName(), type, column.getComment())
: Types.NestedField.required(index, column.getName(), type, column.getComment());
NestedField field = column.isNullable()
? NestedField.optional(index, column.getName(), type, column.getComment())
: NestedField.required(index, column.getName(), type, column.getComment());
icebergColumns.add(field);
}
}
@@ -889,7 +904,6 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
tableSchemaJson,
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of());
}

@@ -1150,22 +1164,23 @@ else if (tableVersion.getVersionExpressionType() instanceof VarcharType) {
*
* @return A column handle for the Row ID update column.
*/
@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
List<Types.NestedField> unmodifiedColumns = new ArrayList<>();
List<NestedField> unmodifiedColumns = new ArrayList<>();
unmodifiedColumns.add(ROW_POSITION);
// Include all the non-updated columns. These are needed when writing the new data file with updated column values.
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Set<Integer> updatedFields = updatedColumns.stream()
.map(IcebergColumnHandle.class::cast)
.map(IcebergColumnHandle::getId)
.collect(toImmutableSet());
for (Types.NestedField column : SchemaParser.fromJson(table.getTableSchemaJson().get()).columns()) {
for (NestedField column : SchemaParser.fromJson(table.getTableSchemaJson().get()).columns()) {
if (!updatedFields.contains(column.fieldId())) {
unmodifiedColumns.add(column);
}
}
Types.NestedField field = Types.NestedField.required(UPDATE_ROW_DATA.getId(), UPDATE_ROW_DATA.getColumnName(), Types.StructType.of(unmodifiedColumns));
NestedField field = NestedField.required(UPDATE_ROW_DATA.getId(), UPDATE_ROW_DATA.getColumnName(), Types.StructType.of(unmodifiedColumns));
return IcebergColumnHandle.create(field, typeManager, SYNTHESIZED);
}

Changed file 3 of 6

@@ -39,7 +39,8 @@ public enum IcebergErrorCode
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL),
ICEBEG_COMMIT_ERROR(16, EXTERNAL);
ICEBERG_COMMIT_ERROR(16, EXTERNAL),
ICEBERG_MISSING_COLUMN(17, EXTERNAL);

private final ErrorCode errorCode;

Changed file 4 of 6

@@ -97,7 +97,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.crypto.InternalFileDecryptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -150,8 +150,8 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_MISSING_COLUMN;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.IcebergOrcColumn.ROOT_COLUMN_ID;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getLocationProvider;
@@ -797,7 +797,10 @@ public ConnectorPageSource createPageSource(
columnsToReadFromStorage.add(handle);
}
else {
Types.NestedField column = tableSchema.findField(colId.getId());
NestedField column = tableSchema.findField(colId.getId());
if (column == null) {
throw new PrestoException(ICEBERG_MISSING_COLUMN, "Could not find field " + colId + " in table schema: " + tableSchema);
}
IcebergColumnHandle handle = IcebergColumnHandle.create(column, typeManager, REGULAR);
columnsToReadFromStorage.add(handle);
}
Changed file 5 of 6

@@ -54,6 +54,8 @@ public class IcebergPartitionInsertingPageSource
private final ConnectorPageSourceWithRowPositions delegateWithPositions;
private final ConnectorPageSource delegate;
private final Block[] partitionValueBlocks;
private final long partitionValuesMemoryUsage;
// maps output array index to index of input page from delegate provider.
private final int[] outputIndexes;

public IcebergPartitionInsertingPageSource(
@@ -63,10 +65,22 @@ public IcebergPartitionInsertingPageSource(
Function<List<IcebergColumnHandle>, ConnectorPageSourceWithRowPositions> delegateSupplier)
{
this.nonPartitionColumnIndexes = new IcebergColumnHandle[fullColumnList.size()];
this.outputIndexes = new int[fullColumnList.size()];
populateIndexes(fullColumnList, partitionKeys);
this.partitionValueBlocks = generatePartitionValueBlocks(fullColumnList, metadataValues, partitionKeys);
this.partitionValuesMemoryUsage = Arrays.stream(partitionValueBlocks).filter(Objects::nonNull).mapToLong(Block::getRetainedSizeInBytes).sum();

List<IcebergColumnHandle> delegateColumns = Arrays.stream(nonPartitionColumnIndexes)
.filter(Objects::nonNull)
.collect(toImmutableList());

this.delegateWithPositions = delegateSupplier.apply(delegateColumns);
this.delegate = delegateWithPositions.getDelegate();
}

private void populateIndexes(List<IcebergColumnHandle> fullColumnList, Map<Integer, HivePartitionKey> partitionKeys)
{
int delegateIndex = 0;
// maps output array index to index of input page from delegate provider.
this.outputIndexes = new int[fullColumnList.size()];
// generate array of non-partition column indexes
for (int i = 0; i < fullColumnList.size(); i++) {
IcebergColumnHandle handle = fullColumnList.get(i);
@@ -80,38 +94,37 @@ public IcebergPartitionInsertingPageSource(
outputIndexes[i] = delegateIndex;
delegateIndex++;
}
List<IcebergColumnHandle> delegateColumns = Arrays.stream(nonPartitionColumnIndexes)
.filter(Objects::nonNull)
.collect(toImmutableList());
}

this.partitionValueBlocks = IntStream.range(0, fullColumnList.size())
private Block[] generatePartitionValueBlocks(
List<IcebergColumnHandle> fullColumnList,
Map<Integer, Object> metadataValues,
Map<Integer, HivePartitionKey> partitionKeys)
{
return IntStream.range(0, fullColumnList.size())
.mapToObj(idx -> {
IcebergColumnHandle column = fullColumnList.get(idx);
if (nonPartitionColumnIndexes[idx] != null) {
return null;
}

Type type = column.getType();
if (partitionKeys.containsKey(column.getId())) {
HivePartitionKey icebergPartition = partitionKeys.get(column.getId());
Type type = column.getType();
Object prefilledValue = deserializePartitionValue(type, icebergPartition.getValue().orElse(null), column.getName());
return nativeValueToBlock(type, prefilledValue);
}
else if (column.getColumnType() == PARTITION_KEY) {
// Partition key with no value. This can happen after partition evolution
Type type = column.getType();
return nativeValueToBlock(type, null);
}
else if (isMetadataColumnId(column.getId())) {
return nativeValueToBlock(column.getType(), metadataValues.get(column.getColumnIdentity().getId()));
return nativeValueToBlock(type, metadataValues.get(column.getColumnIdentity().getId()));
}

return null;
})
.toArray(Block[]::new);

this.delegateWithPositions = delegateSupplier.apply(delegateColumns);
this.delegate = delegateWithPositions.getDelegate();
}

public ConnectorPageSourceWithRowPositions getRowPositionDelegate()
@@ -154,12 +167,9 @@ public Page getNextPage()
int batchSize = dataPage.getPositionCount();
Block[] blocks = new Block[nonPartitionColumnIndexes.length];
for (int i = 0; i < nonPartitionColumnIndexes.length; i++) {
if (partitionValueBlocks[i] != null) {
blocks[i] = new RunLengthEncodedBlock(partitionValueBlocks[i], batchSize);
}
else {
blocks[i] = dataPage.getBlock(outputIndexes[i]);
}
blocks[i] = partitionValueBlocks[i] == null ?
dataPage.getBlock(outputIndexes[i]) :
new RunLengthEncodedBlock(partitionValueBlocks[i], batchSize);
}

return new Page(batchSize, blocks);
@@ -188,7 +198,7 @@ protected void closeWithSuppression(Throwable throwable)
@Override
public long getSystemMemoryUsage()
{
return delegate.getSystemMemoryUsage();
return delegate.getSystemMemoryUsage() + partitionValuesMemoryUsage;
}

@Override
Changed file 6 of 6

@@ -287,7 +287,7 @@ public void testGenerateStatisticColumnSets()
.setDataColumns(ImmutableList.of())
.setPredicateColumns(ImmutableMap.of())
.setRequestedColumns(Optional.empty())
.setTable(new IcebergTableHandle("test", IcebergTableName.from("test"), false, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), ImmutableList.of()))
.setTable(new IcebergTableHandle("test", IcebergTableName.from("test"), false, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), ImmutableList.of()))
.setDomainPredicate(TupleDomain.all());
// verify all selected columns are included
List<IcebergColumnHandle> includedColumns = combineSelectedAndPredicateColumns(
