From 4f47dccd0d038c4760a9c8e698e9f2c8eaacc17c Mon Sep 17 00:00:00 2001 From: georgema-google Date: Mon, 13 Mar 2023 19:07:03 -0400 Subject: [PATCH 1/9] handshake --- .../src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index ca6e67517c89..86736d4f9f70 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -765,4 +765,7 @@ public void populateDisplayData(DisplayData.Builder builder) { private transient BufferedMutator mutator; } } + + + // TODO: add public static class for RowMutationsIO } From 3e47cbff10c50bd1d0c9078c4e1b49b92d0a3496 Mon Sep 17 00:00:00 2001 From: georgema-google Date: Tue, 14 Mar 2023 12:05:56 -0400 Subject: [PATCH 2/9] Added coder, coder-test, HbaseIO, Shared Connection --- .../io/hbase/HBaseCoderProviderRegistrar.java | 1 + .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 203 +++++++++++++++++- .../sdk/io/hbase/HBaseRowMutationsCoder.java | 142 ++++++++++++ .../sdk/io/hbase/HBaseSharedConnection.java | 105 +++++++++ .../HBaseCoderProviderRegistrarTest.java | 6 + 5 files changed, 456 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java create mode 100644 sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java index e7cb823dff4b..88a0b47ef617 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java +++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java @@ -33,6 +33,7 @@ public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { public List getCoderProviders() { return ImmutableList.of( HBaseMutationCoder.getCoderProvider(), + HBaseRowMutationsCoder.getCoderProvider(), CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); } } diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 86736d4f9f70..0d76a0bb73ee 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -41,11 +41,13 @@ import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -60,11 +62,13 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; import 
org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -767,5 +771,202 @@ public void populateDisplayData(DisplayData.Builder builder) { } - // TODO: add public static class for RowMutationsIO + public static WriteRowMutations writeRowMutations() { + return new WriteRowMutations(null /* Configuration */, ""); + } + + // TODO: write class-level Javadoc + // TODO: write tests for HbaseIO.writeRowMutations(), HBaseRowMutationsCoder + /** Transformation that writes RowMutation objects to a Hbase table. */ + public static class WriteRowMutations + extends PTransform>, PCollection> { + + /** Writes to the HBase instance indicated by the* given Configuration. */ + public WriteRowMutations withConfiguration(Configuration configuration) { + checkNotNull(configuration, "configuration cannot be null"); + return new WriteRowMutations(configuration, tableId); + } + + /** Writes to the specified table. */ + public WriteRowMutations withTableId(String tableId) { + checkNotNull(tableId, "tableId cannot be null"); + return new WriteRowMutations(configuration, tableId); + } + + private WriteRowMutations(Configuration configuration, String tableId) { + this.configuration = configuration; + this.tableId = tableId; + } + + @Override + public PCollection expand(PCollection> input) { + checkNotNull(configuration, "withConfiguration() is required"); + checkNotNull(tableId, "withTableId() is required"); + checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty"); + + return input.apply(ParDo.of(new WriteRowMutationsFn(this))); + // TODO: change this back to PDone later. 
+ // return PDone.in(input.getPipeline()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", configuration.toString())); + builder.add(DisplayData.item("tableId", tableId)); + } + + public Configuration getConfiguration() { + return configuration; + } + + public String getTableId() { + return tableId; + } + + /** + * Two transforms are equal when they target the same table and their configurations have the + * same string form. + */ + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WriteRowMutations writeRowMutations = (WriteRowMutations) o; + // String.valueOf keeps the comparison null-safe while no configuration has been set yet. + return String.valueOf(configuration).equals(String.valueOf(writeRowMutations.configuration)) + && Objects.equals(tableId, writeRowMutations.tableId); + } + + /** Hashes the configuration string form that equals() compares, so the two stay consistent. */ + @Override + public int hashCode() { + return Objects.hash(String.valueOf(configuration), tableId); + } + + /** + * The writeReplace method allows the developer to provide a replacement object that will be + * serialized instead of the original one. We use this to keep the enclosed class immutable. For + * more details on the technique see this + * article. 
+ */ + private Object writeReplace() { + return new SerializationProxy(this); + } + + private static class SerializationProxy implements Serializable { + public SerializationProxy() {} + + public SerializationProxy(WriteRowMutations writeRowMutations) { + configuration = writeRowMutations.configuration; + tableId = writeRowMutations.tableId; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + SerializableCoder.of(SerializableConfiguration.class) + .encode(new SerializableConfiguration(this.configuration), out); + + StringUtf8Coder.of().encode(this.tableId, out); + } + + private void readObject(ObjectInputStream in) throws IOException { + this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get(); + this.tableId = StringUtf8Coder.of().decode(in); + } + + Object readResolve() { + return HBaseIO.writeRowMutations() + .withConfiguration(configuration) + .withTableId(tableId); + } + + private Configuration configuration; + private String tableId; + } + + private final Configuration configuration; + private final String tableId; + + /** Function to write row mutations to a hbase table. 
*/ + private class WriteRowMutationsFn extends DoFn, Integer> { + + public WriteRowMutationsFn( + WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) { + checkNotNull(writeRowMutations.tableId, "tableId"); + checkNotNull(writeRowMutations.configuration, "configuration"); + } + + @Setup + public void setup() throws Exception { + connection = HBaseSharedConnection.getOrCreate(configuration); + } + + @StartBundle + public void startBundle(StartBundleContext c) throws IOException { + table = connection.getTable(TableName.valueOf(tableId)); + recordsWritten = 0; + } + + @FinishBundle + public void finishBundle() throws Exception { + if (table != null) { + table.close(); + table = null; + } + + LOG.debug("Wrote {} records", recordsWritten); + } + + @Teardown + public void tearDown() throws Exception { + + if (table != null) { + table.close(); + table = null; + } + + // Release the shared connection only if setup() acquired one, so the shared usage count + // is never decremented on behalf of an instance that did not increment it. + if (connection != null) { + HBaseSharedConnection.close(); + connection = null; + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + RowMutations mutations = c.element().getValue(); + + try { + // Use Table instead of BufferedMutator to preserve mutation-ordering + table.mutateRow(mutations); + } catch (Exception e) { + // Keep the original failure as the cause so its stack trace is not lost. + throw new Exception( + (String.join( + " ", + "Table", + tableId, + "row", + Bytes.toString(mutations.getRow()), + "mutation failed.", + "\nTable Available/Enabled:", + Boolean.toString( + connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))), + Boolean.toString( + connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))), + "\nConnection Closed/Aborted/Locks:", + Boolean.toString(connection.isClosed()), + Boolean.toString(connection.isAborted()))), + e); + } + + // Count the successful write so finishBundle() logs the real number of records. + recordsWritten++; + // TODO: what to do with this Metrics counter? + Metrics.counter(HBaseIO.class, "mutations_written_to_hbase").inc(); + // TODO: change this back to PDone when moving to HbaseIO + c.output(1); // Dummy output so that we can get Dataflow stats for throughput. 
+ } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteRowMutations.this); + } + + private long recordsWritten; + private transient Connection connection; + private transient Table table; + } + } } diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java new file mode 100644 index 000000000000..12ac503b18ae --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +/** + * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't + * natively serializable. + * + * Wire format: a 4-byte row-key length, the row-key bytes, a 4-byte mutation count, then each + * mutation as a length-delimited MutationProto. + */ +public class HBaseRowMutationsCoder extends AtomicCoder implements Serializable { + private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder(); + + public HBaseRowMutationsCoder() {} + + public static HBaseRowMutationsCoder of() { + return INSTANCE; + } + + @Override + public void encode(RowMutations value, OutputStream outStream) throws IOException { + // OutputStream.write(int) emits only the low 8 bits, so lengths are written as full ints. + // The wrapper is deliberately not closed, since closing it would also close outStream. + java.io.DataOutputStream dataOut = new java.io.DataOutputStream(outStream); + + // serialize row key + byte[] rowKey = value.getRow(); + dataOut.writeInt(rowKey.length); + dataOut.write(rowKey); + + // serialize mutation list + List mutations = value.getMutations(); + dataOut.writeInt(mutations.size()); + for (Mutation mutation : mutations) { + MutationType type = getType(mutation); + MutationProto proto = ProtobufUtil.toMutation(type, mutation); + proto.writeDelimitedTo(dataOut); + } + } + + @Override + public RowMutations decode(InputStream inStream) throws IOException { + // Mirror encode(): read full int lengths and fill the whole row-key buffer. + java.io.DataInputStream dataIn = new java.io.DataInputStream(inStream); + int rowKeyLen = 
dataIn.readInt(); + byte[] rowKey = new byte[rowKeyLen]; + // readFully keeps reading until the buffer is full; a bare read(byte[]) may return early. + dataIn.readFully(rowKey); + + RowMutations rowMutations = new RowMutations(rowKey); + int mutationListSize = dataIn.readInt(); + for (int i = 0; i < mutationListSize; i++) { + Mutation m = ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(dataIn)); + MutationType type = getType(m); + + if (type == MutationType.PUT) { + rowMutations.add((Put) m); + } else if (type == MutationType.DELETE) { + rowMutations.add((Delete) m); + } + } + return rowMutations; + } + + private static MutationType getType(Mutation mutation) { + if (mutation instanceof Put) { + return MutationType.PUT; + } else if (mutation instanceof Delete) { + return MutationType.DELETE; + } else { + throw new IllegalArgumentException("Only Put and Delete are supported"); + } + } + + + /** + * Returns a {@link CoderProvider} which uses the {@link HBaseRowMutationsCoder} for + * {@link RowMutations}. + */ + static CoderProvider getCoderProvider() { + return HBASE_ROW_MUTATIONS_CODER_PROVIDER; + } + + private static final CoderProvider HBASE_ROW_MUTATIONS_CODER_PROVIDER = + new HBaseRowMutationsCoderProvider(); + + /** A {@link CoderProvider} for {@link RowMutations}. 
*/ + private static class HBaseRowMutationsCoderProvider extends CoderProvider { + @Override + public Coder coderFor( + TypeDescriptor typeDescriptor, List> componentCoders) + throws CannotProvideCoderException { + // This provider only handles RowMutations and its subtypes. + if (!typeDescriptor.isSubtypeOf(HBASE_ROW_MUTATIONS_TYPE_DESCRIPTOR)) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide %s because %s is not a subclass of %s", + HBaseRowMutationsCoder.class.getSimpleName(), + typeDescriptor, + RowMutations.class.getName())); + } + + try { + @SuppressWarnings("unchecked") + Coder coder = (Coder) HBaseRowMutationsCoder.of(); + return coder; + } catch (IllegalArgumentException e) { + throw new CannotProvideCoderException(e); + } + } + } + + private static final TypeDescriptor HBASE_ROW_MUTATIONS_TYPE_DESCRIPTOR = + new TypeDescriptor() {}; +} diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java new file mode 100644 index 000000000000..6f7d5cc01b16 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hbase; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static connection shared between all threads of a worker. Connectors are not persisted between + * worker machines as Connection serialization is not implemented. Each worker will create its own + * connection and share it between all its threads. + */ +public class HBaseSharedConnection implements Serializable { + private static final long serialVersionUID = 5252999807656940415L; + private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class); + + // Transient connection to be initialized per worker + private static AtomicReference connection = new AtomicReference<>(); + // Number of threads using the shared connection, close connection if connectionCount goes to 0 + private static int connectionCount; + + /** + * Create or return existing Hbase connection. + * + * @param configuration Hbase configuration + * @return Hbase connection + * @throws IOException + */ + public static synchronized Connection getOrCreate(Configuration configuration) + throws IOException { + if (connection.get() == null || connection.get().isClosed()) { + forceCreate(configuration); + } + connectionCount++; + return connection.get(); + } + + /** + * Forcibly create new connection. + * + * @param configuration + * @throws IOException + */ + public static synchronized void forceCreate(Configuration configuration) throws IOException { + connection.set(ConnectionFactory.createConnection(configuration)); + connectionCount = 0; + } + + /** + * Decrement connector count and close connection if no more connector is using it. 
+ * + * @throws IOException + */ + public static synchronized void close() throws IOException { + connectionCount--; + if (connectionCount == 0) { + forceClose(); + } + if (connectionCount < 0) { + LOG.warn("Connection count at " + connectionCount + ", should not be possible"); + } + } + + /** + * Forcibly close connection. + * + * @throws IOException + */ + public static synchronized void forceClose() throws IOException { + if (connection.get() != null) { + connection.get().close(); + connectionCount = 0; + } + } + + public String getDebugString() { + return String.format( + "Connection down: %s\n" + "Connectors: %s\n", + (connection.get() == null || connection.get().isClosed()), connectionCount); + } + + public int getConnectionCount() { + return connectionCount; + } + +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java index 25369fc50a20..fcfc59dc8de3 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -40,4 +41,9 @@ public void testMutationCoderIsRegistered() throws Exception { CoderRegistry.createDefault().getCoder(Put.class); CoderRegistry.createDefault().getCoder(Delete.class); } + + @Test + public void testRowMutationsCoderIsRegistered() throws Exception { + CoderRegistry.createDefault().getCoder(RowMutations.class); + } } From e92c06511c698f93d66650cbd37dc7433bb769d8 Mon Sep 17 00:00:00 2001 From: georgema-google Date: Tue, 14 Mar 2023 
13:29:04 -0400 Subject: [PATCH 3/9] RowMutationsCoderTests added --- .../sdk/io/hbase/RowMutationsCoderTest.java | 149 ++++++++++++++++++ .../sdk/io/hbase/utils/HBaseTestUtils.java | 76 +++++++++ .../beam/sdk/io/hbase/utils/HashUtils.java | 147 +++++++++++++++++ .../sdk/io/hbase/utils/TestConstants.java | 44 ++++++ 4 files changed, 416 insertions(+) create mode 100644 sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java create mode 100644 sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java create mode 100644 sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java create mode 100644 sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java new file mode 100644 index 000000000000..a7a4bee3f934 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hbase; + + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.timeT; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import org.apache.beam.sdk.io.hbase.utils.HBaseTestUtils; +import org.apache.beam.sdk.io.hbase.utils.HBaseTestUtils.HBaseMutationBuilder; +import org.apache.beam.sdk.io.hbase.utils.HashUtils; +import org.apache.hadoop.hbase.client.RowMutations; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Test that {@link HBaseRowMutationsCoder} encoding does not change {@link RowMutations} object. 
*/ +@RunWith(JUnit4.class) +public class RowMutationsCoderTest { + + private final HBaseRowMutationsCoder coder = HBaseRowMutationsCoder.of(); + private ByteArrayOutputStream outputStream; + private ByteArrayInputStream inputStream; + + @Before + public void setUp() { + outputStream = new ByteArrayOutputStream(); + } + + @After + public void tearDown() throws IOException { + outputStream.close(); + } + + @Test + public void testEncodePut() throws Exception { + RowMutations put = new RowMutations(rowKey); + put.add(HBaseTestUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + coder.encode(put, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedPut = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(put, decodedPut); + } + + @Test + public void testEncodeMultipleMutations() throws Exception { + RowMutations multipleMutations = new RowMutations(rowKey); + multipleMutations.add( + HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + multipleMutations.add( + HBaseTestUtils.HBaseMutationBuilder.createDelete( + rowKey, colFamily, colQualifier, timeT)); + multipleMutations.add( + HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + coder.encode(multipleMutations, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedMultipleMutations = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(multipleMutations, decodedMultipleMutations); + } + + @Test + public void testEncodeMultipleRowMutations() throws Exception { + RowMutations put = new RowMutations(rowKey); + put.add(HBaseTestUtils.HBaseMutationBuilder.createPut(rowKey, colFamily, colQualifier, value, timeT)); + RowMutations deleteCols = new RowMutations(rowKey); + 
deleteCols.add(HBaseTestUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + RowMutations deleteFamily = new RowMutations(rowKey); + deleteFamily.add(HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + + coder.encode(put, outputStream); + coder.encode(deleteCols, outputStream); + coder.encode(deleteFamily, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedPut = coder.decode(inputStream); + RowMutations decodedDeleteCols = coder.decode(inputStream); + RowMutations decodedDeleteFamily = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(put, decodedPut); + HashUtils.assertRowMutationsEquals(deleteCols, decodedDeleteCols); + HashUtils.assertRowMutationsEquals(deleteFamily, decodedDeleteFamily); + } + + @Test + public void testEncodeMultipleComplexRowMutations() throws Exception { + RowMutations complexMutation = + new RowMutations(rowKey); + complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createDelete( + rowKey, colFamily2, colQualifier2, timeT + 1)); + complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + + coder.encode(complexMutation, outputStream); + coder.encode(complexMutation, outputStream); + coder.encode(complexMutation, outputStream); + + inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + + RowMutations decodedComplexMutation = coder.decode(inputStream); + RowMutations decodedComplexMutation2 = coder.decode(inputStream); + RowMutations decodedComplexMutation3 = coder.decode(inputStream); + + Assert.assertTrue(inputStream.available() == 0); + HashUtils.assertRowMutationsEquals(complexMutation, decodedComplexMutation); + HashUtils.assertRowMutationsEquals(complexMutation, 
decodedComplexMutation2); + HashUtils.assertRowMutationsEquals(complexMutation, decodedComplexMutation3); + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java new file mode 100644 index 000000000000..ccc46c7e4fbd --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hbase.utils; + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; + +import java.io.IOException; +import java.util.UUID; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +/** Hbase-related convenience functions. 
*/ +public class HBaseTestUtils { + + public static String getCell(Table table, byte[] rowKey, byte[] colFamily, byte[] colQualifier) + throws IOException { + + return Bytes.toString( + getRowResult(table, rowKey).getValue(colFamily, colQualifier)); + } + + public static Result getRowResult(Table table, byte[] rowKey) throws IOException { + return table.get(new Get(rowKey)); + } + + public static Table createTable(HBaseTestingUtility hbaseTestingUtil) throws IOException { + return createTable(hbaseTestingUtil, UUID.randomUUID().toString()); + } + + public static Table createTable(HBaseTestingUtility hbaseTestingUtil, String name) + throws IOException { + TableName tableName = TableName.valueOf(name); + return hbaseTestingUtil.createTable(tableName, new String[] {Bytes.toString(colFamily), + Bytes.toString(colFamily2)}); + } + + /** Builder class for Hbase mutations. */ + public static class HBaseMutationBuilder { + + public static Put createPut( + byte[] rowKey, byte[] colFamily, byte[] colQualifier, byte[] value, long atTimestamp) { + return new Put(rowKey, atTimestamp) + .addColumn(colFamily, colQualifier, value); + } + + public static Delete createDelete( + byte[] rowKey, byte[] colFamily, byte[] colQualifier, long atTimestamp) { + return new Delete(rowKey, atTimestamp) + .addColumns(colFamily, colQualifier); + } + + public static Delete createDeleteFamily(byte[] rowKey, byte[] colFamily, long atTimestamp) { + return new Delete(rowKey, atTimestamp).addFamily(colFamily); + } + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java new file mode 100644 index 000000000000..608e82d1c829 --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this 
file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hbase.utils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility functions to help assert equality between mutation lists for testing purposes. */ +public class HashUtils { + + /** + * Asserts two {@link RowMutations} objects are equal by rowkey and list of {@link Mutation}. 
+ * + * @param rowMutationA + * @param rowMutationB + * @throws Exception if hash function fails + */ + public static void assertRowMutationsEquals(RowMutations rowMutationA, RowMutations rowMutationB) + throws Exception { + if (rowMutationA == null || rowMutationB == null) { + Assert.assertEquals(rowMutationA, rowMutationB); + } + Assert.assertTrue(Bytes.equals(rowMutationA.getRow(),rowMutationB.getRow())); + Assert.assertEquals( + hashMutationList(rowMutationA.getMutations()), + hashMutationList(rowMutationB.getMutations())); + } + + /** + * Hashes list of {@link Mutation} into String, by iterating through Mutation {@link Cell} and + * picking out relevant attributes for comparison. + * + *

Different mutation types may have different hashing treatments. + * + * @param mutationList + * @return list of mutation strings that can be compared to other hashed mutation lists. + */ + public static List hashMutationList(List mutationList) throws Exception { + List mutations = new ArrayList<>(); + for (Mutation mutation : mutationList) { + List cells = new ArrayList<>(); + + CellScanner scanner = mutation.cellScanner(); + while (scanner.advance()) { + Cell c = scanner.current(); + String mutationType = ""; + long ts = 0; + + if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.DeleteFamily)) { + // DeleteFamily has its timestamp created at runtime and cannot be compared with accuracy + // during tests, so we remove the timestamp altogether. + mutationType = "DELETE_FAMILY"; + ts = 0L; + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.DeleteColumn)) { + mutationType = "DELETE_COLUMN"; + ts = c.getTimestamp(); + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.Put)) { + mutationType = "PUT"; + ts = c.getTimestamp(); + } else { + throw new Exception("hashMutationList error: Cell type is not supported."); + } + + String cellHash = + String.join( + "_", + mutationType, + Long.toString(ts), + Bytes.toString(CellUtil.cloneRow(c)), + Bytes.toString(CellUtil.cloneFamily(c)), + Bytes.toString(CellUtil.cloneQualifier(c)), + Bytes.toString(CellUtil.cloneValue(c))); + + cells.add(cellHash); + } + + mutations.add(String.join(" > ", cells)); + } + + return mutations; + } + + /** + * {@link RowMutations} assert equality on rowkey only and does not guarantee that its mutations + * are the same nor that they are in the same order. + * + *

This transform splits a RowMutations object into so + * that two RowMutations objects can be compared via {@link org.apache.beam.sdk.testing.PAssert}. + */ + public static class HashHbaseRowMutations + extends PTransform< + PCollection>, PCollection>>> { + + + @Override + public PCollection>> expand( + PCollection> input) { + return input.apply(ParDo.of(new HashHbaseRowMutationsFn())); + } + } + + static class HashHbaseRowMutationsFn + extends DoFn, KV>> { + + public HashHbaseRowMutationsFn() {} + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + RowMutations rowMutations = c.element().getValue(); + + if (!Bytes.equals(c.element().getKey(),rowMutations.getRow())) { + throw new Exception("Hash error, KV rowkey is not the same as rowMutations rowkey"); + } + + c.output( + KV.of(Bytes.toString(rowMutations.getRow()), hashMutationList(rowMutations.getMutations()))); + } + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java new file mode 100644 index 000000000000..0f1777e0aded --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hbase.utils; + +import com.google.common.base.Charsets; +import java.nio.charset.StandardCharsets; + +/** Constants used for testing purposes. */ +public class TestConstants { + // Base timestamp, assumed to be in milliseconds. + public static long timeT = 123456000; + + public static byte[] rowKey = "row-key-1".getBytes(StandardCharsets.UTF_8); + + public static byte[] colFamily = "cf".getBytes(StandardCharsets.UTF_8); + public static byte[] colQualifier = "col1".getBytes(StandardCharsets.UTF_8); + public static byte[] value = "val-1".getBytes(StandardCharsets.UTF_8); + + public static String rowKey2 = "row-key-2"; + public static byte[] colFamily2 = "cf2".getBytes(StandardCharsets.UTF_8); + public static byte[] colQualifier2 = "col2".getBytes(StandardCharsets.UTF_8); + public static String value2 = "long-value-2"; + + // Variables for bidirectional replication. + public static String cbtQualifier = "SOURCE_CBT"; + public static String hbaseQualifier = "SOURCE_HBASE"; + + // Bigtable change stream constants. 
+ public static String testCluster = "test-cluster-1"; + public static String testToken = "test-token"; +} From 4e23e25eea4b4f51c51dbc3a750c5c1ab90a0a4d Mon Sep 17 00:00:00 2001 From: georgema-google Date: Tue, 14 Mar 2023 15:19:04 -0400 Subject: [PATCH 4/9] Added HbaseRowMutationsIO tests, linted HbaseIO --- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 7 +- .../sdk/io/hbase/HBaseRowMutationsCoder.java | 27 +-- .../sdk/io/hbase/HBaseSharedConnection.java | 23 +- .../sdk/io/hbase/HbaseRowMutationIOTest.java | 214 ++++++++++++++++++ .../sdk/io/hbase/RowMutationsCoderTest.java | 74 +++--- .../beam/sdk/io/hbase/utils/HashUtils.java | 39 ++-- .../sdk/io/hbase/utils/TestConstants.java | 27 +-- ...BaseTestUtils.java => TestHBaseUtils.java} | 47 ++-- 8 files changed, 338 insertions(+), 120 deletions(-) create mode 100644 sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java rename sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/{HBaseTestUtils.java => TestHBaseUtils.java} (54%) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 0d76a0bb73ee..3f5311acc71e 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -154,6 +154,8 @@ * .withTableId("table")); * } * + * // TODO: Add section on writing {@link RowMutations} to Hbase. + * *

Experimental

* *

The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or @@ -770,7 +772,6 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - public static WriteRowMutations writeRowMutations() { return new WriteRowMutations(null /* Configuration */, ""); } @@ -874,9 +875,7 @@ private void readObject(ObjectInputStream in) throws IOException { } Object readResolve() { - return HBaseIO.writeRowMutations() - .withConfiguration(configuration) - .withTableId(tableId); + return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId); } private Configuration configuration; diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java index 12ac503b18ae..c2b657427df2 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java @@ -1,17 +1,19 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.beam.sdk.io.hbase; @@ -100,10 +102,9 @@ private static MutationType getType(Mutation mutation) { } } - /** - * Returns a {@link CoderProvider} which uses the {@link HBaseRowMutationsCoder} for - * {@link RowMutations}. + * Returns a {@link CoderProvider} which uses the {@link HBaseRowMutationsCoder} for {@link + * RowMutations}. */ static CoderProvider getCoderProvider() { return HBASE_ROW_MUTATIONS_CODER_PROVIDER; diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java index 6f7d5cc01b16..ac1c197e6348 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java @@ -1,17 +1,19 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.beam.sdk.io.hbase; @@ -101,5 +103,4 @@ public String getDebugString() { public int getConnectionCount() { return connectionCount; } - } diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java new file mode 100644 index 000000000000..aef56233ddce --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.rowKey2; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.timeT; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value; +import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value2; + +import java.io.IOException; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** 
Unit tests for Hbase row mutation IO. */ +@RunWith(JUnit4.class) +public class HbaseRowMutationIOTest { + private static final Logger log = LoggerFactory.getLogger(HbaseRowMutationIOTest.class); + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private static HBaseTestingUtility htu; + private static final Configuration conf = HBaseConfiguration.create(); + + public HbaseRowMutationIOTest() {} + + @BeforeClass + public static void setUpCluster() throws Exception { + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // Try to bind the hostname to localhost to solve an issue when it is not configured or + // no DNS resolution available. + conf.setStrings("hbase.master.hostname", "localhost"); + conf.setStrings("hbase.regionserver.hostname", "localhost"); + + // Create an HBase test cluster with one table. + htu = new HBaseTestingUtility(); + // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons + htu.startMiniZKCluster(); + MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4); + hbm.waitForActiveAndReadyMaster(); + log.info("Hbase test cluster started."); + } + + @AfterClass + public static void tearDownCluster() throws Exception { + if (htu != null) { + htu.shutdownMiniHBaseCluster(); + htu.shutdownMiniZKCluster(); + htu.cleanupTestDir(); + htu = null; + } + } + + @Before + public void setUp() throws IOException, InterruptedException { + // Provide custom encoder to non-serializable RowMutations class. 
+ pipeline + .getCoderRegistry() + .registerCoderForClass(RowMutations.class, HBaseRowMutationsCoder.of()); + } + + @Test + public void testWritesPuts() throws Exception { + + Table table = TestHBaseUtils.createTable(htu); + + // Write two cells in one row mutations object + RowMutations rowMutationsOnTwoColumnFamilies = new RowMutations(rowKey); + rowMutationsOnTwoColumnFamilies.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + rowMutationsOnTwoColumnFamilies.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily2, colQualifier2, value2, timeT)); + + // Two mutations on same cell, later one should overwrite earlier one + RowMutations overwritingRowMutations = new RowMutations(rowKey2); + overwritingRowMutations.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value, timeT)); + overwritingRowMutations.add( + // Overwrites previous mutation + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value2, timeT)); + + pipeline + .apply( + "Create row mutations", + Create.of( + KV.of(rowKey, rowMutationsOnTwoColumnFamilies), + KV.of(rowKey2, overwritingRowMutations))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertEquals(2, TestHBaseUtils.getRowResult(table, rowKey).size()); + Assert.assertArrayEquals(value, TestHBaseUtils.getCell(table, rowKey, colFamily, colQualifier)); + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey, colFamily2, colQualifier2)); + + Assert.assertEquals(1, TestHBaseUtils.getRowResult(table, rowKey2).size()); + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey2, colFamily, colQualifier)); + } + + @Test + public void testWritesDeletes() throws Exception { + Table table = 
TestHBaseUtils.createTable(htu); + + // Expect deletes to result in empty row. + RowMutations deleteCellMutation = new RowMutations(rowKey); + deleteCellMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + deleteCellMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + // Expect delete family to delete entire row. + RowMutations deleteColFamilyMutation = new RowMutations(rowKey2); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier, value, timeT)); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey2, colFamily, colQualifier2, value2, timeT)); + deleteColFamilyMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey2, colFamily, Long.MAX_VALUE)); + + pipeline + .apply( + "Create row mutations", + Create.of(KV.of(rowKey, deleteCellMutation), KV.of(rowKey2, deleteColFamilyMutation))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertTrue(TestHBaseUtils.getRowResult(table, rowKey).isEmpty()); + Assert.assertTrue(TestHBaseUtils.getRowResult(table, rowKey2).isEmpty()); + } + + @Test + public void testWritesDeletesThenPutsInOrderByTimestamp() throws Exception { + Table table = TestHBaseUtils.createTable(htu); + + // RowMutations entry ordering does not guarantee mutation ordering, as Hbase operations + // are ordered by timestamp. 
See https://issues.apache.org/jira/browse/HBASE-2256 + RowMutations putDeletePut = new RowMutations(rowKey); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT + 1)); + putDeletePut.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value2, timeT + 2)); + + pipeline + .apply("Create row mutations", Create.of(KV.of(rowKey, putDeletePut))) + .apply( + "Write to hbase", + HBaseIO.writeRowMutations() + .withConfiguration(htu.getConfiguration()) + .withTableId(table.getName().getNameAsString())); + + pipeline.run().waitUntilFinish(); + + Assert.assertArrayEquals( + value2, TestHBaseUtils.getCell(table, rowKey, colFamily, colQualifier)); + } +} diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java index a7a4bee3f934..5aac7c122679 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/RowMutationsCoderTest.java @@ -1,21 +1,22 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.beam.sdk.io.hbase; - import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily; import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colFamily2; import static org.apache.beam.sdk.io.hbase.utils.TestConstants.colQualifier; @@ -24,14 +25,12 @@ import static org.apache.beam.sdk.io.hbase.utils.TestConstants.timeT; import static org.apache.beam.sdk.io.hbase.utils.TestConstants.value; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; -import org.apache.beam.sdk.io.hbase.utils.HBaseTestUtils; -import org.apache.beam.sdk.io.hbase.utils.HBaseTestUtils.HBaseMutationBuilder; import org.apache.beam.sdk.io.hbase.utils.HashUtils; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils; +import org.apache.beam.sdk.io.hbase.utils.TestHBaseUtils.HBaseMutationBuilder; import org.apache.hadoop.hbase.client.RowMutations; import org.junit.After; import org.junit.Assert; @@ -39,10 +38,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** Test that {@link HBaseRowMutationsCoder} encoding does not change {@link RowMutations} object. 
*/ +/** + * Test that {@link HBaseRowMutationsCoder} encoding does not change {@link RowMutations} object. + */ @RunWith(JUnit4.class) public class RowMutationsCoderTest { @@ -63,8 +62,9 @@ public void tearDown() throws IOException { @Test public void testEncodePut() throws Exception { RowMutations put = new RowMutations(rowKey); - put.add(HBaseTestUtils.HBaseMutationBuilder.createPut( - rowKey, colFamily, colQualifier, value, timeT)); + put.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); coder.encode(put, outputStream); inputStream = new ByteArrayInputStream(outputStream.toByteArray()); @@ -79,13 +79,11 @@ public void testEncodePut() throws Exception { public void testEncodeMultipleMutations() throws Exception { RowMutations multipleMutations = new RowMutations(rowKey); multipleMutations.add( - HBaseMutationBuilder.createPut( - rowKey, colFamily, colQualifier, value, timeT)); + HBaseMutationBuilder.createPut(rowKey, colFamily, colQualifier, value, timeT)); multipleMutations.add( - HBaseTestUtils.HBaseMutationBuilder.createDelete( - rowKey, colFamily, colQualifier, timeT)); + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); multipleMutations.add( - HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); coder.encode(multipleMutations, outputStream); inputStream = new ByteArrayInputStream(outputStream.toByteArray()); @@ -99,11 +97,15 @@ public void testEncodeMultipleMutations() throws Exception { @Test public void testEncodeMultipleRowMutations() throws Exception { RowMutations put = new RowMutations(rowKey); - put.add(HBaseTestUtils.HBaseMutationBuilder.createPut(rowKey, colFamily, colQualifier, value, timeT)); + put.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); RowMutations deleteCols = new 
RowMutations(rowKey); - deleteCols.add(HBaseTestUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); + deleteCols.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete(rowKey, colFamily, colQualifier, timeT)); RowMutations deleteFamily = new RowMutations(rowKey); - deleteFamily.add(HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + deleteFamily.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); coder.encode(put, outputStream); coder.encode(deleteCols, outputStream); @@ -123,13 +125,15 @@ public void testEncodeMultipleRowMutations() throws Exception { @Test public void testEncodeMultipleComplexRowMutations() throws Exception { - RowMutations complexMutation = - new RowMutations(rowKey); - complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createPut( - rowKey, colFamily, colQualifier, value, timeT)); - complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createDelete( - rowKey, colFamily2, colQualifier2, timeT + 1)); - complexMutation.add(HBaseTestUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); + RowMutations complexMutation = new RowMutations(rowKey); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createPut( + rowKey, colFamily, colQualifier, value, timeT)); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDelete( + rowKey, colFamily2, colQualifier2, timeT + 1)); + complexMutation.add( + TestHBaseUtils.HBaseMutationBuilder.createDeleteFamily(rowKey, colFamily, timeT)); coder.encode(complexMutation, outputStream); coder.encode(complexMutation, outputStream); diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java index 608e82d1c829..3cc81c45c161 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java +++ 
b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java @@ -1,17 +1,19 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.beam.sdk.io.hbase.utils; @@ -30,8 +32,6 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Utility functions to help assert equality between mutation lists for testing purposes. 
*/ public class HashUtils { @@ -48,7 +48,7 @@ public static void assertRowMutationsEquals(RowMutations rowMutationA, RowMutati if (rowMutationA == null || rowMutationB == null) { Assert.assertEquals(rowMutationA, rowMutationB); } - Assert.assertTrue(Bytes.equals(rowMutationA.getRow(),rowMutationB.getRow())); + Assert.assertTrue(Bytes.equals(rowMutationA.getRow(), rowMutationB.getRow())); Assert.assertEquals( hashMutationList(rowMutationA.getMutations()), hashMutationList(rowMutationB.getMutations())); @@ -74,15 +74,15 @@ public static List hashMutationList(List mutationList) throws String mutationType = ""; long ts = 0; - if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.DeleteFamily)) { + if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.DeleteFamily)) { // DeleteFamily has its timestamp created at runtime and cannot be compared with accuracy // during tests, so we remove the timestamp altogether. mutationType = "DELETE_FAMILY"; ts = 0L; - } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.DeleteColumn)) { + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.DeleteColumn)) { mutationType = "DELETE_COLUMN"; ts = c.getTimestamp(); - } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals( KeyValue.Type.Put)) { + } else if (KeyValue.Type.codeToType(c.getTypeByte()).equals(KeyValue.Type.Put)) { mutationType = "PUT"; ts = c.getTimestamp(); } else { @@ -119,7 +119,6 @@ public static class HashHbaseRowMutations extends PTransform< PCollection>, PCollection>>> { - @Override public PCollection>> expand( PCollection> input) { @@ -136,12 +135,14 @@ public HashHbaseRowMutationsFn() {} public void processElement(ProcessContext c) throws Exception { RowMutations rowMutations = c.element().getValue(); - if (!Bytes.equals(c.element().getKey(),rowMutations.getRow())) { + if (!Bytes.equals(c.element().getKey(), rowMutations.getRow())) { throw new Exception("Hash error, KV rowkey is not the same as 
rowMutations rowkey"); } c.output( - KV.of(Bytes.toString(rowMutations.getRow()), hashMutationList(rowMutations.getMutations()))); + KV.of( + Bytes.toString(rowMutations.getRow()), + hashMutationList(rowMutations.getMutations()))); } } } diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java index 0f1777e0aded..a6e01245dc94 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java @@ -1,21 +1,22 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.beam.sdk.io.hbase.utils; -import com.google.common.base.Charsets; import java.nio.charset.StandardCharsets; /** Constants used for testing purposes. */ @@ -29,10 +30,10 @@ public class TestConstants { public static byte[] colQualifier = "col1".getBytes(StandardCharsets.UTF_8); public static byte[] value = "val-1".getBytes(StandardCharsets.UTF_8); - public static String rowKey2 = "row-key-2"; + public static byte[] rowKey2 = "row-key-2".getBytes(StandardCharsets.UTF_8); public static byte[] colFamily2 = "cf2".getBytes(StandardCharsets.UTF_8); public static byte[] colQualifier2 = "col2".getBytes(StandardCharsets.UTF_8); - public static String value2 = "long-value-2"; + public static byte[] value2 = "long-value-2".getBytes(StandardCharsets.UTF_8); // Variables for bidirectional replication. public static String cbtQualifier = "SOURCE_CBT"; diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java similarity index 54% rename from sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java rename to sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java index ccc46c7e4fbd..72751fcb6801 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HBaseTestUtils.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestHBaseUtils.java @@ -1,17 +1,19 @@ /* - * Copyright (C) 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.beam.sdk.io.hbase.utils; @@ -30,28 +32,25 @@ import org.apache.hadoop.hbase.util.Bytes; /** Hbase-related convenience functions. 
*/ -public class HBaseTestUtils { +public class TestHBaseUtils { - public static String getCell(Table table, byte[] rowKey, byte[] colFamily, byte[] colQualifier) + public static byte[] getCell(Table table, byte[] rowKey, byte[] colFamily, byte[] colQualifier) throws IOException { - - return Bytes.toString( - getRowResult(table, rowKey).getValue(colFamily, colQualifier)); + return getRowResult(table, rowKey).getValue(colFamily, colQualifier); } public static Result getRowResult(Table table, byte[] rowKey) throws IOException { return table.get(new Get(rowKey)); } - public static Table createTable(HBaseTestingUtility hbaseTestingUtil) throws IOException { - return createTable(hbaseTestingUtil, UUID.randomUUID().toString()); + public static Table createTable(HBaseTestingUtility htu) throws IOException { + return createTable(htu, UUID.randomUUID().toString()); } - public static Table createTable(HBaseTestingUtility hbaseTestingUtil, String name) - throws IOException { + public static Table createTable(HBaseTestingUtility htu, String name) throws IOException { TableName tableName = TableName.valueOf(name); - return hbaseTestingUtil.createTable(tableName, new String[] {Bytes.toString(colFamily), - Bytes.toString(colFamily2)}); + return htu.createTable( + tableName, new String[] {Bytes.toString(colFamily), Bytes.toString(colFamily2)}); } /** Builder class for Hbase mutations. 
*/ @@ -59,14 +58,12 @@ public static class HBaseMutationBuilder { public static Put createPut( byte[] rowKey, byte[] colFamily, byte[] colQualifier, byte[] value, long atTimestamp) { - return new Put(rowKey, atTimestamp) - .addColumn(colFamily, colQualifier, value); + return new Put(rowKey, atTimestamp).addColumn(colFamily, colQualifier, value); } public static Delete createDelete( byte[] rowKey, byte[] colFamily, byte[] colQualifier, long atTimestamp) { - return new Delete(rowKey, atTimestamp) - .addColumns(colFamily, colQualifier); + return new Delete(rowKey, atTimestamp).addColumns(colFamily, colQualifier); } public static Delete createDeleteFamily(byte[] rowKey, byte[] colFamily, long atTimestamp) { From 4559a686f503a9064692e4b5999e206de0e94f5f Mon Sep 17 00:00:00 2001 From: georgema-google Date: Tue, 14 Mar 2023 17:32:35 -0400 Subject: [PATCH 5/9] Wrote up Javadocs for HbaseIO.writeRowMutations --- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 39 ++++++++++++++----- .../sdk/io/hbase/HBaseSharedConnection.java | 1 + 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 3f5311acc71e..fae1f01abd24 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -137,6 +136,8 @@ * *

Writing to HBase

* + *

Writing {@link Mutation}

+ * *

The HBase sink executes a set of row mutations on a single table. It takes as input a {@link * PCollection PCollection<Mutation>}, where each {@link Mutation} represents an idempotent * transformation on a row. @@ -154,7 +155,31 @@ * .withTableId("table")); * } * - * // TODO: Add section on writing {@link RowMutations} to Hbase. + *

Writing {@link RowMutations}

+ * + *

An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes + * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and + * {@link RowMutations}. + * + *

This implementation is Dataflow specific. Useful for preserving mutation order if the upstream + * is ordered by row key, as RowMutations will only be applied after previous RowMutations are + * successful. + * + *

To configure the sink, you must supply a table id string and a {@link Configuration} to + * identify the HBase instance, for example: + * + *

{@code
+ * Configuration configuration = ...;
+ * PCollection<KV<byte[], RowMutations>> data = ...;
+ *
+ * data.apply("write",
+ *     HBaseIO.writeRowMutations()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ * }
+ * + *

Note that the transformation emits the number of RowMutations written as an integer after + * successfully writing to HBase. * *

Experimental

* @@ -776,8 +801,6 @@ public static WriteRowMutations writeRowMutations() { return new WriteRowMutations(null /* Configuration */, ""); } - // TODO: write class-level Javadoc - // TODO: write tests for HbaseIO.writeRowMutations(), HBaseRowMutationsCoder /** Transformation that writes RowMutation objects to a Hbase table. */ public static class WriteRowMutations extends PTransform>, PCollection> { @@ -806,8 +829,6 @@ public PCollection expand(PCollection> input) checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty"); return input.apply(ParDo.of(new WriteRowMutationsFn(this))); - // TODO: change this back to PDone later. - // return PDone.in(input.getPipeline()); } @Override @@ -952,10 +973,8 @@ public void processElement(ProcessContext c) throws Exception { Boolean.toString(connection.isAborted())))); } - // TODO: what to do with this Metrics counter? - Metrics.counter(HBaseIO.class, "mutations_written_to_hbase").inc(); - // TODO: change this back to PDone when moving to HbaseIO - c.output(1); // Dummy output so that we can get Dataflow stats for throughput. + // Dummy output so that we can get Dataflow stats for throughput. + c.output(1); } @Override diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java index ac1c197e6348..5ebf5d786317 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// TODO: test SharedConnection on prod Dataflow instance. /** * Static connection shared between all threads of a worker. Connectors are not persisted between * worker machines as Connection serialization is not implemented. 
Each worker will create its own From 250ee6ecc261c357946a410f536ae4b44c2cbfb7 Mon Sep 17 00:00:00 2001 From: George Ma <111381964+georgecma@users.noreply.github.com> Date: Thu, 16 Mar 2023 12:59:22 -0400 Subject: [PATCH 6/9] Update sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java Co-authored-by: Yi Hu --- .../src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index fae1f01abd24..b71db1c4a8a3 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -158,7 +158,7 @@ *

Writing {@link RowMutations}

* *

An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes - * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and + * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of bytes row keys and * {@link RowMutations}. * *

This implementation is Dataflow specific. Useful for preserving mutation order if the upstream From ccfd88ed2ffb31057f1274df263906a5065f2fa0 Mon Sep 17 00:00:00 2001 From: George Ma <111381964+georgecma@users.noreply.github.com> Date: Thu, 16 Mar 2023 13:00:19 -0400 Subject: [PATCH 7/9] Update sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java Co-authored-by: Yi Hu --- .../src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index b71db1c4a8a3..74760125226c 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -161,7 +161,7 @@ * as input a {@link PCollection>}, representing KVs of bytes row keys and * {@link RowMutations}. * - *

This implementation is Dataflow specific. Useful for preserving mutation order if the upstream + *

This implementation is useful for preserving mutation order if the upstream * is ordered by row key, as RowMutations will only be applied after previous RowMutations are * successful. * From 37ac14a9edbd7542f080b3da6925e755c3ffa852 Mon Sep 17 00:00:00 2001 From: George Ma <111381964+georgecma@users.noreply.github.com> Date: Thu, 16 Mar 2023 13:00:37 -0400 Subject: [PATCH 8/9] Update sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java Co-authored-by: Yi Hu --- .../src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 74760125226c..b1a67bf1d079 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -805,7 +805,7 @@ public static WriteRowMutations writeRowMutations() { public static class WriteRowMutations extends PTransform>, PCollection> { - /** Writes to the HBase instance indicated by the* given Configuration. */ + /** Writes to the HBase instance indicated by the given Configuration. 
*/ public WriteRowMutations withConfiguration(Configuration configuration) { checkNotNull(configuration, "configuration cannot be null"); return new WriteRowMutations(configuration, tableId); From 5f2aa5a3f9f9a647c27a6ead4062702835d49e0d Mon Sep 17 00:00:00 2001 From: georgema-google Date: Thu, 16 Mar 2023 13:18:02 -0400 Subject: [PATCH 9/9] responded to comments, changed HbaseSharedConnection from AtomicReference to Array --- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 16 +++++++--------- .../sdk/io/hbase/HBaseSharedConnection.java | 17 ++++++++--------- ...t.java => HbaseIOWriteRowMutationsTest.java} | 8 ++++---- 3 files changed, 19 insertions(+), 22 deletions(-) rename sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/{HbaseRowMutationIOTest.java => HbaseIOWriteRowMutationsTest.java} (97%) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index b1a67bf1d079..db63d9d0f32b 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -161,9 +161,8 @@ * as input a {@link PCollection>}, representing KVs of bytes row keys and * {@link RowMutations}. * - *

This implementation is useful for preserving mutation order if the upstream - * is ordered by row key, as RowMutations will only be applied after previous RowMutations are - * successful. + *

This implementation is useful for preserving mutation order if the upstream is ordered by row + * key, as RowMutations will only be applied after previous RowMutations are successful. * *

To configure the sink, you must supply a table id string and a {@link Configuration} to * identify the HBase instance, for example: @@ -803,7 +802,7 @@ public static WriteRowMutations writeRowMutations() { /** Transformation that writes RowMutation objects to a Hbase table. */ public static class WriteRowMutations - extends PTransform>, PCollection> { + extends PTransform>, PDone> { /** Writes to the HBase instance indicated by the given Configuration. */ public WriteRowMutations withConfiguration(Configuration configuration) { @@ -823,12 +822,13 @@ private WriteRowMutations(Configuration configuration, String tableId) { } @Override - public PCollection expand(PCollection> input) { + public PDone expand(PCollection> input) { checkNotNull(configuration, "withConfiguration() is required"); checkNotNull(tableId, "withTableId() is required"); checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty"); - return input.apply(ParDo.of(new WriteRowMutationsFn(this))); + input.apply(ParDo.of(new WriteRowMutationsFn(this))); + return PDone.in(input.getPipeline()); } @Override @@ -954,6 +954,7 @@ public void processElement(ProcessContext c) throws Exception { try { // Use Table instead of BufferedMutator to preserve mutation-ordering table.mutateRow(mutations); + recordsWritten++; } catch (Exception e) { throw new Exception( (String.join( @@ -972,9 +973,6 @@ public void processElement(ProcessContext c) throws Exception { Boolean.toString(connection.isClosed()), Boolean.toString(connection.isAborted())))); } - - // Dummy output so that we can get Dataflow stats for throughput. 
- c.output(1); } @Override diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java index 5ebf5d786317..7786fe20e67e 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java @@ -19,14 +19,12 @@ import java.io.IOException; import java.io.Serializable; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: test SharedConnection on prod Dataflow instance. /** * Static connection shared between all threads of a worker. Connectors are not persisted between * worker machines as Connection serialization is not implemented. 
Each worker will create its own @@ -37,7 +35,8 @@ public class HBaseSharedConnection implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class); // Transient connection to be initialized per worker - private static AtomicReference connection = new AtomicReference<>(); + // Wrap Connection in array because static Connection cannot be non-null in beam repo + private static Connection[] connection = new Connection[1]; // Number of threads using the shared connection, close connection if connectionCount goes to 0 private static int connectionCount; @@ -50,11 +49,11 @@ public class HBaseSharedConnection implements Serializable { */ public static synchronized Connection getOrCreate(Configuration configuration) throws IOException { - if (connection.get() == null || connection.get().isClosed()) { + if (connection[0] == null || connection[0].isClosed()) { forceCreate(configuration); } connectionCount++; - return connection.get(); + return connection[0]; } /** @@ -64,7 +63,7 @@ public static synchronized Connection getOrCreate(Configuration configuration) * @throws IOException */ public static synchronized void forceCreate(Configuration configuration) throws IOException { - connection.set(ConnectionFactory.createConnection(configuration)); + connection[0] = ConnectionFactory.createConnection(configuration); connectionCount = 0; } @@ -89,8 +88,8 @@ public static synchronized void close() throws IOException { * @throws IOException */ public static synchronized void forceClose() throws IOException { - if (connection.get() != null) { - connection.get().close(); + if (connection != null) { + connection[0].close(); connectionCount = 0; } } @@ -98,7 +97,7 @@ public static synchronized void forceClose() throws IOException { public String getDebugString() { return String.format( "Connection down: %s\n" + "Connectors: %s\n", - (connection.get() == null || connection.get().isClosed()), connectionCount); + (connection[0] == null || 
connection[0].isClosed()), connectionCount); } public int getConnectionCount() { diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java similarity index 97% rename from sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java rename to sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java index aef56233ddce..87798ae8bb5d 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseRowMutationIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HbaseIOWriteRowMutationsTest.java @@ -52,14 +52,14 @@ /** Unit tests for Hbase row mutation IO. */ @RunWith(JUnit4.class) -public class HbaseRowMutationIOTest { - private static final Logger log = LoggerFactory.getLogger(HbaseRowMutationIOTest.class); +public class HbaseIOWriteRowMutationsTest { + private static final Logger LOG = LoggerFactory.getLogger(HbaseIOWriteRowMutationsTest.class); @Rule public final transient TestPipeline pipeline = TestPipeline.create(); private static HBaseTestingUtility htu; private static final Configuration conf = HBaseConfiguration.create(); - public HbaseRowMutationIOTest() {} + public HbaseIOWriteRowMutationsTest() {} @BeforeClass public static void setUpCluster() throws Exception { @@ -75,7 +75,7 @@ public static void setUpCluster() throws Exception { htu.startMiniZKCluster(); MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4); hbm.waitForActiveAndReadyMaster(); - log.info("Hbase test cluster started."); + LOG.info("Hbase test cluster started."); } @AfterClass