Skip to content

Commit

Permalink
PARQUET-2437: Avoid flushing at Parquet writes after an exception
Browse files Browse the repository at this point in the history
  • Loading branch information
gszadovszky committed Feb 28, 2024
1 parent d31a891 commit 29a86b4
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import org.apache.parquet.column.ColumnDescriptor;
Expand Down Expand Up @@ -57,6 +58,7 @@ private interface ColumnWriterProvider {
private final long thresholdTolerance;
private long rowCount;
private long rowCountForNextSizeCheck;
private StatusManager statusManager = StatusManager.create();

// To be used by the deprecated constructor of ColumnWriteStoreV1
@Deprecated
Expand All @@ -73,7 +75,7 @@ private interface ColumnWriterProvider {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
column = createColumnWriterBase(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
Expand All @@ -87,7 +89,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);

Expand All @@ -114,9 +116,9 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.isBloomFilterEnabled(path)) {
BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
mcolumns.put(path, createColumnWriterBase(path, pageWriter, bloomFilterWriter, props));
} else {
mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, props));
}
}
this.columns = unmodifiableMap(mcolumns);
Expand All @@ -131,6 +133,13 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
};
}

/**
 * Creates a column writer via the subclass factory and registers this store's shared
 * status manager on it, so an error in any column aborts the whole write store.
 *
 * @param path the descriptor of the column to write
 * @param pageWriter the page writer backing the column
 * @param bloomFilterWriter the bloom filter writer, or {@code null} if bloom filters are disabled
 * @param props the Parquet writer properties
 * @return the fully initialized column writer
 */
private ColumnWriterBase createColumnWriterBase(
    ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
  final ColumnWriterBase writer = createColumnWriter(path, pageWriter, bloomFilterWriter, props);
  writer.initStatusManager(statusManager);
  return writer;
}

abstract ColumnWriterBase createColumnWriter(
ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, ParquetProperties props);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.column.impl;

import java.io.IOException;
import java.util.Objects;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
Expand Down Expand Up @@ -52,6 +53,7 @@ abstract class ColumnWriterBase implements ColumnWriter {

private long rowsWrittenSoFar = 0;
private int pageRowCount;
private StatusManager statusManager = StatusManager.create();

private final ColumnValueCollector collector;

Expand All @@ -74,6 +76,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
this.collector = new ColumnValueCollector(path, bloomFilterWriter, props);
}

/**
 * Replaces the default status manager with one shared across the whole write store,
 * so an abort in any column is visible to this writer.
 *
 * @param statusManager the shared status manager; must not be {@code null}
 * @throws NullPointerException if {@code statusManager} is {@code null}
 */
void initStatusManager(StatusManager statusManager) {
  // Include the parameter name in the NPE message for easier diagnosis.
  this.statusManager = Objects.requireNonNull(statusManager, "statusManager must not be null");
}

abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);

abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
Expand Down Expand Up @@ -103,10 +109,15 @@ private void repetitionLevel(int repetitionLevel) {
/**
 * Writes the current null value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void writeNull(int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(null, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    collector.writeNull(repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

@Override
Expand Down Expand Up @@ -135,11 +146,16 @@ public long getBufferedSizeInMemory() {
/**
 * Writes the current double value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(double value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeDouble(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
Expand All @@ -152,11 +168,16 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
/**
 * Writes the current float value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(float value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeFloat(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
Expand All @@ -169,11 +190,16 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
/**
 * Writes the current Binary value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(Binary value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeBytes(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
Expand All @@ -186,11 +212,16 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
/**
 * Writes the current boolean value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(boolean value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeBoolean(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
Expand All @@ -203,11 +234,16 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
/**
 * Writes the current int value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeInteger(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
Expand All @@ -220,30 +256,44 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
/**
 * Writes the current long value. On any failure the shared status is flipped to aborted
 * before rethrowing, so later flushes of this chunk are skipped.
 */
@Override
public void write(long value, int repetitionLevel, int definitionLevel) {
  if (DEBUG) log(value, repetitionLevel, definitionLevel);
  try {
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeLong(value);
    collector.write(value, repetitionLevel, definitionLevel);
    ++valueCount;
  } catch (Throwable e) {
    // Catching Throwable deliberately: any failure leaves the chunk inconsistent.
    statusManager.abort();
    throw e;
  }
}

/**
* Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
* Is called right after writePage
*/
void finalizeColumnChunk() {
final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
if (dictionaryPage != null) {
if (DEBUG) LOG.debug("write dictionary");
try {
pageWriter.writeDictionaryPage(dictionaryPage);
} catch (IOException e) {
throw new ParquetEncodingException("could not write dictionary page for " + path, e);
}
dataColumn.resetDictionary();
if (statusManager.isAborted()) {
// We are aborting -> nothing to be done
return;
}
try {
final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
if (dictionaryPage != null) {
if (DEBUG) LOG.debug("write dictionary");
try {
pageWriter.writeDictionaryPage(dictionaryPage);
} catch (IOException e) {
throw new ParquetEncodingException("could not write dictionary page for " + path, e);
}
dataColumn.resetDictionary();
}

collector.finalizeColumnChunk();
collector.finalizeColumnChunk();
} catch (Throwable t) {
statusManager.abort();
throw t;
}
}

/**
Expand Down Expand Up @@ -317,26 +367,36 @@ void writePage() {
if (valueCount == 0) {
throw new ParquetEncodingException("writing empty page");
}
this.rowsWrittenSoFar += pageRowCount;
if (DEBUG) LOG.debug("write page");
if (statusManager.isAborted()) {
// We are aborting -> nothing to be done
return;
}
try {
writePage(
pageRowCount,
valueCount,
collector.getStatistics(),
collector.getSizeStatistics(),
repetitionLevelColumn,
definitionLevelColumn,
dataColumn);
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
this.rowsWrittenSoFar += pageRowCount;
if (DEBUG)
LOG.debug("write page");
try {
writePage(
pageRowCount,
valueCount,
collector.getStatistics(),
collector.getSizeStatistics(),
repetitionLevelColumn,
definitionLevelColumn,
dataColumn);
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
}
repetitionLevelColumn.reset();
definitionLevelColumn.reset();
dataColumn.reset();
valueCount = 0;
collector.resetPageStatistics();
pageRowCount = 0;
} catch(Throwable t) {
statusManager.abort();
throw t;
}
repetitionLevelColumn.reset();
definitionLevelColumn.reset();
dataColumn.reset();
valueCount = 0;
collector.resetPageStatistics();
pageRowCount = 0;
}

abstract void writePage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.parquet.column.impl;

/**
 * Tracks the error status shared between the cooperating writer/reader instances
 * (column writers, page writers, etc.) of one write store. Once any participant
 * aborts, all of them can observe it and skip further flushing.
 */
interface StatusManager {

  /**
   * Creates an instance of the default {@link StatusManager} implementation.
   *
   * @return the newly created {@link StatusManager} instance
   */
  static StatusManager create() {
    // Simple single-flag implementation: once aborted, always aborted.
    final class DefaultStatusManager implements StatusManager {
      private boolean aborted;

      @Override
      public void abort() {
        aborted = true;
      }

      @Override
      public boolean isAborted() {
        return aborted;
      }
    }
    return new DefaultStatusManager();
  }

  /**
   * Marks the current process as aborted, for example when an exception occurs
   * while writing a page. The flag is sticky: it cannot be cleared.
   */
  void abort();

  /**
   * Returns whether the current process has been aborted.
   *
   * @return {@code true} if the current process is aborted, {@code false} otherwise
   */
  boolean isAborted();
}
Loading

0 comments on commit 29a86b4

Please sign in to comment.