Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overload HbaseIO with KV<RowKey, RowMutations> #25831

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
public List<CoderProvider> getCoderProviders() {
  // Providers are created in the registration order: the mutation coder, the row-mutations
  // coder, and finally a provider that binds Result.class to HBaseResultCoder.
  final CoderProvider mutationProvider = HBaseMutationCoder.getCoderProvider();
  final CoderProvider rowMutationsProvider = HBaseRowMutationsCoder.getCoderProvider();
  final CoderProvider resultProvider =
      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of());
  return ImmutableList.of(mutationProvider, rowMutationsProvider, resultProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
Expand All @@ -60,11 +61,13 @@
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -133,6 +136,8 @@
*
* <h3>Writing to HBase</h3>
*
* <h4>Writing {@link Mutation}</h4>
*
* <p>The HBase sink executes a set of row mutations on a single table. It takes as input a {@link
* PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an idempotent
* transformation on a row.
Expand All @@ -150,6 +155,31 @@
* .withTableId("table"));
* }</pre>
*
* <h4>Writing {@link RowMutations}</h4>
*
* <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
* as input a {@link PCollection PCollection&lt;KV&lt;byte[], RowMutations&gt;&gt;}, representing
* KVs of byte-array row keys and {@link RowMutations}.
*
* <p>This implementation is useful for preserving mutation order if the upstream is ordered by row
* key, as RowMutations will only be applied after previous RowMutations are successful.
*
* <p>To configure the sink, you must supply a table id string and a {@link Configuration} to
* identify the HBase instance, for example:
*
* <pre>{@code
* Configuration configuration = ...;
* PCollection<KV<byte[], RowMutations>> data = ...;
*
* data.apply("write",
* HBaseIO.writeRowMutations()
* .withConfiguration(configuration)
* .withTableId("table"));
* }</pre>
*
* <p>Note that the transform returns {@link PDone} and emits no output elements; the number of
* RowMutations written in each bundle is only logged at debug level.
*
* <h3>Experimental</h3>
*
* <p>The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or
Expand Down Expand Up @@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder builder) {
private transient BufferedMutator mutator;
}
}

/**
 * Creates an unconfigured {@link WriteRowMutations} transform.
 *
 * <p>The returned transform starts with a {@code null} configuration and an empty table id; both
 * are rejected by {@link WriteRowMutations#expand}, so callers must supply them through {@link
 * WriteRowMutations#withConfiguration} and {@link WriteRowMutations#withTableId} before applying
 * the transform.
 */
public static WriteRowMutations writeRowMutations() {
  final Configuration unsetConfiguration = null;
  final String unsetTableId = "";
  return new WriteRowMutations(unsetConfiguration, unsetTableId);
}

/**
 * Transformation that writes {@link RowMutations} objects to an HBase table.
 *
 * <p>Each input element is a {@link KV} whose value is the {@link RowMutations} to apply; every
 * element is written with a synchronous {@link Table#mutateRow} call. The transform returns {@link
 * PDone} and emits no output elements.
 *
 * <p>Instances are immutable: {@link #withConfiguration} and {@link #withTableId} return new
 * instances. Java serialization goes through {@link SerializationProxy}, which encodes the
 * configuration via {@link SerializableConfiguration}.
 */
