From d0599a8faa8b704d4089c0f71278e30f2696e626 Mon Sep 17 00:00:00 2001 From: fancycoderzf Date: Thu, 12 Oct 2023 04:45:16 +0800 Subject: [PATCH 1/6] [INLONG-9009][Sort] Add HBase source and sink connector on flink 1.15 --- .../main/assemblies/sort-connectors-v1.15.xml | 8 + .../sort-end-to-end-tests-v1.15/pom.xml | 30 + .../sort/tests/HBaseSourceSinkTest.java | 141 ++ .../sort/tests/utils/HBaseContainer.java | 109 + .../src/test/resources/flinkSql/hbase_e2e.sql | 33 + .../sort-connectors/hbase/pom.xml | 244 +++ .../sort/hbase/HBase2DynamicTableFactory.java | 150 ++ .../hbase/sink/HBaseDynamicTableSink.java | 117 ++ .../sort/hbase/sink/HBaseSinkFunction.java | 376 ++++ .../org.apache.flink.table.factories.Factory | 16 + .../src/main/resources/hbase-default.xml | 1816 +++++++++++++++++ .../sort-flink-v1.15/sort-connectors/pom.xml | 1 + licenses/inlong-sort-connectors/LICENSE | 7 + 13 files changed, 3048 insertions(+) create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/hbase-default.xml diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml index b7fead209a5..c4852942857 100644 --- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml +++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml @@ -75,5 +75,13 @@ 0644 + + ../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/target + inlong-sort/connectors + + sort-connector-hbase-v1.15-${project.version}.jar + + 0644 + diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml index 37244cf14f4..b833167d0f1 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -138,6 +138,17 @@ clickhouse-jdbc test + + junit + junit + ${junit.version} + test + + + org.apache.hadoop + hadoop-common + test + @@ -203,6 +214,14 @@ jar ${project.build.directory}/dependencies + + org.apache.inlong + sort-connector-hbase-v1.15 + ${project.version} + sort-connector-hbase.jar + jar + ${project.build.directory}/dependencies + @@ -213,6 +232,17 @@ pre-integration-test + + store-classpath-in-target-for-tests + + build-classpath + + package + + ${project.build.directory}/hadoop.classpath 
+ org.apache.flink
+
+
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java
new file mode 100644
index 00000000000..503cba79cb9
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
+import org.apache.inlong.sort.tests.utils.HBaseContainer;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.apache.flink.util.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test that reads from one containerized HBase cluster and writes to another
+ * through the sort-connector-hbase source and sink.
+ */
+public class HBaseSourceSinkTest extends FlinkContainerTestEnv {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSourceSinkTest.class);
+
+    private static final Path HADOOP_CP = TestUtils.getResource(".*hadoop.classpath");
+    private static final Path hbaseJar = TestUtils.getResource("sort-connector-hbase.jar");
+    // Can't use getResource("xxx").getPath() here; Windows cannot resolve that form of path.
+    private static final String sqlFile;
+
+    private List<Path> hadoopCpJars;
+
+    static {
+        try {
+            sqlFile = Paths.get(HBaseSourceSinkTest.class.getResource("/flinkSql/hbase_e2e.sql").toURI()).toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static final HBaseContainer HBASE_SOURCE_CONTAINER = (HBaseContainer) new HBaseContainer("2.2.3")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("hbase1")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @ClassRule
+    public static final HBaseContainer HBASE_SINK_CONTAINER = (HBaseContainer) new HBaseContainer("2.2.3")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("hbase2")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @Before
+    public void setup() throws IOException {
+        File hadoopClasspathFile = new File(HADOOP_CP.toAbsolutePath().toString());
+        if (!hadoopClasspathFile.exists()) {
+            throw new FileNotFoundException(
+                    "File that contains hadoop classpath " + HADOOP_CP + " does not exist.");
+        }
+        String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
+        hadoopCpJars =
+                Arrays.stream(classPathContent.split(":"))
+                        .map(Paths::get)
+                        .collect(Collectors.toList());
+
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        initializeHBaseTable();
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (HBASE_SOURCE_CONTAINER != null) {
+            HBASE_SOURCE_CONTAINER.stop();
+        }
+        if (HBASE_SINK_CONTAINER != null) {
+            HBASE_SINK_CONTAINER.stop();
+        }
+    }
+
+    private void initializeHBaseTable() {
+        try {
+            // The containers are started by the @ClassRule; only the tables need to be created.
+            // The sink table must exist before the Flink job writes to it, otherwise the
+            // BufferedMutator fails with TableNotFoundException.
+            HBASE_SOURCE_CONTAINER.createTable("sourceTable", "family1", "family2");
+            HBASE_SINK_CONTAINER.createTable("sinkTable", "family1", "family2");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testHBaseSourceAndSink() throws Exception {
+        // Seed the source table; the Flink job should copy these cells into the sink table.
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family1", "f1c1", "v1");
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family2", "f2c1", "v2");
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family2", "f2c2", "v3");
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family1", "f1c1", "v4");
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family2", "f2c1", "v5");
+        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family2", "f2c2", "v6");
+
+        Path[] jarPaths = new Path[hadoopCpJars.size() + 1];
+        jarPaths[0] = hbaseJar;
+        for (int i = 0; i < hadoopCpJars.size(); i++) {
+            jarPaths[i + 1] = hadoopCpJars.get(i);
+        }
+        submitSQLJob(sqlFile, jarPaths);
+        waitUntilJobRunning(Duration.ofSeconds(10));
+    }
+
+}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java
new file mode 100644
index 00000000000..4b7e164add2
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.tests.utils; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** Standalone containerized HBase instance that builds the image on the fly. */ +@SuppressWarnings("rawtypes") +public class HBaseContainer extends GenericContainer { + + private static final String HBASE_BIN = "/opt/hbase/bin"; + private static final int MAX_RETRIES = 3; + + public HBaseContainer(String hbaseVersion) { + super(getImageFromDockerfile(hbaseVersion)); + } + + private static ImageFromDockerfile getImageFromDockerfile(String hbaseVersion) { + return new ImageFromDockerfile() + .withDockerfileFromBuilder( + builder -> builder.from("adoptopenjdk/openjdk8") + .env("HBASE_VERSION", hbaseVersion) + .run( + "export INITRD=no" + + " && export HBASE_DIST=\"http://archive.apache.org/dist/hbase\"" + + " && apt-get update -y" + + " && apt-get install -y --no-install-recommends curl" + + " && cd /opt" + + " && curl -SL $HBASE_DIST/$HBASE_VERSION/hbase-$HBASE_VERSION-bin.tar.gz" + + " | tar -x -z && mv hbase-${HBASE_VERSION} hbase") + .expose(2181) + .cmd( + "/bin/sh", + "-c", + String.format( + "nohup %s/start-hbase.sh & sleep infinity", + HBASE_BIN))); + } + + @Override + protected void containerIsStarted(InspectContainerResponse containerInfo) { + ExecResult res = null; + for (int i = 0; i < MAX_RETRIES; i++) { + try { + res = execCmd("scan 'hbase:meta'"); + if (res.getStdout().contains("hbase:namespace")) { + return; + } + Thread.sleep(5000L); + } catch (Exception e) { + throw new RuntimeException("Failed to verify if container is started.", e); + } + } + throw new IllegalStateException("Failed to start HBase properly:\n" + res); + } + + public Container.ExecResult createTable(String table, String... 
colFamilies) throws Exception { + String createCmd = + String.format("create '%s',", table) + + Arrays.stream(colFamilies) + .map(cf -> String.format("{NAME=>'%s'}", cf)) + .collect(Collectors.joining(",")); + + return execCmd(createCmd); + } + + public Container.ExecResult putData( + String table, String rowKey, String colFamily, String colQualifier, String val) + throws Exception { + String putCmd = + String.format( + "put '%s','%s','%s:%s','%s'", table, rowKey, colFamily, colQualifier, val); + + return execCmd(putCmd); + } + + public Container.ExecResult scanTable(String table) throws Exception { + String scanCmd = String.format("scan '%s'", table); + + return execCmd(scanCmd); + } + + private Container.ExecResult execCmd(String cmd) throws Exception { + String hbaseShellCmd = String.format("echo \"%s\" | %s/hbase shell", cmd, HBASE_BIN); + + return execInContainer("sh", "-c", hbaseShellCmd); + } +} \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql new file mode 100644 index 00000000000..5b67748d850 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql @@ -0,0 +1,33 @@ +CREATE TABLE MyHBaseSource ( + rowkey STRING, + family1 ROW, + family2 ROW +) WITH ( + 'connector' = 'hbase-2.2-inlong', + 'table-name' = 'sourceTable', + 'zookeeper.quorum' = 'hbase1:2181' +); + +CREATE TABLE MyHBaseSink +( + rowkey STRING, + family1 ROW, + family2 ROW +) WITH ( + 'connector' = 'hbase-2.2-inlong', + 'table-name' = 'sinkTable', + 'zookeeper.quorum' = 'hbase2:2181', + 'sink.buffer-flush.max-rows' = '1', + 'sink.buffer-flush.interval' = '2s' + ); + +INSERT INTO MyHBaseSink +SELECT rowkey, + ROW(a), + ROW(b, c) +FROM ( + SELECT rowkey, + REGEXP_REPLACE(family1.f1c1, 'v', 'value') as a, + family2.f2c1 as b, + family2.f2c2 as c + FROM MyHBaseSource) source; \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml new file mode 100644 index 00000000000..b5a5c785515 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml @@ -0,0 +1,244 @@ + + + + 4.0.0 + + org.apache.inlong + sort-connectors-v1.15 + 1.10.0-SNAPSHOT + + + sort-connector-hbase-v1.15 + jar + Apache InLong - Sort-connector-hbase + + + ${project.parent.parent.parent.parent.parent.basedir} + 3.4.14 + + + + + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + + + + + org.apache.inlong + sort-connector-base + ${project.version} + + + org.apache.flink + flink-connector-hbase-2.2 + + + com.github.spotbugs + spotbugs-annotations + + + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.inlong + sort-format-common + 1.10.0-SNAPSHOT + test + + + org.apache.inlong + sort-core + 1.10.0-SNAPSHOT + test + + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${plugin.shade.version} + + + shade-flink + + shade + + package + + + + + hbase-default.xml + hbase-default.xml + + + + false + + + org.apache.inlong:* + org.apache.flink:flink-connector-hbase-base + org.apache.flink:flink-connector-hbase-2.2 + org.apache.hbase:hbase-* + org.apache.hbase.thirdparty:hbase-shaded-* + org.apache.zookeeper:zookeeper + org.apache.htrace:htrace-core4 + com.google.protobuf:protobuf-java + 
commons-codec:commons-codec + org.apache.commons:commons-crypto + org.apache.commons:commons-lang3 + io.netty:netty-all + com.google.protobuf:* + io.dropwizard.metrics:metrics-core + com.amazonaws:* + com.fasterxml.jackson.core:* + commons-logging:commons-logging + org.apache.httpcomponents:* + software.amazon.ion:* + joda-time:* + + + org.apache.hbase:hbase-metrics* + org.apache.hbase:hbase-server* + org.apache.hbase:hbase-hadoop*-compat + + + + + *:* + + + META-INF/services/com.fasterxml.** + META-INF/services/org.apache.hadoop.** + META-INF/services/javax.** + digesterRules.xml + properties.dtd + PropertyList-1.0.dtd + LICENSE.txt + *.proto + protobuf/* + + + + + + + org.apache.inlong.sort.base + org.apache.inlong.sort.hbase.shaded.org.apache.inlong.sort.base + + + org.apache.zookeeper + org.apache.flink.hbase.shaded.org.apache.zookeeper + + + org.apache.htrace + org.apache.flink.hbase.shaded.org.apache.htrace + + + com.google + org.apache.flink.hbase.shaded.com.google + + + com.yammer.metrics + org.apache.flink.hbase.shaded.com.yammer.metrics + + + org.apache.commons + org.apache.flink.hbase.shaded.org.apache.commons + + + org.apache.jute + org.apache.flink.hbase.shaded.org.apache.jute + + + io.netty + org.apache.flink.hbase.shaded.io.netty + + + org.apache.hadoop.hbase + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase + + + org.apache.hadoop.hbase.codec.* + + + + com.amazonaws + org.apache.inlong.sort.hbase.shaded.com.amazonaws + + + com.fasterxml.jackson.core + org.apache.inlong.sort.hbase.shaded.com.fasterxml.jackson.core + + + org.apache.commons.logging + org.apache.inlong.sort.hbase.shaded.org.apache.commons.logging + + + org.apache.http + org.apache.inlong.sort.hbase.shaded.org.apache.http + + + software.amazon.ion + org.apache.inlong.sort.hbase.shaded.software.amazon.ion + + + org.joda.time + org.apache.inlong.sort.hbase.shaded.org.joda.time + + + + + + + + + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java new file mode 100644 index 00000000000..eccb93b066a --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hbase; + +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; +import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.hbase.options.HBaseLookupOptions; +import org.apache.flink.connector.hbase.options.HBaseWriteOptions; +import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.hadoop.conf.Configuration; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_STRING_LITERAL; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; +import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; +import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; + +/** HBase connector factory. 
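+ * Registered under the identifier {@code hbase-2.2-inlong}. Compared with the vanilla Flink
+ * HBase factory, the sink it creates is additionally wired with InLong metric, audit, and
+ * dirty-data handling options.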
*/ +public class HBase2DynamicTableFactory + implements + DynamicTableSourceFactory, + DynamicTableSinkFactory { + + private static final String IDENTIFIER = "hbase-2.2-inlong"; + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); + helper.validateExcept(PROPERTIES_PREFIX); + + final ReadableConfig tableOptions = helper.getOptions(); + + DataType tableSchema = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); + // Map options = context.getCatalogTable().getOptions(); + + validatePrimaryKey(tableSchema, new int[]{0}); + + String tableName = tableOptions.get(TABLE_NAME); + Configuration hbaseConf = getHBaseConfiguration(tableOptions); + HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions); + String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(tableSchema); + + return new HBaseDynamicTableSource( + hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); + helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX); + + final ReadableConfig tableOptions = helper.getOptions(); + + DataType tableSchema = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); + // Map options = context.getCatalogTable().getOptions(); + + validatePrimaryKey(tableSchema, new int[]{0}); + + String tableName = tableOptions.get(TABLE_NAME); + Configuration hbaseConf = getHBaseConfiguration(tableOptions); + HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); + String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(tableSchema); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String inlongAudit = tableOptions.get(INLONG_AUDIT); + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions); + final DirtySink dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); + return new HBaseDynamicTableSink( + tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, + inlongMetric, inlongAudit, dirtyOptions, dirtySink); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> set = new HashSet<>(); + set.add(TABLE_NAME); + return set; + } + + @Override + public Set> optionalOptions() { + Set> set = new HashSet<>(); + set.add(ZOOKEEPER_ZNODE_PARENT); + set.add(ZOOKEEPER_QUORUM); + set.add(NULL_STRING_LITERAL); + set.add(SINK_BUFFER_FLUSH_MAX_SIZE); + set.add(SINK_BUFFER_FLUSH_MAX_ROWS); + set.add(SINK_BUFFER_FLUSH_INTERVAL); + set.add(SINK_PARALLELISM); + set.add(LOOKUP_ASYNC); + set.add(LOOKUP_CACHE_MAX_ROWS); + set.add(LOOKUP_CACHE_TTL); + set.add(LOOKUP_MAX_RETRIES); + set.add(INLONG_METRIC); + set.add(INLONG_AUDIT); + return set; + } +} \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java new file mode 100644 index 00000000000..05f85ce1b98 --- /dev/null +++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hbase.sink; + +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.hbase.options.HBaseWriteOptions; +import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter; +import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.hadoop.conf.Configuration; + +import javax.annotation.Nullable; + +public class HBaseDynamicTableSink implements DynamicTableSink { + + private final String tableName; + private final HBaseTableSchema hbaseTableSchema; + private final Configuration hbaseConf; + private final HBaseWriteOptions writeOptions; + private final String nullStringLiteral; + private final String inlongMetric; + private final String inlongAudit; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink dirtySink; + + public HBaseDynamicTableSink( + String tableName, + HBaseTableSchema hbaseTableSchema, + Configuration hbaseConf, + HBaseWriteOptions writeOptions, + String nullStringLiteral, + String inlongMetric, + String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink dirtySink) { + this.tableName = tableName; + this.hbaseTableSchema = hbaseTableSchema; + this.hbaseConf = hbaseConf; + this.writeOptions = writeOptions; + this.nullStringLiteral = nullStringLiteral; + this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; + } + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.all(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + HBaseSinkFunction sinkFunction = + new HBaseSinkFunction<>( + tableName, + hbaseConf, + new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral), + writeOptions.getBufferFlushMaxSizeInBytes(), + writeOptions.getBufferFlushMaxRows(), + writeOptions.getBufferFlushIntervalMillis(), + inlongMetric, inlongAudit, dirtyOptions, dirtySink); + return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism()); + } + + @Override + public DynamicTableSink copy() { + return new HBaseDynamicTableSink( + tableName, hbaseTableSchema, hbaseConf, writeOptions, + 
nullStringLiteral, inlongMetric, inlongAudit, dirtyOptions, dirtySink); + } + + @Override + public String asSummaryString() { + return "HBase"; + } + + @VisibleForTesting + public HBaseTableSchema getHBaseTableSchema() { + return this.hbaseTableSchema; + } + + @VisibleForTesting + public HBaseWriteOptions getWriteOptions() { + return writeOptions; + } + + @VisibleForTesting + public Configuration getConfiguration() { + return this.hbaseConf; + } + + @VisibleForTesting + public String getTableName() { + return this.tableName; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java new file mode 100644 index 00000000000..eafffeaadd3 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hbase.sink; + +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; +import org.apache.inlong.sort.base.util.MetricStateUtils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.hbase.sink.HBaseMutationConverter; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + +@Internal +public class HBaseSinkFunction extends RichSinkFunction + implements + CheckpointedFunction, + BufferedMutator.ExceptionListener { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(HBaseSinkFunction.class); + + private final String hTableName; + private final byte[] serializedConfig; + + private final long bufferFlushMaxSizeInBytes; + private final long bufferFlushMaxMutations; + private final long bufferFlushIntervalMillis; + private final HBaseMutationConverter mutationConverter; + private final String inlongMetric; + private final 
String inlongAudit; + /** + * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable} + * was thrown. + * + *

+ * <p>Errors will be checked and rethrown before processing each input element, and when the
+ * sink is closed.
+ */ + private final AtomicReference failureThrowable = new AtomicReference<>(); + private transient ListState metricStateListState; + private transient MetricState metricState; + private SinkMetricData sinkMetricData; + private transient Connection connection; + private transient BufferedMutator mutator; + private transient ScheduledExecutorService executor; + private transient ScheduledFuture scheduledFuture; + private transient AtomicLong numPendingRequests; + private transient RuntimeContext runtimeContext; + private transient volatile boolean closed = false; + private Long dataSize = 0L; + private Long rowSize = 0L; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink dirtySink; + + public HBaseSinkFunction( + String hTableName, + org.apache.hadoop.conf.Configuration conf, + HBaseMutationConverter mutationConverter, + long bufferFlushMaxSizeInBytes, + long bufferFlushMaxMutations, + long bufferFlushIntervalMillis, + String inlongMetric, + String inlongAudit, + DirtyOptions dirtyOptions, + @Nullable DirtySink dirtySink) { + this.hTableName = hTableName; + // Configuration is not serializable + this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); + this.mutationConverter = mutationConverter; + this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; + this.bufferFlushMaxMutations = bufferFlushMaxMutations; + this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; + this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; + } + + @Override + public void open(Configuration parameters) throws Exception { + LOGGER.info("Start hbase sink function open ..."); + org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration(); + try { + this.runtimeContext = getRuntimeContext(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) + .withRegisterMetric(MetricOption.RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup()); + } + if (dirtySink != null) { + dirtySink.open(parameters); + } + this.mutationConverter.open(); + this.numPendingRequests = new AtomicLong(0); + + if (null == connection) { + this.connection = ConnectionFactory.createConnection(config); + } + // create a parameter instance, set the table name and custom listener reference. 
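+ // The BufferedMutator batches mutations on the client side; registering this sink as the
+ // ExceptionListener makes asynchronous write failures surface in onException(), which stores
+ // them in failureThrowable so that the next invoke()/flush() call rethrows them.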
+ BufferedMutatorParams params = + new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this); + if (bufferFlushMaxSizeInBytes > 0) { + params.writeBufferSize(bufferFlushMaxSizeInBytes); + } + this.mutator = connection.getBufferedMutator(params); + + if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { + this.executor = + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); + this.scheduledFuture = + this.executor.scheduleWithFixedDelay( + () -> { + if (closed) { + return; + } + reportMetricAfterFlush(); + }, + bufferFlushIntervalMillis, + bufferFlushIntervalMillis, + TimeUnit.MILLISECONDS); + } + } catch (TableNotFoundException tnfe) { + LOGGER.error("The table " + hTableName + " not found ", tnfe); + throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe); + } catch (IOException ioe) { + LOGGER.error("Exception while creating connection to HBase.", ioe); + throw new RuntimeException("Cannot create connection to HBase.", ioe); + } + LOGGER.info("End hbase sink function open."); + } + + private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException { + // create default configuration from current runtime env (`hbase-site.xml` in classpath) + // first, + // and overwrite configuration using serialized configuration from client-side env + // (`hbase-site.xml` in classpath). + // user params from client-side have the highest priority + org.apache.hadoop.conf.Configuration runtimeConfig = + HBaseConfigurationUtil.deserializeConfiguration( + serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); + + // do validation: check key option(s) in final runtime configuration + if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) { + LOGGER.error( + "Can not connect to HBase without {} configuration", + HConstants.ZOOKEEPER_QUORUM); + throw new IOException( + "Check HBase configuration failed, lost: '" + + HConstants.ZOOKEEPER_QUORUM + + "'!"); + } + + return runtimeConfig; + } + + private void checkErrorAndRethrow() { + Throwable cause = failureThrowable.get(); + if (cause != null) { + LOGGER.error("An error occurred in HBaseSink.", cause); + throw new RuntimeException(cause); + } + } + + @Override + public void invoke(T value, Context context) { + checkErrorAndRethrow(); + RowData rowData = (RowData) value; + if (RowKind.UPDATE_BEFORE != rowData.getRowKind()) { + Mutation mutation = null; + try { + mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value)); + rowSize++; + dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(value); + } catch (Exception e) { + LOGGER.error("Convert to mutation error", e); + if (!dirtyOptions.ignoreDirty()) { + throw new RuntimeException(e); + } + sinkMetricData.invokeDirtyWithEstimate(value); + if (dirtySink != null) { + DirtyData.Builder builder = DirtyData.builder(); + try { + builder.setData(rowData) + .setDirtyType(DirtyType.UNDEFINED) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setDirtyMessage(e.getMessage()) + .setIdentifier(dirtyOptions.getIdentifier()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + return; + } + try { + mutator.mutate(mutation); + } catch (Exception e) { + failureThrowable.compareAndSet(null, e); + } + } else { + rowSize++; + dataSize = dataSize + 
value.toString().getBytes(StandardCharsets.UTF_8).length; + } + // flush when the buffer number of mutations greater than the configured max size. + if (bufferFlushMaxMutations > 0 + && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { + reportMetricAfterFlush(); + } + } + + private void reportMetricAfterFlush() { + try { + flush(); + if (sinkMetricData != null) { + sinkMetricData.invoke(rowSize, dataSize); + } + resetStateAfterFlush(); + } catch (Exception e) { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, e); + } + } + + private void resetStateAfterFlush() { + dataSize = 0L; + rowSize = 0L; + } + + private void flush() throws IOException { + // BufferedMutator is thread-safe + mutator.flush(); + numPendingRequests.set(0); + checkErrorAndRethrow(); + } + + @Override + public void close() throws Exception { + closed = true; + + if (mutator != null) { + try { + mutator.close(); + } catch (IOException e) { + LOGGER.warn("Exception occurs while closing HBase BufferedMutator.", e); + } + this.mutator = null; + } + + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + LOGGER.warn("Exception occurs while closing HBase Connection.", e); + } + this.connection = null; + } + + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + if (executor != null) { + executor.shutdownNow(); + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + while (numPendingRequests.get() != 0) { + reportMetricAfterFlush(); + } + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + + @Override + public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) + throws RetriesExhaustedWithDetailsException { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, exception); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000000..3ee47cd5ed8 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.hbase.HBase2DynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/hbase-default.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/hbase-default.xml new file mode 100644 index 00000000000..2d7e21d6369 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/resources/hbase-default.xml @@ -0,0 +1,1816 @@ + + + + + + + + + + hbase.tmp.dir + ${java.io.tmpdir}/hbase-${user.name} + Temporary directory on the local filesystem. + Change this setting to point to a location more permanent + than '/tmp', the usual resolve for java.io.tmpdir, as the + '/tmp' directory is cleared on machine restart. + + + hbase.rootdir + ${hbase.tmp.dir}/hbase + The directory shared by region servers and into + which HBase persists. The URL should be 'fully-qualified' + to include the filesystem scheme. For example, to specify the + HDFS directory '/hbase' where the HDFS instance's namenode is + running at namenode.example.org on port 9000, set this value to: + hdfs://namenode.example.org:9000/hbase. By default, we write + to whatever ${hbase.tmp.dir} is set too -- usually /tmp -- + so change this configuration or else all data will be lost on + machine restart. + + + hbase.cluster.distributed + false + The mode the cluster will be in. Possible values are + false for standalone mode and true for distributed mode. If + false, startup will run all HBase and ZooKeeper daemons together + in the one JVM. + + + hbase.zookeeper.quorum + localhost + Comma separated list of servers in the ZooKeeper ensemble + (This config. should have been named hbase.zookeeper.ensemble). + For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com". + By default this is set to localhost for local and pseudo-distributed modes + of operation. For a fully-distributed setup, this should be set to a full + list of ZooKeeper ensemble servers. If HBASE_MANAGES_ZK is set in hbase-env.sh + this is the list of servers which hbase will start/stop ZooKeeper on as + part of cluster start/stop. Client-side, we will take this list of + ensemble members and put it together with the hbase.zookeeper.property.clientPort + config. and pass it into zookeeper constructor as the connectString + parameter. + + + + + zookeeper.recovery.retry.maxsleeptime + 60000 + Max sleep time before retry zookeeper operations in milliseconds, + a max time is needed here so that sleep time won't grow unboundedly + + + + hbase.local.dir + ${hbase.tmp.dir}/local/ + Directory on the local filesystem to be used + as a local storage. + + + + + hbase.master.port + 16000 + The port the HBase Master should bind to. + + + hbase.master.info.port + 16010 + The port for the HBase Master web UI. + Set to -1 if you do not want a UI instance run. 
+ + + hbase.master.info.bindAddress + 0.0.0.0 + The bind address for the HBase Master web UI + + + + hbase.master.logcleaner.ttl + 600000 + How long a WAL remain in the archive ({hbase.rootdir}/oldWALs) directory, + after which it will be cleaned by a Master thread. The value is in milliseconds. + + + hbase.master.procedurewalcleaner.ttl + 604800000 + How long a Procedure WAL will remain in the + archive directory, after which it will be cleaned + by a Master thread. The value is in milliseconds. + + + hbase.master.infoserver.redirect + true + Whether or not the Master listens to the Master web + UI port (hbase.master.info.port) and redirects requests to the web + UI server shared by the Master and RegionServer. Config. makes + sense when Master is serving Regions (not the default). + + + hbase.master.fileSplitTimeout + 600000 + Splitting a region, how long to wait on the file-splitting + step before aborting the attempt. Default: 600000. This setting used + to be known as hbase.regionserver.fileSplitTimeout in hbase-1.x. + Split is now run master-side hence the rename (If a + 'hbase.master.fileSplitTimeout' setting found, will use it to + prime the current 'hbase.master.fileSplitTimeout' + Configuration. + + + + + hbase.regionserver.port + 16020 + The port the HBase RegionServer binds to. + + + hbase.regionserver.info.port + 16030 + The port for the HBase RegionServer web UI + Set to -1 if you do not want the RegionServer UI to run. + + + hbase.regionserver.info.bindAddress + 0.0.0.0 + The address for the HBase RegionServer web UI + + + hbase.regionserver.info.port.auto + false + Whether or not the Master or RegionServer + UI should search for a port to bind to. Enables automatic port + search if hbase.regionserver.info.port is already in use. + Useful for testing, turned off by default. + + + hbase.regionserver.handler.count + 30 + Count of RPC Listener instances spun up on RegionServers. + Same property is used by the Master for count of master handlers. + Too many handlers can be counter-productive. Make it a multiple of + CPU count. If mostly read-only, handlers count close to cpu count + does well. Start with twice the CPU count and tune from there. + + + hbase.ipc.server.callqueue.handler.factor + 0.1 + Factor to determine the number of call queues. + A value of 0 means a single queue shared between all the handlers. + A value of 1 means that each handler has its own queue. + + + hbase.ipc.server.callqueue.read.ratio + 0 + Split the call queues into read and write queues. + The specified interval (which should be between 0.0 and 1.0) + will be multiplied by the number of call queues. + A value of 0 indicate to not split the call queues, meaning that both read and write + requests will be pushed to the same set of queues. + A value lower than 0.5 means that there will be less read queues than write queues. + A value of 0.5 means there will be the same number of read and write queues. + A value greater than 0.5 means that there will be more read queues than write queues. + A value of 1.0 means that all the queues except one are used to dispatch read requests. + + Example: Given the total number of call queues being 10 + a read.ratio of 0 means that: the 10 queues will contain both read/write requests. + a read.ratio of 0.3 means that: 3 queues will contain only read requests + and 7 queues will contain only write requests. + a read.ratio of 0.5 means that: 5 queues will contain only read requests + and 5 queues will contain only write requests. 
+ a read.ratio of 0.8 means that: 8 queues will contain only read requests + and 2 queues will contain only write requests. + a read.ratio of 1 means that: 9 queues will contain only read requests + and 1 queues will contain only write requests. + + + + hbase.ipc.server.callqueue.scan.ratio + 0 + Given the number of read call queues, calculated from the total number + of call queues multiplied by the callqueue.read.ratio, the scan.ratio property + will split the read call queues into small-read and long-read queues. + A value lower than 0.5 means that there will be less long-read queues than short-read queues. + A value of 0.5 means that there will be the same number of short-read and long-read queues. + A value greater than 0.5 means that there will be more long-read queues than short-read queues + A value of 0 or 1 indicate to use the same set of queues for gets and scans. + + Example: Given the total number of read call queues being 8 + a scan.ratio of 0 or 1 means that: 8 queues will contain both long and short read requests. + a scan.ratio of 0.3 means that: 2 queues will contain only long-read requests + and 6 queues will contain only short-read requests. + a scan.ratio of 0.5 means that: 4 queues will contain only long-read requests + and 4 queues will contain only short-read requests. + a scan.ratio of 0.8 means that: 6 queues will contain only long-read requests + and 2 queues will contain only short-read requests. + + + + hbase.regionserver.msginterval + 3000 + Interval between messages from the RegionServer to Master + in milliseconds. + + + hbase.regionserver.logroll.period + 3600000 + Period at which we will roll the commit log regardless + of how many edits it has. + + + hbase.regionserver.logroll.errors.tolerated + 2 + The number of consecutive WAL close errors we will allow + before triggering a server abort. A setting of 0 will cause the + region server to abort if closing the current WAL writer fails during + log rolling. Even a small value (2 or 3) will allow a region server + to ride over transient HDFS errors. + + + hbase.regionserver.global.memstore.size + + Maximum size of all memstores in a region server before new + updates are blocked and flushes are forced. Defaults to 40% of heap (0.4). + Updates are blocked and flushes are forced until size of all memstores + in a region server hits hbase.regionserver.global.memstore.size.lower.limit. + The default value in this configuration has been intentionally left empty in order to + honor the old hbase.regionserver.global.memstore.upperLimit property if present. + + + + hbase.regionserver.global.memstore.size.lower.limit + + Maximum size of all memstores in a region server before flushes + are forced. Defaults to 95% of hbase.regionserver.global.memstore.size + (0.95). A 100% value for this value causes the minimum possible flushing + to occur when updates are blocked due to memstore limiting. The default + value in this configuration has been intentionally left empty in order to + honor the old hbase.regionserver.global.memstore.lowerLimit property if + present. + + + + hbase.systemtables.compacting.memstore.type + NONE + Determines the type of memstore to be used for system tables like + META, namespace tables etc. By default NONE is the type and hence we use the + default memstore for all the system tables. 
If we need to use compacting + memstore for system tables then set this property to BASIC/EAGER + + + + hbase.regionserver.optionalcacheflushinterval + 3600000 + + Maximum amount of time an edit lives in memory before being automatically flushed. + Default 1 hour. Set it to 0 to disable automatic flushing. + + + + hbase.regionserver.dns.interface + default + The name of the Network Interface from which a region server + should report its IP address. + + + hbase.regionserver.dns.nameserver + default + The host name or IP address of the name server (DNS) + which a region server should use to determine the host name used by the + master for communication and display purposes. + + + hbase.regionserver.regionSplitLimit + 1000 + + Limit for the number of regions after which no more region splitting + should take place. This is not hard limit for the number of regions + but acts as a guideline for the regionserver to stop splitting after + a certain limit. Default is set to 1000. + + + + + + zookeeper.session.timeout + 90000 + ZooKeeper session timeout in milliseconds. It is used in two different ways. + First, this value is used in the ZK client that HBase uses to connect to the ensemble. + It is also used by HBase when it starts a ZK server and it is passed as the 'maxSessionTimeout'. + See https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#ch_zkSessions. + For example, if an HBase region server connects to a ZK ensemble that's also managed + by HBase, then the session timeout will be the one specified by this configuration. + But, a region server that connects to an ensemble managed with a different configuration + will be subjected that ensemble's maxSessionTimeout. So, even though HBase might propose + using 90 seconds, the ensemble can have a max timeout lower than this and it will take + precedence. The current default maxSessionTimeout that ZK ships with is 40 seconds, which is lower than + HBase's. + + + + zookeeper.znode.parent + /hbase + Root ZNode for HBase in ZooKeeper. All of HBase's ZooKeeper + files that are configured with a relative path will go under this node. + By default, all of HBase's ZooKeeper file paths are configured with a + relative path, so they will all go under this directory unless changed. + + + + zookeeper.znode.acl.parent + acl + Root ZNode for access control lists. + + + hbase.zookeeper.dns.interface + default + The name of the Network Interface from which a ZooKeeper server + should report its IP address. + + + hbase.zookeeper.dns.nameserver + default + The host name or IP address of the name server (DNS) + which a ZooKeeper server should use to determine the host name used by the + master for communication and display purposes. + + + + hbase.zookeeper.peerport + 2888 + Port used by ZooKeeper peers to talk to each other. + See https://zookeeper.apache.org/doc/r3.3.3/zookeeperStarted.html#sc_RunningReplicatedZooKeeper + for more information. + + + hbase.zookeeper.leaderport + 3888 + Port used by ZooKeeper for leader election. + See https://zookeeper.apache.org/doc/r3.3.3/zookeeperStarted.html#sc_RunningReplicatedZooKeeper + for more information. + + + + + + hbase.zookeeper.property.initLimit + 10 + Property from ZooKeeper's config zoo.cfg. + The number of ticks that the initial synchronization phase can take. + + + hbase.zookeeper.property.syncLimit + 5 + Property from ZooKeeper's config zoo.cfg. + The number of ticks that can pass between sending a request and getting an + acknowledgment. 
+ + + hbase.zookeeper.property.dataDir + ${hbase.tmp.dir}/zookeeper + Property from ZooKeeper's config zoo.cfg. + The directory where the snapshot is stored. + + + hbase.zookeeper.property.clientPort + 2181 + Property from ZooKeeper's config zoo.cfg. + The port at which the clients will connect. + + + hbase.zookeeper.property.maxClientCnxns + 300 + Property from ZooKeeper's config zoo.cfg. + Limit on number of concurrent connections (at the socket level) that a + single client, identified by IP address, may make to a single member of + the ZooKeeper ensemble. Set high to avoid zk connection issues running + standalone and pseudo-distributed. + + + + + + hbase.client.write.buffer + 2097152 + Default size of the BufferedMutator write buffer in bytes. + A bigger buffer takes more memory -- on both the client and server + side since server instantiates the passed write buffer to process + it -- but a larger buffer size reduces the number of RPCs made. + For an estimate of server-side memory-used, evaluate + hbase.client.write.buffer * hbase.regionserver.handler.count + + + hbase.client.pause + 100 + General client pause value. Used mostly as value to wait + before running a retry of a failed get, region lookup, etc. + See hbase.client.retries.number for description of how we backoff from + this initial pause amount and how this pause works w/ retries. + + + hbase.client.pause.cqtbe + + Whether or not to use a special client pause for + CallQueueTooBigException (cqtbe). Set this property to a higher value + than hbase.client.pause if you observe frequent CQTBE from the same + RegionServer and the call queue there keeps full + + + hbase.client.retries.number + 15 + Maximum retries. Used as maximum for all retryable + operations such as the getting of a cell's value, starting a row update, + etc. Retry interval is a rough function based on hbase.client.pause. At + first we retry at this interval but then with backoff, we pretty quickly reach + retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup + ramps up. Change this setting and hbase.client.pause to suit your workload. + + + hbase.client.max.total.tasks + 100 + The maximum number of concurrent mutation tasks a single HTable instance will + send to the cluster. + + + hbase.client.max.perserver.tasks + 2 + The maximum number of concurrent mutation tasks a single HTable instance will + send to a single region server. + + + hbase.client.max.perregion.tasks + 1 + The maximum number of concurrent mutation tasks the client will + maintain to a single Region. That is, if there is already + hbase.client.max.perregion.tasks writes in progress for this region, new puts + won't be sent to this region until some writes finishes. + + + hbase.client.perserver.requests.threshold + 2147483647 + The max number of concurrent pending requests for one server in all client threads + (process level). Exceeding requests will be thrown ServerTooBusyException immediately to prevent + user's threads being occupied and blocked by only one slow region server. If you use a fix + number of threads to access HBase in a synchronous way, set this to a suitable value which is + related to the number of threads will help you. See + https://issues.apache.org/jira/browse/HBASE-16388 for details. + + + hbase.client.scanner.caching + 2147483647 + Number of rows that we try to fetch when calling next + on a scanner if it is not served from (local, client) memory. 
This configuration + works together with hbase.client.scanner.max.result.size to try and use the + network efficiently. The default value is Integer.MAX_VALUE by default so that + the network will fill the chunk size defined by hbase.client.scanner.max.result.size + rather than be limited by a particular number of rows since the size of rows varies + table to table. If you know ahead of time that you will not require more than a certain + number of rows from a scan, this configuration should be set to that row limit via + Scan#setCaching. Higher caching values will enable faster scanners but will eat up more + memory and some calls of next may take longer and longer times when the cache is empty. + Do not set this value such that the time between invocations is greater than the scanner + timeout; i.e. hbase.client.scanner.timeout.period + + + hbase.client.keyvalue.maxsize + 10485760 + Specifies the combined maximum allowed size of a KeyValue + instance. This is to set an upper boundary for a single entry saved in a + storage file. Since they cannot be split it helps avoiding that a region + cannot be split any further because the data is too large. It seems wise + to set this to a fraction of the maximum region size. Setting it to zero + or less disables the check. + + + hbase.server.keyvalue.maxsize + 10485760 + Maximum allowed size of an individual cell, inclusive of value and all key + components. A value of 0 or less disables the check. + The default value is 10MB. + This is a safety setting to protect the server from OOM situations. + + + + hbase.client.scanner.timeout.period + 60000 + Client scanner lease period in milliseconds. + + + hbase.client.localityCheck.threadPoolSize + 2 + + + + + hbase.bulkload.retries.number + 10 + Maximum retries. This is maximum number of iterations + to atomic bulk loads are attempted in the face of splitting operations + 0 means never give up. + + + hbase.master.balancer.maxRitPercent + 1.0 + The max percent of regions in transition when balancing. + The default value is 1.0. So there are no balancer throttling. If set this config to 0.01, + It means that there are at most 1% regions in transition when balancing. + Then the cluster's availability is at least 99% when balancing. + + + hbase.balancer.period + + 300000 + Period at which the region balancer runs in the Master. + + + hbase.normalizer.period + 300000 + Period at which the region normalizer runs in the Master. + + + hbase.regions.slop + 0.001 + Rebalance if any regionserver has average + (average * slop) regions. + The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load balancer), + while the default is 0.2 in other load balancers (i.e., SimpleLoadBalancer). + + + hbase.server.thread.wakefrequency + 10000 + Time to sleep in between searches for work (in milliseconds). + Used as sleep interval by service threads such as log roller. + + + hbase.server.versionfile.writeattempts + 3 + + How many times to retry attempting to write a version file + before just aborting. Each attempt is separated by the + hbase.server.thread.wakefrequency milliseconds. + + + hbase.hregion.memstore.flush.size + 134217728 + + Memstore will be flushed to disk if size of the memstore + exceeds this number of bytes. Value is checked by a thread that runs + every hbase.server.thread.wakefrequency. 
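
    [Editor's sketch] The interplay between hbase.client.scanner.caching and hbase.client.scanner.max.result.size described above can also be set per scan. A minimal sketch under the HBase 2.x client API; the table name and the values chosen are illustrative, not taken from the patch.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class ScanCachingSketch {
        // conn is an already-open Connection owned by the caller
        static long countRows(Connection conn) throws Exception {
            Scan scan = new Scan();
            scan.setCaching(500);                     // per-RPC row batch; overrides hbase.client.scanner.caching
            scan.setMaxResultSize(2L * 1024 * 1024);  // per-RPC byte budget, cf. hbase.client.scanner.max.result.size
            long rows = 0;
            try (Table table = conn.getTable(TableName.valueOf("events"));
                    ResultScanner scanner = table.getScanner(scan)) {
                for (Result ignored : scanner) {
                    rows++;
                }
            }
            return rows;
        }
    }
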
+ + + hbase.hregion.percolumnfamilyflush.size.lower.bound.min + 16777216 + + If FlushLargeStoresPolicy is used and there are multiple column families, + then every time that we hit the total memstore limit, we find out all the + column families whose memstores exceed a "lower bound" and only flush them + while retaining the others in memory. The "lower bound" will be + "hbase.hregion.memstore.flush.size / column_family_number" by default + unless value of this property is larger than that. If none of the families + have their memstore size more than lower bound, all the memstores will be + flushed (just as usual). + + + + hbase.hregion.preclose.flush.size + 5242880 + + If the memstores in a region are this size or larger when we go + to close, run a "pre-flush" to clear out memstores before we put up + the region closed flag and take the region offline. On close, + a flush is run under the close flag to empty memory. During + this time the region is offline and we are not taking on any writes. + If the memstore content is large, this flush could take a long time to + complete. The preflush is meant to clean out the bulk of the memstore + before putting up the close flag and taking the region offline so the + flush that runs under the close flag has little to do. + + + hbase.hregion.memstore.block.multiplier + 4 + + Block updates if memstore has hbase.hregion.memstore.block.multiplier + times hbase.hregion.memstore.flush.size bytes. Useful preventing + runaway memstore during spikes in update traffic. Without an + upper-bound, memstore fills such that when it flushes the + resultant flush files take a long time to compact or split, or + worse, we OOME. + + + hbase.hregion.memstore.mslab.enabled + true + + Enables the MemStore-Local Allocation Buffer, + a feature which works to prevent heap fragmentation under + heavy write loads. This can reduce the frequency of stop-the-world + GC pauses on large heaps. + + + hbase.hregion.max.filesize + 10737418240 + + Maximum HFile size. If the sum of the sizes of a region's HFiles has grown to exceed this + value, the region is split in two. + + + hbase.hregion.majorcompaction + 604800000 + Time between major compactions, expressed in milliseconds. Set to 0 to disable + time-based automatic major compactions. User-requested and size-based major compactions will + still run. This value is multiplied by hbase.hregion.majorcompaction.jitter to cause + compaction to start at a somewhat-random time during a given window of time. The default value + is 7 days, expressed in milliseconds. If major compactions are causing disruption in your + environment, you can configure them to run at off-peak times for your deployment, or disable + time-based major compactions by setting this parameter to 0, and run major compactions in a + cron job or by another external mechanism. + + + hbase.hregion.majorcompaction.jitter + 0.50 + A multiplier applied to hbase.hregion.majorcompaction to cause compaction to occur + a given amount of time either side of hbase.hregion.majorcompaction. The smaller the number, + the closer the compactions will happen to the hbase.hregion.majorcompaction + interval. + + + hbase.hstore.compactionThreshold + 3 + If more than this number of StoreFiles exist in any one Store + (one StoreFile is written per flush of MemStore), a compaction is run to rewrite all + StoreFiles into a single StoreFile. Larger values delay compaction, but when compaction does + occur, it takes longer to complete. 
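
    [Editor's sketch] As the hbase.hregion.majorcompaction description notes, time-based major compactions can be disabled and driven externally instead. A minimal sketch of such an externally triggered compaction (for example from a cron-driven job), assuming the HBase 2.x Admin API; the table name is illustrative.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;

    public class OffPeakCompactSketch {
        // With hbase.hregion.majorcompaction set to 0, this explicit request is
        // what rewrites the store files; size-based compactions still apply.
        static void compactNow(Connection conn) throws Exception {
            try (Admin admin = conn.getAdmin()) {
                admin.majorCompact(TableName.valueOf("events"));
            }
        }
    }
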
+ + + hbase.regionserver.compaction.enabled + true + Enable/disable compactions on by setting true/false. + We can further switch compactions dynamically with the + compaction_switch shell command. + + + hbase.hstore.flusher.count + 2 + The number of flush threads. With fewer threads, the MemStore flushes will be + queued. With more threads, the flushes will be executed in parallel, increasing the load on + HDFS, and potentially causing more compactions. + + + hbase.hstore.blockingStoreFiles + 16 + If more than this number of StoreFiles exist in any one Store (one StoreFile + is written per flush of MemStore), updates are blocked for this region until a compaction is + completed, or until hbase.hstore.blockingWaitTime has been exceeded. + + + hbase.hstore.blockingWaitTime + 90000 + The time for which a region will block updates after reaching the StoreFile limit + defined by hbase.hstore.blockingStoreFiles. After this time has elapsed, the region will stop + blocking updates even if a compaction has not been completed. + + + hbase.hstore.compaction.min + 3 + The minimum number of StoreFiles which must be eligible for compaction before + compaction can run. The goal of tuning hbase.hstore.compaction.min is to avoid ending up with + too many tiny StoreFiles to compact. Setting this value to 2 would cause a minor compaction + each time you have two StoreFiles in a Store, and this is probably not appropriate. If you + set this value too high, all the other values will need to be adjusted accordingly. For most + cases, the default value is appropriate. In previous versions of HBase, the parameter + hbase.hstore.compaction.min was named hbase.hstore.compactionThreshold. + + + hbase.hstore.compaction.max + 10 + The maximum number of StoreFiles which will be selected for a single minor + compaction, regardless of the number of eligible StoreFiles. Effectively, the value of + hbase.hstore.compaction.max controls the length of time it takes a single compaction to + complete. Setting it larger means that more StoreFiles are included in a compaction. For most + cases, the default value is appropriate. + + + hbase.hstore.compaction.min.size + 134217728 + A StoreFile (or a selection of StoreFiles, when using ExploringCompactionPolicy) + smaller than this size will always be eligible for minor compaction. + HFiles this size or larger are evaluated by hbase.hstore.compaction.ratio to determine if + they are eligible. Because this limit represents the "automatic include" limit for all + StoreFiles smaller than this value, this value may need to be reduced in write-heavy + environments where many StoreFiles in the 1-2 MB range are being flushed, because every + StoreFile will be targeted for compaction and the resulting StoreFiles may still be under the + minimum size and require further compaction. If this parameter is lowered, the ratio check is + triggered more quickly. This addressed some issues seen in earlier versions of HBase but + changing this parameter is no longer necessary in most situations. Default: 128 MB expressed + in bytes. + + + hbase.hstore.compaction.max.size + 9223372036854775807 + A StoreFile (or a selection of StoreFiles, when using ExploringCompactionPolicy) + larger than this size will be excluded from compaction. The effect of + raising hbase.hstore.compaction.max.size is fewer, larger StoreFiles that do not get + compacted often. If you feel that compaction is happening too often without much benefit, you + can try raising this value. 
Default: the value of LONG.MAX_VALUE, expressed in bytes. + + + hbase.hstore.compaction.ratio + 1.2F + For minor compaction, this ratio is used to determine whether a given StoreFile + which is larger than hbase.hstore.compaction.min.size is eligible for compaction. Its + effect is to limit compaction of large StoreFiles. The value of hbase.hstore.compaction.ratio + is expressed as a floating-point decimal. A large ratio, such as 10, will produce a single + giant StoreFile. Conversely, a low value, such as .25, will produce behavior similar to the + BigTable compaction algorithm, producing four StoreFiles. A moderate value of between 1.0 and + 1.4 is recommended. When tuning this value, you are balancing write costs with read costs. + Raising the value (to something like 1.4) will have more write costs, because you will + compact larger StoreFiles. However, during reads, HBase will need to seek through fewer + StoreFiles to accomplish the read. Consider this approach if you cannot take advantage of + Bloom filters. Otherwise, you can lower this value to something like 1.0 to reduce the + background cost of writes, and use Bloom filters to control the number of StoreFiles touched + during reads. For most cases, the default value is appropriate. + + + hbase.hstore.compaction.ratio.offpeak + 5.0F + Allows you to set a different (by default, more aggressive) ratio for determining + whether larger StoreFiles are included in compactions during off-peak hours. Works in the + same way as hbase.hstore.compaction.ratio. Only applies if hbase.offpeak.start.hour and + hbase.offpeak.end.hour are also enabled. + + + hbase.hstore.time.to.purge.deletes + 0 + The amount of time to delay purging of delete markers with future timestamps. If + unset, or set to 0, all delete markers, including those with future timestamps, are purged + during the next major compaction. Otherwise, a delete marker is kept until the major compaction + which occurs after the marker's timestamp plus the value of this setting, in milliseconds. + + + + hbase.offpeak.start.hour + -1 + The start of off-peak hours, expressed as an integer between 0 and 23, inclusive. + Set to -1 to disable off-peak. + + + hbase.offpeak.end.hour + -1 + The end of off-peak hours, expressed as an integer between 0 and 23, inclusive. Set + to -1 to disable off-peak. + + + hbase.regionserver.thread.compaction.throttle + 2684354560 + There are two different thread pools for compactions, one for large compactions and + the other for small compactions. This helps to keep compaction of lean tables (such as + hbase:meta) fast. If a compaction is larger than this threshold, it + goes into the large compaction pool. In most cases, the default value is appropriate. Default: + 2 x hbase.hstore.compaction.max x hbase.hregion.memstore.flush.size (which defaults to 128MB). + The value field assumes that the value of hbase.hregion.memstore.flush.size is unchanged from + the default. + + + hbase.regionserver.majorcompaction.pagecache.drop + true + Specifies whether to drop pages read/written into the system page cache by + major compactions. Setting it to true helps prevent major compactions from + polluting the page cache, which is almost always required, especially for clusters + with low/moderate memory to storage ratio. + + + hbase.regionserver.minorcompaction.pagecache.drop + true + Specifies whether to drop pages read/written into the system page cache by + minor compactions. 
Setting it to true helps prevent minor compactions from + polluting the page cache, which is most beneficial on clusters with low + memory to storage ratio or very write heavy clusters. You may want to set it to + false under moderate to low write workload when bulk of the reads are + on the most recently written data. + + + hbase.hstore.compaction.kv.max + 10 + The maximum number of KeyValues to read and then write in a batch when flushing or + compacting. Set this lower if you have big KeyValues and problems with Out Of Memory + Exceptions Set this higher if you have wide, small rows. + + + hbase.storescanner.parallel.seek.enable + false + + Enables StoreFileScanner parallel-seeking in StoreScanner, + a feature which can reduce response latency under special conditions. + + + hbase.storescanner.parallel.seek.threads + 10 + + The default thread pool size if parallel-seeking feature enabled. + + + hfile.block.cache.size + 0.4 + Percentage of maximum heap (-Xmx setting) to allocate to block cache + used by a StoreFile. Default of 0.4 means allocate 40%. + Set to 0 to disable but it's not recommended; you need at least + enough cache to hold the storefile indices. + + + hfile.block.index.cacheonwrite + false + This allows to put non-root multi-level index blocks into the block + cache at the time the index is being written. + + + hfile.index.block.max.size + 131072 + When the size of a leaf-level, intermediate-level, or root-level + index block in a multi-level block index grows to this size, the + block is written out and a new block is started. + + + hbase.bucketcache.ioengine + + Where to store the contents of the bucketcache. One of: offheap, + file, files or mmap. If a file or files, set it to file(s):PATH_TO_FILE. + mmap means the content will be in an mmaped file. Use mmap:PATH_TO_FILE. + See http://hbase.apache.org/book.html#offheap.blockcache for more information. + + + + hbase.bucketcache.size + + A float that EITHER represents a percentage of total heap memory + size to give to the cache (if < 1.0) OR, it is the total capacity in + megabytes of BucketCache. Default: 0.0 + + + hbase.bucketcache.bucket.sizes + + A comma-separated list of sizes for buckets for the bucketcache. + Can be multiple sizes. List block sizes in order from smallest to largest. + The sizes you use will depend on your data access patterns. + Must be a multiple of 256 else you will run into + 'java.io.IOException: Invalid HFile block magic' when you go to read from cache. + If you specify no values here, then you pick up the default bucketsizes set + in code (See BucketAllocator#DEFAULT_BUCKET_SIZES). + + + + hfile.format.version + 3 + The HFile format version to use for new files. + Version 3 adds support for tags in hfiles (See http://hbase.apache.org/book.html#hbase.tags). + Also see the configuration 'hbase.replication.rpc.codec'. + + + + hfile.block.bloom.cacheonwrite + false + Enables cache-on-write for inline blocks of a compound Bloom filter. + + + io.storefile.bloom.block.size + 131072 + The size in bytes of a single block ("chunk") of a compound Bloom + filter. This size is approximate, because Bloom blocks can only be + inserted at data block boundaries, and the number of keys per data + block varies. + + + hbase.rs.cacheblocksonwrite + false + Whether an HFile block should be added to the block cache when the + block is finished. + + + hbase.rpc.timeout + 60000 + This is for the RPC layer to define how long (millisecond) HBase client applications + take for a remote call to time out. 
It uses pings to check connections + but will eventually throw a TimeoutException. + + + hbase.client.operation.timeout + 1200000 + Operation timeout is a top-level restriction (millisecond) that makes sure a + blocking operation in Table will not be blocked more than this. In each operation, if rpc + request fails because of timeout or other reason, it will retry until success or throw + RetriesExhaustedException. But if the total time being blocking reach the operation timeout + before retries exhausted, it will break early and throw SocketTimeoutException. + + + hbase.cells.scanned.per.heartbeat.check + 10000 + The number of cells scanned in between heartbeat checks. Heartbeat + checks occur during the processing of scans to determine whether or not the + server should stop scanning in order to send back a heartbeat message to the + client. Heartbeat messages are used to keep the client-server connection alive + during long running scans. Small values mean that the heartbeat checks will + occur more often and thus will provide a tighter bound on the execution time of + the scan. Larger values mean that the heartbeat checks occur less frequently + + + + hbase.rpc.shortoperation.timeout + 10000 + This is another version of "hbase.rpc.timeout". For those RPC operation + within cluster, we rely on this configuration to set a short timeout limitation + for short operation. For example, short rpc timeout for region server's trying + to report to active master can benefit quicker master failover process. + + + hbase.ipc.client.tcpnodelay + true + Set no delay on rpc socket connections. See + http://docs.oracle.com/javase/1.5.0/docs/api/java/net/Socket.html#getTcpNoDelay() + + + hbase.regionserver.hostname + + This config is for experts: don't set its value unless you really know what you are doing. + When set to a non-empty value, this represents the (external facing) hostname for the underlying server. + See https://issues.apache.org/jira/browse/HBASE-12954 for details. + + + hbase.regionserver.hostname.disable.master.reversedns + false + This config is for experts: don't set its value unless you really know what you are doing. + When set to true, regionserver will use the current node hostname for the servername and HMaster will + skip reverse DNS lookup and use the hostname sent by regionserver instead. Note that this config and + hbase.regionserver.hostname are mutually exclusive. See https://issues.apache.org/jira/browse/HBASE-18226 + for more details. + + + + hbase.master.keytab.file + + Full path to the kerberos keytab file to use for logging in + the configured HMaster server principal. + + + hbase.master.kerberos.principal + + Ex. "hbase/_HOST@EXAMPLE.COM". The kerberos principal name + that should be used to run the HMaster process. The principal name should + be in the form: user/hostname@DOMAIN. If "_HOST" is used as the hostname + portion, it will be replaced with the actual hostname of the running + instance. + + + hbase.regionserver.keytab.file + + Full path to the kerberos keytab file to use for logging in + the configured HRegionServer server principal. + + + hbase.regionserver.kerberos.principal + + Ex. "hbase/_HOST@EXAMPLE.COM". The kerberos principal name + that should be used to run the HRegionServer process. The principal name + should be in the form: user/hostname@DOMAIN. If "_HOST" is used as the + hostname portion, it will be replaced with the actual hostname of the + running instance. 
An entry for this principal must exist in the file + specified in hbase.regionserver.keytab.file + + + + hadoop.policy.file + hbase-policy.xml + The policy configuration file used by RPC servers to make + authorization decisions on client requests. Only used when HBase + security is enabled. + + + hbase.superuser + + List of users or groups (comma-separated), who are allowed + full privileges, regardless of stored ACLs, across the cluster. + Only used when HBase security is enabled. + + + hbase.auth.key.update.interval + 86400000 + The update interval for master key for authentication tokens + in servers in milliseconds. Only used when HBase security is enabled. + + + hbase.auth.token.max.lifetime + 604800000 + The maximum lifetime in milliseconds after which an + authentication token expires. Only used when HBase security is enabled. + + + hbase.ipc.client.fallback-to-simple-auth-allowed + false + When a client is configured to attempt a secure connection, but attempts to + connect to an insecure server, that server may instruct the client to + switch to SASL SIMPLE (unsecure) authentication. This setting controls + whether or not the client will accept this instruction from the server. + When false (the default), the client will not allow the fallback to SIMPLE + authentication, and will abort the connection. + + + hbase.ipc.server.fallback-to-simple-auth-allowed + false + When a server is configured to require secure connections, it will + reject connection attempts from clients using SASL SIMPLE (unsecure) authentication. + This setting allows secure servers to accept SASL SIMPLE connections from clients + when the client requests. When false (the default), the server will not allow the fallback + to SIMPLE authentication, and will reject the connection. WARNING: This setting should ONLY + be used as a temporary measure while converting clients over to secure authentication. It + MUST BE DISABLED for secure operation. + + + hbase.display.keys + true + When this is set to true the webUI and such will display all start/end keys + as part of the table details, region names, etc. When this is set to false, + the keys are hidden. + + + hbase.coprocessor.enabled + true + Enables or disables coprocessor loading. If 'false' + (disabled), any other coprocessor related configuration will be ignored. + + + + hbase.coprocessor.user.enabled + true + Enables or disables user (aka. table) coprocessor loading. + If 'false' (disabled), any table coprocessor attributes in table + descriptors will be ignored. If "hbase.coprocessor.enabled" is 'false' + this setting has no effect. + + + + hbase.coprocessor.region.classes + + A comma-separated list of Coprocessors that are loaded by + default on all tables. For any override coprocessor method, these classes + will be called in order. After implementing your own Coprocessor, just put + it in HBase's classpath and add the fully qualified class name here. + A coprocessor can also be loaded on demand by setting HTableDescriptor. + + + hbase.coprocessor.master.classes + + A comma-separated list of + org.apache.hadoop.hbase.coprocessor.MasterObserver coprocessors that are + loaded by default on the active HMaster process. For any implemented + coprocessor methods, the listed classes will be called in order. After + implementing your own MasterObserver, just put it in HBase's classpath + and add the fully qualified class name here. 
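
    [Editor's sketch] For context on what hbase.coprocessor.region.classes loads, here is a minimal, hypothetical region coprocessor under the HBase 2.x coprocessor API. The class name is invented; once its jar is on HBase's classpath, its fully qualified name would be listed in that property.

    import java.util.Optional;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    import org.apache.hadoop.hbase.wal.WALEdit;

    public class PutCountingObserver implements RegionCoprocessor, RegionObserver {

        private long puts;

        @Override
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> ctx,
                Put put, WALEdit edit, Durability durability) {
            puts++; // observe each mutation before it is applied to the region
        }
    }
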
+ + + hbase.coprocessor.abortonerror + true + Set to true to cause the hosting server (master or regionserver) + to abort if a coprocessor fails to load, fails to initialize, or throws an + unexpected Throwable object. Setting this to false will allow the server to + continue execution but the system wide state of the coprocessor in question + will become inconsistent as it will be properly executing in only a subset + of servers, so this is most useful for debugging only. + + + hbase.rest.port + 8080 + The port for the HBase REST server. + + + hbase.rest.readonly + false + Defines the mode the REST server will be started in. Possible values are: + false: All HTTP methods are permitted - GET/PUT/POST/DELETE. + true: Only the GET method is permitted. + + + hbase.rest.threads.max + 100 + The maximum number of threads of the REST server thread pool. + Threads in the pool are reused to process REST requests. This + controls the maximum number of requests processed concurrently. + It may help to control the memory used by the REST server to + avoid OOM issues. If the thread pool is full, incoming requests + will be queued up and wait for some free threads. + + + hbase.rest.threads.min + 2 + The minimum number of threads of the REST server thread pool. + The thread pool always has at least these number of threads so + the REST server is ready to serve incoming requests. + + + hbase.rest.support.proxyuser + false + Enables running the REST server to support proxy-user mode. + + + hbase.defaults.for.version + 2.2.3 + This defaults file was compiled for version ${project.version}. This variable is used + to make sure that a user doesn't have an old version of hbase-default.xml on the + classpath. + + + hbase.defaults.for.version.skip + false + Set to true to skip the 'hbase.defaults.for.version' check. + Setting this to true can be useful in contexts other than + the other side of a maven generation; i.e. running in an + IDE. You'll want to set this boolean to true to avoid + seeing the RuntimeException complaint: "hbase-default.xml file + seems to be for and old version of HBase (\${hbase.version}), this + version is X.X.X-SNAPSHOT" + + + hbase.table.lock.enable + true + Set to true to enable locking the table in zookeeper for schema change operations. + Table locking from master prevents concurrent schema modifications to corrupt table + state. + + + hbase.table.max.rowsize + 1073741824 + + Maximum size of single row in bytes (default is 1 Gb) for Get'ting + or Scan'ning without in-row scan flag set. If row size exceeds this limit + RowTooBigException is thrown to client. + + + + hbase.thrift.minWorkerThreads + 16 + The "core size" of the thread pool. New threads are created on every + connection until this many threads are created. + + + hbase.thrift.maxWorkerThreads + 1000 + The maximum size of the thread pool. When the pending request queue + overflows, new threads are created until their number reaches this number. + After that, the server starts dropping connections. + + + hbase.thrift.maxQueuedRequests + 1000 + The maximum number of pending Thrift connections waiting in the queue. If + there are no idle threads in the pool, the server queues requests. Only + when the queue overflows, new threads are added, up to + hbase.thrift.maxQueuedRequests threads. + + + hbase.regionserver.thrift.framed + false + Use Thrift TFramedTransport on the server side. + This is the recommended transport for thrift servers and requires a similar setting + on the client side. 
Changing this to false will select the default transport, + vulnerable to DoS when malformed requests are issued due to THRIFT-601. + + + + hbase.regionserver.thrift.framed.max_frame_size_in_mb + 2 + Default frame size when using framed transport, in MB + + + hbase.regionserver.thrift.compact + false + Use Thrift TCompactProtocol binary serialization protocol. + + + hbase.rootdir.perms + 700 + FS Permissions for the root data subdirectory in a secure (kerberos) setup. + When master starts, it creates the rootdir with this permissions or sets the permissions + if it does not match. + + + hbase.wal.dir.perms + 700 + FS Permissions for the root WAL directory in a secure(kerberos) setup. + When master starts, it creates the WAL dir with this permissions or sets the permissions + if it does not match. + + + hbase.data.umask.enable + false + Enable, if true, that file permissions should be assigned + to the files written by the regionserver + + + hbase.data.umask + 000 + File permissions that should be used to write data + files when hbase.data.umask.enable is true + + + hbase.snapshot.enabled + true + Set to true to allow snapshots to be taken / restored / cloned. + + + hbase.snapshot.restore.take.failsafe.snapshot + true + Set to true to take a snapshot before the restore operation. + The snapshot taken will be used in case of failure, to restore the previous state. + At the end of the restore operation this snapshot will be deleted + + + hbase.snapshot.restore.failsafe.name + hbase-failsafe-{snapshot.name}-{restore.timestamp} + Name of the failsafe snapshot taken by the restore operation. + You can use the {snapshot.name}, {table.name} and {restore.timestamp} variables + to create a name based on what you are restoring. + + + hbase.snapshot.working.dir + + Location where the snapshotting process will occur. The location of the + completed snapshots will not change, but the temporary directory where the snapshot + process occurs will be set to this location. This can be a separate filesystem than + the root directory, for performance increase purposes. See HBASE-21098 for more + information + + + hbase.server.compactchecker.interval.multiplier + 1000 + The number that determines how often we scan to see if compaction is necessary. + Normally, compactions are done after some events (such as memstore flush), but if + region didn't receive a lot of writes for some time, or due to different compaction + policies, it may be necessary to check it periodically. The interval between checks is + hbase.server.compactchecker.interval.multiplier multiplied by + hbase.server.thread.wakefrequency. + + + hbase.lease.recovery.timeout + 900000 + How long we wait on dfs lease recovery in total before giving up. + + + hbase.lease.recovery.dfs.timeout + 64000 + How long between dfs recover lease invocations. Should be larger than the sum of + the time it takes for the namenode to issue a block recovery command as part of + datanode; dfs.heartbeat.interval and the time it takes for the primary + datanode, performing block recovery to timeout on a dead datanode; usually + dfs.client.socket-timeout. See the end of HBASE-8389 for more. + + + hbase.column.max.version + 1 + New column family descriptors will use this value as the default number of versions + to keep. + + + dfs.client.read.shortcircuit + false + + If set to true, this configuration parameter enables short-circuit local + reads. 
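
    [Editor's sketch] The snapshot properties above fit together as follows: with hbase.snapshot.restore.take.failsafe.snapshot enabled, a restore first takes the hbase-failsafe-{snapshot.name}-{restore.timestamp} snapshot so a failed restore can itself be rolled back. A minimal sketch of the flow under the HBase 2.x Admin API; table and snapshot names are illustrative.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;

    public class SnapshotRestoreSketch {
        static void snapshotAndRestore(Connection conn) throws Exception {
            TableName table = TableName.valueOf("events");
            try (Admin admin = conn.getAdmin()) {
                admin.snapshot("events-before-migration", table);
                // ... some risky change to the table ...
                admin.disableTable(table); // a table must be offline to restore
                admin.restoreSnapshot("events-before-migration");
                admin.enableTable(table);
            }
        }
    }
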
+ + + + dfs.domain.socket.path + none + + This is a path to a UNIX domain socket that will be used for + communication between the DataNode and local HDFS clients, if + dfs.client.read.shortcircuit is set to true. If the string "_PORT" is + present in this path, it will be replaced by the TCP port of the DataNode. + Be careful about permissions for the directory that hosts the shared + domain socket; dfsclient will complain if open to other users than the HBase user. + + + + hbase.dfs.client.read.shortcircuit.buffer.size + 131072 + If the DFSClient configuration + dfs.client.read.shortcircuit.buffer.size is unset, we will + use what is configured here as the short circuit read default + direct byte buffer size. DFSClient native default is 1MB; HBase + keeps its HDFS files open so number of file blocks * 1MB soon + starts to add up and threaten OOME because of a shortage of + direct memory. So, we set it down from the default. Make + it > the default hbase block size set in the HColumnDescriptor + which is usually 64k. + + + + hbase.regionserver.checksum.verify + true + + If set to true (the default), HBase verifies the checksums for hfile + blocks. HBase writes checksums inline with the data when it writes out + hfiles. HDFS (as of this writing) writes checksums to a separate file + than the data file necessitating extra seeks. Setting this flag saves + some on i/o. Checksum verification by HDFS will be internally disabled + on hfile streams when this flag is set. If the hbase-checksum verification + fails, we will switch back to using HDFS checksums (so do not disable HDFS + checksums! And besides this feature applies to hfiles only, not to WALs). + If this parameter is set to false, then hbase will not verify any checksums, + instead it will depend on checksum verification being done in the HDFS client. + + + + hbase.hstore.bytes.per.checksum + 16384 + + Number of bytes in a newly created checksum chunk for HBase-level + checksums in hfile blocks. + + + + hbase.hstore.checksum.algorithm + CRC32C + + Name of an algorithm that is used to compute checksums. Possible values + are NULL, CRC32, CRC32C. + + + + hbase.client.scanner.max.result.size + 2097152 + Maximum number of bytes returned when calling a scanner's next method. + Note that when a single row is larger than this limit the row is still returned completely. + The default value is 2MB, which is good for 1ge networks. + With faster and/or high latency networks this value should be increased. + + + + hbase.server.scanner.max.result.size + 104857600 + Maximum number of bytes returned when calling a scanner's next method. + Note that when a single row is larger than this limit the row is still returned completely. + The default value is 100MB. + This is a safety setting to protect the server from OOM situations. + + + + hbase.status.published + false + + This setting activates the publication by the master of the status of the region server. + When a region server dies and its recovery starts, the master will push this information + to the client application, to let them cut the connection immediately instead of waiting + for a timeout. + + + + hbase.status.multicast.address.ip + 226.1.1.3 + + Multicast address to use for the status publication by multicast. + + + + hbase.status.multicast.address.port + 16100 + + Multicast port to use for the status publication by multicast. 
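
    [Editor's sketch] The short-circuit read settings earlier in this block (dfs.client.read.shortcircuit, dfs.domain.socket.path, hbase.dfs.client.read.shortcircuit.buffer.size) are usually flipped together. A minimal programmatic sketch; the socket path is an illustrative value and must match what the DataNodes were started with.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ShortCircuitSketch {
        static Configuration shortCircuitConf() {
            Configuration conf = HBaseConfiguration.create();
            conf.setBoolean("dfs.client.read.shortcircuit", true);
            conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket"); // illustrative path
            conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
            return conf;
        }
    }
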
+ + + + hbase.dynamic.jars.dir + ${hbase.rootdir}/lib + + The directory from which the custom filter JARs can be loaded + dynamically by the region server without the need to restart. However, + an already loaded filter/co-processor class would not be un-loaded. See + HBASE-1936 for more details. + + Does not apply to coprocessors. + + + + hbase.security.authentication + simple + + Controls whether or not secure authentication is enabled for HBase. + Possible values are 'simple' (no authentication), and 'kerberos'. + + + + hbase.master.loadbalance.bytable + false + Factor Table name when the balancer runs. + Default: false. + + + + hbase.rest.csrf.enabled + false + + Set to true to enable protection against cross-site request forgery (CSRF) + + + + hbase.rest-csrf.browser-useragents-regex + ^Mozilla.*,^Opera.* + + A comma-separated list of regular expressions used to match against an HTTP + request's User-Agent header when protection against cross-site request + forgery (CSRF) is enabled for REST server by setting + hbase.rest.csrf.enabled to true. If the incoming User-Agent matches + any of these regular expressions, then the request is considered to be sent + by a browser, and therefore CSRF prevention is enforced. If the request's + User-Agent does not match any of these regular expressions, then the request + is considered to be sent by something other than a browser, such as scripted + automation. In this case, CSRF is not a potential attack vector, so + the prevention is not enforced. This helps achieve backwards-compatibility + with existing automation that has not been updated to send the CSRF + prevention header. + + + + hbase.security.exec.permission.checks + false + + If this setting is enabled and ACL based access control is active (the + AccessController coprocessor is installed either as a system coprocessor + or on a table as a table coprocessor) then you must grant all relevant + users EXEC privilege if they require the ability to execute coprocessor + endpoint calls. EXEC privilege, like any other permission, can be + granted globally to a user, or to a user on a per table or per namespace + basis. For more information on coprocessor endpoints, see the coprocessor + section of the HBase online manual. For more information on granting or + revoking permissions using the AccessController, see the security + section of the HBase online manual. + + + + hbase.procedure.regionserver.classes + + A comma-separated list of + org.apache.hadoop.hbase.procedure.RegionServerProcedureManager procedure managers that are + loaded by default on the active HRegionServer process. The lifecycle methods (init/start/stop) + will be called by the active HRegionServer process to perform the specific globally barriered + procedure. After implementing your own RegionServerProcedureManager, just put it in + HBase's classpath and add the fully qualified class name here. + + + + hbase.procedure.master.classes + + A comma-separated list of + org.apache.hadoop.hbase.procedure.MasterProcedureManager procedure managers that are + loaded by default on the active HMaster process. A procedure is identified by its signature and + users can use the signature and an instant name to trigger an execution of a globally barriered + procedure. After implementing your own MasterProcedureManager, just put it in HBase's classpath + and add the fully qualified class name here. + + + hbase.regionserver.storefile.refresh.period + 0 + + The period (in milliseconds) for refreshing the store files for the secondary regions. 
0 + means this feature is disabled. Secondary regions sees new files (from flushes and + compactions) from primary once the secondary region refreshes the list of files in the + region (there is no notification mechanism). But too frequent refreshes might cause + extra Namenode pressure. If the files cannot be refreshed for longer than HFile TTL + (hbase.master.hfilecleaner.ttl) the requests are rejected. Configuring HFile TTL to a larger + value is also recommended with this setting. + + + + hbase.region.replica.replication.enabled + false + + Whether asynchronous WAL replication to the secondary region replicas is enabled or not. + If this is enabled, a replication peer named "region_replica_replication" will be created + which will tail the logs and replicate the mutations to region replicas for tables that + have region replication > 1. If this is enabled once, disabling this replication also + requires disabling the replication peer using shell or Admin java class. + Replication to secondary region replicas works over standard inter-cluster replication. + + + + hbase.security.visibility.mutations.checkauths + false + + This property if enabled, will check whether the labels in the visibility + expression are associated with the user issuing the mutation + + + + hbase.http.max.threads + 16 + + The maximum number of threads that the HTTP Server will create in its + ThreadPool. + + + + hbase.replication.source.maxthreads + 10 + + The maximum number of threads any replication source will use for + shipping edits to the sinks in parallel. This also limits the number of + chunks each replication batch is broken into. Larger values can improve + the replication throughput between the master and slave clusters. The + default of 10 will rarely need to be changed. + + + + + hbase.http.staticuser.user + dr.stack + + The user name to filter as, on static web filters + while rendering content. An example use is the HDFS + web UI (user to be used for browsing files). + + + + hbase.regionserver.handler.abort.on.error.percent + 0.5 + The percent of region server RPC threads failed to abort RS. + -1 Disable aborting; 0 Abort if even a single handler has died; + 0.x Abort only when this percent of handlers have died; + 1 Abort only all of the handers have died. + + + + hbase.mob.file.cache.size + 1000 + + Number of opened file handlers to cache. + A larger value will benefit reads by providing more file handlers per mob + file cache and would reduce frequent file opening and closing. + However, if this is set too high, this could lead to a "too many opened file handlers" + The default value is 1000. + + + + hbase.mob.cache.evict.period + 3600 + + The amount of time in seconds before the mob cache evicts cached mob files. + The default value is 3600 seconds. + + + + hbase.mob.cache.evict.remain.ratio + 0.5f + + The ratio (between 0.0 and 1.0) of files that remains cached after an eviction + is triggered when the number of cached mob files exceeds the hbase.mob.file.cache.size. + The default value is 0.5f. + + + + hbase.master.mob.ttl.cleaner.period + 86400 + + The period that ExpiredMobFileCleanerChore runs. The unit is second. + The default value is one day. The MOB file name uses only the date part of + the file creation time in it. We use this time for deciding TTL expiry of + the files. So the removal of TTL expired files might be delayed. The max + delay might be 24 hrs. 
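
    [Editor's sketch] The MOB cache and cleaner settings above only matter once a column family is MOB-enabled. A minimal sketch of creating such a family with the HBase 2.x descriptor builders; the table name, family name, and 100 KB threshold are illustrative.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MobTableSketch {
        static void createMobTable(Connection conn) throws Exception {
            try (Admin admin = conn.getAdmin()) {
                admin.createTable(TableDescriptorBuilder
                        .newBuilder(TableName.valueOf("docs"))
                        .setColumnFamily(ColumnFamilyDescriptorBuilder
                                .newBuilder(Bytes.toBytes("blob"))
                                .setMobEnabled(true)          // values above the threshold go to MOB files
                                .setMobThreshold(100 * 1024)  // bytes
                                .build())
                        .build());
            }
        }
    }
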
+ + + + hbase.mob.compaction.mergeable.threshold + 1342177280 + + If the size of a mob file is less than this value, it's regarded as a small + file and needs to be merged in mob compaction. The default value is 1280MB. + + + + hbase.mob.delfile.max.count + 3 + + The max number of del files that is allowed in the mob compaction. + In the mob compaction, when the number of existing del files is larger than + this value, they are merged until number of del files is not larger this value. + The default value is 3. + + + + hbase.mob.compaction.batch.size + 100 + + The max number of the mob files that is allowed in a batch of the mob compaction. + The mob compaction merges the small mob files to bigger ones. If the number of the + small files is very large, it could lead to a "too many opened file handlers" in the merge. + And the merge has to be split into batches. This value limits the number of mob files + that are selected in a batch of the mob compaction. The default value is 100. + + + + hbase.mob.compaction.chore.period + 604800 + + The period that MobCompactionChore runs. The unit is second. + The default value is one week. + + + + hbase.mob.compaction.threads.max + 1 + + The max number of threads used in MobCompactor. + + + + hbase.snapshot.master.timeout.millis + 300000 + + Timeout for master for the snapshot procedure execution. + + + + hbase.snapshot.region.timeout + 300000 + + Timeout for regionservers to keep threads in snapshot request pool waiting. + + + + hbase.rpc.rows.warning.threshold + 5000 + + Number of rows in a batch operation above which a warning will be logged. + + + + hbase.master.wait.on.service.seconds + 30 + Default is 5 minutes. Make it 30 seconds for tests. See + HBASE-19794 for some context. + + + + + + + hbase.master.logcleaner.plugins + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner + A comma-separated list of BaseLogCleanerDelegate invoked by + the LogsCleaner service. These WAL cleaners are called in order, + so put the cleaner that prunes the most files in front. To + implement your own BaseLogCleanerDelegate, just put it in HBase's classpath + and add the fully qualified class name here. Always add the above + default log cleaners in the list. + + + hbase.master.hfilecleaner.plugins + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner + A comma-separated list of BaseHFileCleanerDelegate invoked by + the HFileCleaner service. These HFiles cleaners are called in order, + so put the cleaner that prunes the most files in front. To + implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath + and add the fully qualified class name here. Always add the above + default log cleaners in the list as they will be overwritten in + hbase-site.xml. + + + hbase.regionserver.hlog.reader.impl + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader + The WAL file reader implementation. + + + hbase.regionserver.hlog.writer.impl + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter + The WAL file writer implementation. + + + hbase.regionserver.region.split.policy + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.regionserver.SteppingSplitPolicy + + A split policy determines when a region should be split. 
The various + other split policies that are available currently are BusyRegionSplitPolicy, + ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy, + DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy, and + SteppingSplitPolicy. DisabledRegionSplitPolicy blocks manual region splitting. + + + + hbase.status.publisher.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.ClusterStatusPublisher$MulticastPublisher + + Implementation of the status publication with a multicast message. + + + + hbase.status.listener.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener + + Implementation of the status listener with a multicast message. + + + + hbase.rest.filter.classes + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.rest.filter.GzipFilter + + Servlet filters for REST service. + + + + hbase.master.loadbalancer.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer + + Class used to execute the regions balancing when the period occurs. + See the class comment for more on how it works + http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.html + It replaces the DefaultLoadBalancer as the default (since renamed + as the SimpleLoadBalancer). + + + + hbase.coordinated.state.manager.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager + Fully qualified name of class implementing coordinated state manager. + + + hbase.http.filter.initializers + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.http.lib.StaticUserWebFilter + + A comma separated list of class names. Each class in the list must extend + org.apache.hadoop.hbase.http.FilterInitializer. The corresponding Filter will + be initialized. Then, the Filter will be applied to all user facing jsp + and servlet web pages. + The ordering of the list defines the ordering of the filters. + The default StaticUserWebFilter add a user principal as defined by the + hbase.http.staticuser.user property. + + + + hbase.replication.rpc.codec + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.codec.KeyValueCodecWithTags + + The codec that is to be used when replication is enabled so that + the tags are also replicated. This is used along with HFileV3 which + supports tags in them. If tags are not used or if the hfile version used + is HFileV2 then KeyValueCodec can be used as the replication codec. Note that + using KeyValueCodecWithTags for replication when there are no tags causes no harm. + + + + hbase.master.normalizer.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer + + Class used to execute the region normalization when the period occurs. + See the class comment for more on how it works + http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.html + + + + hbase.mob.compactor.class + org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor + + Implementation of mob compactor, the default one is PartitionedMobCompactor. 
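
    [Editor's sketch] The class-name defaults above carry the connector's shaded package prefix, but the underlying classes are the upstream HBase ones, and the cluster-wide split policy can be overridden per table. A minimal sketch using the unshaded upstream name, assuming the HBase 2.x descriptor API; table and family names are illustrative.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

    public class SplitPolicySketch {
        // Pin one table to DisabledRegionSplitPolicy (blocks automatic splits)
        // instead of the cluster-wide default configured above.
        static TableDescriptor lookupTable() {
            return TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("lookup"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("d"))
                    .setRegionSplitPolicyClassName(
                            "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy")
                    .build();
        }
    }
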
+ + + \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml index 74c17313ff0..47d2df5dab6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml @@ -38,6 +38,7 @@ mysql-cdc iceberg pulsar + hbase diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 56dad549cbe..b35243f2178 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -808,6 +808,13 @@ Source : flink-connector-pulsar 4.0-SNAPSHOT (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE +1.3.19 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java + + Source : flink-connector-hbase-2.2 1.15.4 (Please note that the software have been modified.) + License : https://github.com/apache/flink/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents: From c30242aa1168b13a91dfc9b63a974f92992f916d Mon Sep 17 00:00:00 2001 From: fancycoderzf Date: Thu, 12 Oct 2023 05:00:49 +0800 Subject: [PATCH 2/6] [INLONG-9009][Sort] Delete e2e Test file and update pom.xml --- .../sort/tests/HBaseSourceSinkTest.java | 141 ------------------ .../sort-connectors/hbase/pom.xml | 12 -- 2 files changed, 153 deletions(-) delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java deleted file mode 100644 index 503cba79cb9..00000000000 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/HBaseSourceSinkTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.inlong.sort.tests; - -import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv; -import org.apache.inlong.sort.tests.utils.HBaseContainer; -import org.apache.inlong.sort.tests.utils.TestUtils; - -import org.apache.flink.util.FileUtils; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.output.Slf4jLogConsumer; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @author zfancy - * @version 1.0 - */ -public class HBaseSourceSinkTest extends FlinkContainerTestEnv { - - private static final Logger LOG = LoggerFactory.getLogger(HBaseSourceSinkTest.class); - - private static final Path HADOOP_CP = TestUtils.getResource(".*hadoop.classpath"); - private static final Path hbaseJar = TestUtils.getResource("sort-connector-hbase.jar"); - // private static final Path hbaseJar = TestUtils.getResource("sort-connector-hbase-v1.15.jar"); - // Can't use getResource("xxx").getPath(), windows will don't know that path - private static final String sqlFile; - - private List hadoopCpJars; - - static { - try { - sqlFile = Paths.get(HBaseSourceSinkTest.class.getResource("/flinkSql/hbase_e2e.sql").toURI()).toString(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - @ClassRule - public static final HBaseContainer HBASE_SOURCE_CONTAINER = (HBaseContainer) new HBaseContainer( - "2.2.3") - .withNetwork(NETWORK) - .withNetworkAliases("hbase1") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @ClassRule - public static final HBaseContainer HBASE_SINK_CONTAINER = (HBaseContainer) new HBaseContainer( - "2.2.3") - .withNetwork(NETWORK) - .withNetworkAliases("hbase2") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - @Before - public void setup() throws IOException { - File hadoopClasspathFile = new File(HADOOP_CP.toAbsolutePath().toString()); - if (!hadoopClasspathFile.exists()) { - throw new FileNotFoundException( - "File that contains hadoop classpath " + HADOOP_CP + " does not exist."); - } - String classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile); - hadoopCpJars = - Arrays.stream(classPathContent.split(":")) - .map(Paths::get) - .collect(Collectors.toList()); - - // HBASE_SOURCE_CONTAINER.start(); - // HBASE_SINK_CONTAINER.start(); - waitUntilJobRunning(Duration.ofSeconds(30)); - initializeHBaseTable(); - } - - @AfterClass - public static void teardown() { - if (HBASE_SOURCE_CONTAINER != null) { - HBASE_SOURCE_CONTAINER.stop(); - } - if (HBASE_SINK_CONTAINER != null) { - HBASE_SINK_CONTAINER.stop(); - } - } - - private void initializeHBaseTable() { - try { - // HBASE_SOURCE_CONTAINER.start(); - // HBASE_SINK_CONTAINER.start(); - HBASE_SOURCE_CONTAINER.createTable("sourceTable", "family1", "family2"); - // HBASE_SINK_CONTAINER.createTable("sinkTable", "family1", "family2"); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Test - public void testHBaseSourceAndSink() throws Exception { - //still have some confusing problem - HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family1", "f1c1", "v1"); - HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family2", "f2c1", "v2"); - 
-        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row1", "family2", "f2c2", "v3");
-        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family1", "f1c1", "v4");
-        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family2", "f2c1", "v5");
-        HBASE_SOURCE_CONTAINER.putData("sourceTable", "row2", "family2", "f2c2", "v6");
-
-        Path[] jarPaths = new Path[hadoopCpJars.size() + 1];
-        jarPaths[0] = hbaseJar;
-        for (int i = 0; i < hadoopCpJars.size(); i++) {
-            jarPaths[i + 1] = hadoopCpJars.get(i);
-        }
-        submitSQLJob(sqlFile, jarPaths);
-        waitUntilJobRunning(Duration.ofSeconds(10));
-    }
-
-}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
index b5a5c785515..f521058337f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
@@ -78,18 +78,6 @@
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
-
-
-
-
-
-
-
-
-
-
-
-

From 5e70baba47729cf95aaad3c684abeac6c84bd9fd Mon Sep 17 00:00:00 2001
From: fancycoderzf
Date: Thu, 12 Oct 2023 20:48:29 +0800
Subject: [PATCH 3/6] [INLONG-9009][Sort] Update two pom.xml files

---
 .../sort-end-to-end-tests-v1.15/pom.xml |  8 --------
 .../sort-connectors/hbase/pom.xml       | 20 +++++++++----------
 2 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index b833167d0f1..6c76be65954 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -214,14 +214,6 @@
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
-                        <artifactItem>
-                            <groupId>org.apache.inlong</groupId>
-                            <artifactId>sort-connector-hbase-v1.15</artifactId>
-                            <version>${project.version}</version>
-                            <destFileName>sort-connector-hbase.jar</destFileName>
-                            <type>jar</type>
-                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-                        </artifactItem>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
index f521058337f..25ffa9637f5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/pom.xml
@@ -69,13 +69,13 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-format-common</artifactId>
-            <version>1.10.0-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-core</artifactId>
-            <version>1.10.0-SNAPSHOT</version>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
@@ -161,35 +161,35 @@
                                 <relocation>
                                     <pattern>org.apache.zookeeper</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.org.apache.zookeeper</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.org.apache.zookeeper</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>org.apache.htrace</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.org.apache.htrace</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.org.apache.htrace</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>com.google</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.com.google</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.com.google</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>com.yammer.metrics</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.com.yammer.metrics</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.com.yammer.metrics</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>org.apache.commons</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.org.apache.commons</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.org.apache.commons</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>org.apache.jute</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.org.apache.jute</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.org.apache.jute</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>io.netty</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.io.netty</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.io.netty</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>org.apache.hadoop.hbase</pattern>
-                                    <shadedPattern>org.apache.flink.hbase.shaded.org.apache.hadoop.hbase</shadedPattern>
+                                    <shadedPattern>org.apache.inlong.sort.hbase.shaded.org.apache.hadoop.hbase</shadedPattern>
                                 </relocation>

From 4f3fbf7ca1b3703bedfb7c9cffda7fd762c6851e Mon Sep 17 00:00:00 2001
From: fancycoderzf
Date: Mon, 16 Oct 2023 11:26:41 +0800
Subject: [PATCH 4/6] [INLONG-9009][Sort] Remove useless code

---
 .../org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index eccb93b066a..a9b4e0f790f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -77,7 +77,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         final ReadableConfig tableOptions = helper.getOptions();

         DataType tableSchema = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
-        // Map<String, String> options = context.getCatalogTable().getOptions();

         validatePrimaryKey(tableSchema, new int[]{0});
@@ -99,7 +98,6 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         final ReadableConfig tableOptions = helper.getOptions();

         DataType tableSchema = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
-        // Map<String, String> options = context.getCatalogTable().getOptions();

         validatePrimaryKey(tableSchema, new int[]{0});

From cb06f9c779f904405c2379b650a44f287703ab67 Mon Sep 17 00:00:00 2001
From: fancycoderzf
Date: Mon, 16 Oct 2023 13:10:26 +0800
Subject: [PATCH 5/6] [INLONG-9009][Sort] Remove unused e2e test files

---
 .../sort-end-to-end-tests-v1.15/pom.xml       |  11 --
 .../sort/tests/utils/HBaseContainer.java      | 109 ------------------
 .../src/test/resources/flinkSql/hbase_e2e.sql |  33 ------
 3 files changed, 153 deletions(-)
 delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java
 delete mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 6c76be65954..895ea52675f 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -138,17 +138,6 @@
             <artifactId>clickhouse-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>test</scope>
-        </dependency>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java
deleted file mode 100644
index 4b7e164add2..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.tests.utils;
-
-import com.github.dockerjava.api.command.InspectContainerResponse;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.images.builder.ImageFromDockerfile;
-
-import java.util.Arrays;
-import java.util.stream.Collectors;
-
-/** Standalone containerized HBase instance that builds the image on the fly. */
-@SuppressWarnings("rawtypes")
-public class HBaseContainer extends GenericContainer {
-
-    private static final String HBASE_BIN = "/opt/hbase/bin";
-    private static final int MAX_RETRIES = 3;
-
-    public HBaseContainer(String hbaseVersion) {
-        super(getImageFromDockerfile(hbaseVersion));
-    }
-
-    private static ImageFromDockerfile getImageFromDockerfile(String hbaseVersion) {
-        return new ImageFromDockerfile()
-                .withDockerfileFromBuilder(
-                        builder -> builder.from("adoptopenjdk/openjdk8")
-                                .env("HBASE_VERSION", hbaseVersion)
-                                .run(
-                                        "export INITRD=no"
-                                                + " && export HBASE_DIST=\"http://archive.apache.org/dist/hbase\""
-                                                + " && apt-get update -y"
-                                                + " && apt-get install -y --no-install-recommends curl"
-                                                + " && cd /opt"
-                                                + " && curl -SL $HBASE_DIST/$HBASE_VERSION/hbase-$HBASE_VERSION-bin.tar.gz"
-                                                + " | tar -x -z && mv hbase-${HBASE_VERSION} hbase")
-                                .expose(2181)
-                                .cmd(
-                                        "/bin/sh",
-                                        "-c",
-                                        String.format(
-                                                "nohup %s/start-hbase.sh & sleep infinity",
-                                                HBASE_BIN)));
-    }
-
-    @Override
-    protected void containerIsStarted(InspectContainerResponse containerInfo) {
-        ExecResult res = null;
-        for (int i = 0; i < MAX_RETRIES; i++) {
-            try {
-                res = execCmd("scan 'hbase:meta'");
-                if (res.getStdout().contains("hbase:namespace")) {
-                    return;
-                }
-                Thread.sleep(5000L);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to verify if container is started.", e);
-            }
-        }
-        throw new IllegalStateException("Failed to start HBase properly:\n" + res);
-    }
-
-    public Container.ExecResult createTable(String table, String... colFamilies) throws Exception {
-        String createCmd =
-                String.format("create '%s',", table)
-                        + Arrays.stream(colFamilies)
-                                .map(cf -> String.format("{NAME=>'%s'}", cf))
-                                .collect(Collectors.joining(","));
-
-        return execCmd(createCmd);
-    }
-
-    public Container.ExecResult putData(
-            String table, String rowKey, String colFamily, String colQualifier, String val)
-            throws Exception {
-        String putCmd =
-                String.format(
-                        "put '%s','%s','%s:%s','%s'", table, rowKey, colFamily, colQualifier, val);
-
-        return execCmd(putCmd);
-    }
-
-    public Container.ExecResult scanTable(String table) throws Exception {
-        String scanCmd = String.format("scan '%s'", table);
-
-        return execCmd(scanCmd);
-    }
-
-    private Container.ExecResult execCmd(String cmd) throws Exception {
-        String hbaseShellCmd = String.format("echo \"%s\" | %s/hbase shell", cmd, HBASE_BIN);
-
-        return execInContainer("sh", "-c", hbaseShellCmd);
-    }
-}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql
deleted file mode 100644
index 5b67748d850..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/hbase_e2e.sql
+++ /dev/null
@@ -1,33 +0,0 @@
-CREATE TABLE MyHBaseSource (
-    rowkey STRING,
-    family1 ROW<f1c1 STRING>,
-    family2 ROW<f2c1 STRING, f2c2 STRING>
-) WITH (
-    'connector' = 'hbase-2.2-inlong',
-    'table-name' = 'sourceTable',
-    'zookeeper.quorum' = 'hbase1:2181'
-);
-
-CREATE TABLE MyHBaseSink
-(
-    rowkey STRING,
-    family1 ROW<f1c1 STRING>,
-    family2 ROW<f2c1 STRING, f2c2 STRING>
-) WITH (
-    'connector' = 'hbase-2.2-inlong',
-    'table-name' = 'sinkTable',
-    'zookeeper.quorum' = 'hbase2:2181',
-    'sink.buffer-flush.max-rows' = '1',
-    'sink.buffer-flush.interval' = '2s'
-    );
-
-INSERT INTO MyHBaseSink
-SELECT rowkey,
-       ROW(a),
-       ROW(b, c)
-FROM (
-         SELECT rowkey,
-                REGEXP_REPLACE(family1.f1c1, 'v', 'value') as a,
-                family2.f2c1 as b,
-                family2.f2c2 as c
-         FROM MyHBaseSource) source;
\ No newline at end of file

From 7592d51be778128dbdbc9683706965083f3ed899 Mon Sep 17 00:00:00 2001
From: fancycoderzf
Date: Mon, 16 Oct 2023 13:14:10 +0800
Subject: [PATCH 6/6] [INLONG-9009][Sort] Undo 'pom.xml' changes

---
 .../sort-end-to-end-tests-v1.15/pom.xml | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 895ea52675f..37244cf14f4 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -213,17 +213,6 @@
                         <phase>pre-integration-test</phase>
                     </execution>
-                    <execution>
-                        <id>store-classpath-in-target-for-tests</id>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <phase>package</phase>
-                        <configuration>
-                            <outputFile>${project.build.directory}/hadoop.classpath</outputFile>
-                            <excludeGroupIds>org.apache.flink</excludeGroupIds>
-                        </configuration>
-                    </execution>
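
Note: the connector shipped by this series is addressed in Flink SQL through the factory
identifier 'hbase-2.2-inlong', as exercised by the e2e script removed above. What follows
is a minimal usage sketch, not part of the patches themselves; the table name 'orders_sink',
the HBase table 'orders', and the quorum address 'zk:2181' are illustrative placeholders.
The first declared column is assumed to be the row key, which HBase2DynamicTableFactory
validates as the primary key; the buffer-flush options mirror those in the removed script.

    -- Hypothetical sketch: declare a sink table backed by the InLong HBase connector.
    CREATE TABLE orders_sink (
        rowkey STRING,
        family1 ROW<f1c1 STRING>,
        PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-2.2-inlong',         -- factory identifier registered by the connector
        'table-name' = 'orders',                  -- target HBase table (placeholder)
        'zookeeper.quorum' = 'zk:2181',           -- HBase cluster's ZooKeeper quorum (placeholder)
        'sink.buffer-flush.max-rows' = '1000',    -- flush after this many buffered mutations
        'sink.buffer-flush.interval' = '2s'       -- or after this interval, whichever comes first
    );

Buffered flushing trades per-row latency for throughput; the e2e script set max-rows to '1'
to force an immediate flush per record, which suits tests rather than production loads.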