public static class WriteRowMutations
    extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {

  /**
   * Writes to the HBase instance indicated by the given Configuration.
   *
   * @param configuration the HBase configuration; must not be null
   * @return a new transform with the configuration set and the current table id retained
   */
  public WriteRowMutations withConfiguration(Configuration configuration) {
    checkNotNull(configuration, "configuration cannot be null");
    return new WriteRowMutations(configuration, tableId);
  }

  /**
   * Writes to the specified table.
   *
   * @param tableId the name of the target table; must not be null
   * @return a new transform with the table id set and the current configuration retained
   */
  public WriteRowMutations withTableId(String tableId) {
    checkNotNull(tableId, "tableId cannot be null");
    return new WriteRowMutations(configuration, tableId);
  }

  private WriteRowMutations(Configuration configuration, String tableId) {
    this.configuration = configuration;
    this.tableId = tableId;
  }

  /**
   * Validates that a configuration and a non-empty table id were supplied, then applies the
   * writing {@link DoFn} to the input.
   */
  @Override
  public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
    checkNotNull(configuration, "withConfiguration() is required");
    checkNotNull(tableId, "withTableId() is required");
    checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");

    input.apply(ParDo.of(new WriteRowMutationsFn(this)));
    return PDone.in(input.getPipeline());
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("configuration", configuration.toString()));
    builder.add(DisplayData.item("tableId", tableId));
  }

  public Configuration getConfiguration() {
    return configuration;
  }

  public String getTableId() {
    return tableId;
  }

  /**
   * Two transforms are equal when their configurations have the same string form and their table
   * ids are equal. An unset (null) configuration only equals another unset configuration.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    WriteRowMutations that = (WriteRowMutations) o;
    return Objects.equals(configurationKey(configuration), configurationKey(that.configuration))
        && Objects.equals(tableId, that.tableId);
  }

  /** Hashes the same values that {@link #equals} compares, so equal transforms hash equally. */
  @Override
  public int hashCode() {
    return Objects.hash(configurationKey(configuration), tableId);
  }

  /** Returns the string form used to compare configurations, or null when none is set. */
  private static @Nullable String configurationKey(@Nullable Configuration configuration) {
    return configuration == null ? null : configuration.toString();
  }

  /**
   * The writeReplace method allows the developer to provide a replacement object that will be
   * serialized instead of the original one. We use this to keep the enclosed class immutable. For
   * more details on the technique see <a
   * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
   * article</a>.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  /** Serialized stand-in for {@link WriteRowMutations}; rebuilt through the public factory. */
  private static class SerializationProxy implements Serializable {
    public SerializationProxy() {}

    public SerializationProxy(WriteRowMutations writeRowMutations) {
      configuration = writeRowMutations.configuration;
      tableId = writeRowMutations.tableId;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
      SerializableCoder.of(SerializableConfiguration.class)
          .encode(new SerializableConfiguration(this.configuration), out);

      StringUtf8Coder.of().encode(this.tableId, out);
    }

    private void readObject(ObjectInputStream in) throws IOException {
      this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
      this.tableId = StringUtf8Coder.of().decode(in);
    }

    Object readResolve() {
      return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
    }

    private Configuration configuration;
    private String tableId;
  }

  private final Configuration configuration;
  private final String tableId;

  /**
   * Function to write row mutations to an HBase table.
   *
   * <p>This is a non-static inner class: it reads {@code configuration} and {@code tableId} from
   * the enclosing transform instance. Although the declared output type is {@code Integer}, no
   * element is ever output.
   */
  private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {

    public WriteRowMutationsFn(WriteRowMutations writeRowMutations) {
      checkNotNull(writeRowMutations.tableId, "tableId");
      checkNotNull(writeRowMutations.configuration, "configuration");
    }

    @Setup
    public void setup() throws Exception {
      connection = HBaseSharedConnection.getOrCreate(configuration);
    }

    @StartBundle
    public void startBundle(StartBundleContext c) throws IOException {
      table = connection.getTable(TableName.valueOf(tableId));
      recordsWritten = 0;
    }

    @FinishBundle
    public void finishBundle() throws Exception {
      closeTable();

      LOG.debug("Wrote {} records", recordsWritten);
    }

    @Teardown
    public void tearDown() throws Exception {
      try {
        closeTable();
      } finally {
        // Always release the connection obtained in setup(), even if closing the table failed.
        HBaseSharedConnection.close();
      }
    }

    /**
     * Applies the element's {@link RowMutations} to the table.
     *
     * @throws IOException if the write fails; the message carries table and connection
     *     diagnostics and the original failure is attached as the cause
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      RowMutations mutations = c.element().getValue();

      try {
        // Use Table instead of BufferedMutator to preserve mutation-ordering
        table.mutateRow(mutations);
        recordsWritten++;
      } catch (Exception e) {
        // Keep the original failure as the cause so its stack trace is not lost.
        throw new IOException(describeFailure(mutations), e);
      }
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder.delegate(WriteRowMutations.this);
    }

    /** Closes the per-bundle table handle if one is open and clears the reference. */
    private void closeTable() throws IOException {
      if (table != null) {
        table.close();
        table = null;
      }
    }

    /**
     * Builds the failure message for a rejected mutation. Table availability is gathered on a
     * best-effort basis: if the lookup itself fails, "unknown" is reported instead of letting
     * the secondary error replace the original write failure.
     */
    private String describeFailure(RowMutations mutations) {
      TableName tableName = TableName.valueOf(tableId);
      String available = "unknown";
      String enabled = "unknown";
      // A single Admin handle is used for both lookups and closed afterwards.
      try (org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) {
        available = Boolean.toString(admin.isTableAvailable(tableName));
        enabled = Boolean.toString(admin.isTableEnabled(tableName));
      } catch (Exception diagnosticsFailure) {
        LOG.debug("Could not collect table diagnostics for {}", tableId, diagnosticsFailure);
      }
      return String.join(
          " ",
          "Table",
          tableId,
          "row",
          Bytes.toString(mutations.getRow()),
          "mutation failed.",
          "\nTable Available/Enabled:",
          available,
          enabled,
          "\nConnection Closed/Aborted/Locks:",
          Boolean.toString(connection.isClosed()),
          Boolean.toString(connection.isAborted()));
    }

    private long recordsWritten;
    private transient Connection connection;
    private transient Table table;
  }
}
}
Loading