diff --git a/com.ibm.streamsx.hbase/.classpath b/com.ibm.streamsx.hbase/.classpath
index cf35b74..ed582e6 100644
--- a/com.ibm.streamsx.hbase/.classpath
+++ b/com.ibm.streamsx.hbase/.classpath
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
-    <classpathentry kind="src" output="opt/java/bin" path="opt/java/src"/>
+    <classpathentry kind="src" output="impl/java/bin" path="impl/java/src"/>
     <classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
     <classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
     <classpathentry kind="lib" path="opt/downloaded/commons-cli-1.2.jar"/>
@@ -28,5 +28,5 @@
     <classpathentry kind="lib" path="opt/downloaded/slf4j-api-1.7.10.jar"/>
     <classpathentry kind="lib" path="opt/downloaded/slf4j-log4j12-1.7.10.jar"/>
     <classpathentry kind="lib" path="opt/downloaded/zookeeper-3.4.6.jar"/>
-    <classpathentry kind="output" path="opt/java/bin"/>
+    <classpathentry kind="output" path="bin"/>
 </classpath>
diff --git a/com.ibm.streamsx.hbase/.gitignore b/com.ibm.streamsx.hbase/.gitignore
index 16be8f2..2423b00 100644
--- a/com.ibm.streamsx.hbase/.gitignore
+++ b/com.ibm.streamsx.hbase/.gitignore
@@ -1 +1,2 @@
 /output/
+/bin/
diff --git a/com.ibm.streamsx.hbase/.project b/com.ibm.streamsx.hbase/.project
index 03c5553..c82cd2c 100644
--- a/com.ibm.streamsx.hbase/.project
+++ b/com.ibm.streamsx.hbase/.project
@@ -25,5 +25,7 @@
     <nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
     <nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
     <nature>org.eclipse.jdt.core.javanature</nature>
+    <nature>com.ibm.etools.systems.projects.core.remoteunixnature</nature>
   </natures>
 </projectDescription>
+
diff --git a/com.ibm.streamsx.hbase/.settings/org.eclipse.jdt.core.prefs b/com.ibm.streamsx.hbase/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..3a21537
--- /dev/null
+++ b/com.ibm.streamsx.hbase/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,11 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/com.ibm.streamsx.hbase/impl/java/.gitignore b/com.ibm.streamsx.hbase/impl/java/.gitignore
index 167fb2c..4052fca 100644
--- a/com.ibm.streamsx.hbase/impl/java/.gitignore
+++ b/com.ibm.streamsx.hbase/impl/java/.gitignore
@@ -1,2 +1,2 @@
 /bin
-/bin
+
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEDelete.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEDelete.java
index 1024b83..8d44c6a 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEDelete.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEDelete.java
@@ -8,6 +8,7 @@ import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.log4j.Logger;
@@ -29,11 +30,11 @@ import com.ibm.streams.operator.model.PrimitiveOperator;
 import com.ibm.streams.operator.state.ConsistentRegionContext;
 
-/**
+/**
  * Accepts tuples on input stream and makes the corresponding delete in the
  * HBASE table. .
  * <P>
- */
+ */
 @PrimitiveOperator(name = "HBASEDelete", namespace = "com.ibm.streamsx.hbase", description = "The `HBASEDelete` operator deletes an entry, an entire row, a columnFamily in a row, or a columnFamily, columnQualifier pair in a row from an HBase table. It can also optionally do a checkAndDelete operation."
     + HBASEOperator.DOC_BLANKLINE
     + "The behavior of the operator depends on its parameters:"
@@ -179,53 +180,65 @@ public synchronized void initialize(OperatorContext context)
     @Override
     public void process(StreamingInput<Tuple> stream, Tuple tuple)
             throws Exception {
-        Table myTable = getHTable();
-        byte row[] = getRow(tuple);
-        Delete myDelete = new Delete(row);
-        if (DeleteMode.COLUMN_FAMILY == deleteMode) {
-            byte colF[] = getColumnFamily(tuple);
-            myDelete.addFamily(colF);
-        } else if (DeleteMode.COLUMN == deleteMode) {
-            byte colF[] = getColumnFamily(tuple);
-            byte colQ[] = getColumnQualifier(tuple);
-            if (deleteAll) {
-                myDelete.addColumns(colF, colQ);
-            } else {
-                myDelete.addColumn(colF, colQ);
-            }
+        Table myTable = null;
+
+
+        try {
+            myTable = getHTable(tuple);
+        } catch (TableNotFoundException e) {
+            e.printStackTrace();
+            logger.error(e.getMessage());
         }
-        boolean success = false;
-        if (checkAttr != null) {
-            Tuple checkTuple = tuple.getTuple(checkAttrIndex);
-            // the check row and the row have to match, so don't use the
-            // checkRow.
-            byte checkRow[] = getRow(tuple);
-            byte checkColF[] = getBytes(checkTuple, checkColFIndex,
-                    checkColFType);
-            byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
-                    checkColQType);
-            byte checkValue[] = getCheckValue(checkTuple);
-            success = myTable.checkAndDelete(checkRow, checkColF, checkColQ,
-                    checkValue, myDelete);
-        } else if (batchSize == 0) {
-            logger.debug(Messages.getString("HBASE_DEL_DELETING", myDelete));
-            myTable.delete(myDelete);
-        } else {
-            synchronized (listLock) {
-                deleteList.add(myDelete);
-                if (deleteList.size() >= batchSize) {
-                    myTable.delete(deleteList);
-                    deleteList.clear();
+        if ( myTable != null ){
+            byte row[] = getRow(tuple);
+            Delete myDelete = new Delete(row);
+
+            if (DeleteMode.COLUMN_FAMILY == deleteMode) {
+                byte colF[] = getColumnFamily(tuple);
+                myDelete.addFamily(colF);
+            } else if (DeleteMode.COLUMN == deleteMode) {
+                byte colF[] = getColumnFamily(tuple);
+                byte colQ[] = getColumnQualifier(tuple);
+                if (deleteAll) {
+                    myDelete.addColumns(colF, colQ);
+                } else {
+                    myDelete.addColumn(colF, colQ);
                 }
             }
+
+            boolean success = false;
+            if (checkAttr != null) {
+                Tuple checkTuple = tuple.getTuple(checkAttrIndex);
+                // the check row and the row have to match, so don't use the
+                // checkRow.
+                byte checkRow[] = getRow(tuple);
+                byte checkColF[] = getBytes(checkTuple, checkColFIndex,
+                        checkColFType);
+                byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
+                        checkColQType);
+                byte checkValue[] = getCheckValue(checkTuple);
+                success = myTable.checkAndDelete(checkRow, checkColF, checkColQ,
+                        checkValue, myDelete);
+            } else if (batchSize == 0) {
+                logger.debug(Messages.getString("HBASE_DEL_DELETING", myDelete));
+                myTable.delete(myDelete);
+            } else {
+                synchronized (listLock) {
+                    deleteList.add(myDelete);
+                    if (deleteList.size() >= batchSize) {
+                        myTable.delete(deleteList);
+                        deleteList.clear();
+                    }
+                }
+            }
+
+            // Checks to see if an output tuple is necessary, and if so,
+            // submits it.
+            submitOutputTuple(tuple, success);
+            myTable.close();
         }
-
-        // Checks to see if an output tuple is necessary, and if so,
-        // submits it.
-        submitOutputTuple(tuple, success);
-        myTable.close();
     }
 
     /**
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEGet.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEGet.java
index e4a3ff2..29d1ba1 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEGet.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEGet.java
@@ -12,6 +12,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.log4j.Logger;
 import com.ibm.streams.operator.Attribute;
 import com.ibm.streams.operator.OperatorContext;
@@ -317,7 +318,18 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple)
                 myGet.addFamily(colF);
             }
         }
-        Table myTable = getHTable();
+
+        Table myTable = null;
+
+        try {
+            myTable = getHTable(tuple);
+        } catch (TableNotFoundException e) {
+            e.printStackTrace();
+            logger.error(e.getMessage());
+        }
+
+        if ( myTable != null) {
+
         Result r = myTable.get(myGet);
         int numResults = r.size();
@@ -350,6 +362,7 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple)
         // Submit new tuple to output port 0
         outStream.submit(outTuple);
         myTable.close();
+        }
     }
 }
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEIncrement.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEIncrement.java
index bbd23fc..ccb1f41 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEIncrement.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEIncrement.java
@@ -3,6 +3,7 @@
 
 package com.ibm.streamsx.hbase;
 
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.log4j.Logger;
 
@@ -127,9 +128,17 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
                 incr = tuple.getLong(incrAttrIndex);
             }
         }
-        Table myTable = getHTable();
-        long newValue = myTable.incrementColumnValue(row, colF, colQ, incr);
-        myTable.close();
+        Table myTable = null;
+        try {
+            myTable = getHTable(tuple);
+        } catch (TableNotFoundException e) {
+            e.printStackTrace();
+        }
+
+        if (myTable != null ){
+            myTable.incrementColumnValue(row, colF, colQ, incr);
+            myTable.close();
+        }
     }
 }
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperator.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperator.java
index dc2ef5c..93526c8 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperator.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperator.java
@@ -10,19 +10,23 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.log4j.Logger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 
 import com.ibm.streams.operator.AbstractOperator;
 import com.ibm.streams.operator.Attribute;
+import com.ibm.streams.operator.TupleAttribute;
 import com.ibm.streams.operator.OperatorContext;
 import com.ibm.streams.operator.OperatorContext.ContextCheck;
 import com.ibm.streams.operator.StreamSchema;
@@ -62,7 +66,7 @@ public abstract class HBASEOperator extends AbstractOperator {
     protected List<String> staticColumnQualifierList = null;
     public final static Charset RSTRING_CHAR_SET = Charset.forName("UTF-8");
     protected Charset charset = RSTRING_CHAR_SET;
-    private String tableName = null;
+    public String tableName = null;
     protected byte tableNameBytes[] = null;
     private static String hbaseSite = null;
     private String fAuthPrincipal = null;
@@ -70,6 +74,8 @@ public abstract class HBASEOperator extends AbstractOperator {
     protected Connection connection = null;
     static final String TABLE_PARAM_NAME = "tableName";
+    public TupleAttribute<Tuple, String> tableNameAttribute;
+    static final String TABLE_NAME_ATTRIBUTE = "tableNameAttribute";
     static final String ROW_PARAM_NAME = "rowAttrName";
     static final String STATIC_COLF_NAME = "staticColumnFamily";
     static final String STATIC_COLQ_NAME = "staticColumnQualifier";
@@ -81,6 +87,9 @@ public abstract class HBASEOperator extends AbstractOperator {
     static final String VALID_TYPE_STRING = "rstring, ustring, blob, or int64";
     static final int BYTES_IN_LONG = Long.SIZE / Byte.SIZE;
+
+    org.apache.log4j.Logger logger = Logger.getLogger(this.getClass());
+
     @Parameter(name = HBASE_SITE_PARAM_NAME, optional = true, description = "The **hbaseSite** parameter specifies the path of hbase-site.xml file. This is the recommended way to specify the HBASE configuration. If not specified, then `HBASE_HOME` must be set when the operator runs, and it will use `$HBASE_SITE/conf/hbase-site.xml`")
     public void setHbaseSite(String name) {
         hbaseSite = name;
@@ -91,10 +100,16 @@ public void getCharset(String _name) {
         charset = Charset.forName(_name);
     }
 
-    @Parameter(name = TABLE_PARAM_NAME, optional = false, description = "Name of the HBASE table. If it does not exist, the operator will throw an exception on startup")
+    @Parameter(name = TABLE_PARAM_NAME, optional = true, description = "Name of the HBASE table. It is an optional parameter, but one of the parameters 'tableName' or 'tableNameAttribute' must be set in the operator. Cannot be used with 'tableNameAttribute'. If the table does not exist, the operator will throw an exception")
     public void setTableName(String _name) {
         tableName = _name;
     }
+
+    @Parameter(name = TABLE_NAME_ATTRIBUTE, optional = true, description = "Name of the attribute on the input tuple that contains the tableName. Use this parameter to pass the table name to the operator via the input port. Cannot be used with parameter 'tableName'. This is suitable for tables with the same schema.")
+    public void setTableNameAttr(TupleAttribute<Tuple, String> tableNameAttribute) throws IOException {
+        this.tableNameAttribute = tableNameAttribute;
+    }
+
     @Parameter(name = STATIC_COLF_NAME, optional = true, description = "If this parameter is specified, it will be used as the columnFamily for all operations. (Compare to columnFamilyAttrName.) For HBASEScan, it can have cardinality greater than one.")
     public void setStaticColumnFamily(List<String> name) {
@@ -141,7 +156,7 @@ protected static void checkConsistentRegionSource(OperatorContextChecker checker
      *
      * @param checker
      */
-    @ContextCheck(compile = false)
+    @ContextCheck(compile = true)
     public static void runtimeHBaseOperatorChecks(OperatorContextChecker checker) {
         OperatorContext context = checker.getOperatorContext();
         // The hbase site must either be specified by a parameter, or we must look it up relative to an environment variable.
@@ -151,8 +166,22 @@ public static void runtimeHBaseOperatorChecks(OperatorContextChecker checker) {
                 checker.setInvalidContext(Messages.getString("HBASE_OP_NO_HBASE_HOME", HBASE_SITE_PARAM_NAME), null);
             }
         }
+
+        if ((!context.getParameterNames().contains(TABLE_PARAM_NAME))
+                && (!context.getParameterNames().contains(TABLE_NAME_ATTRIBUTE))) {
+            checker.setInvalidContext("One of these parameters must be set in the operator: '" + TABLE_PARAM_NAME + "' or '" + TABLE_NAME_ATTRIBUTE + "'", null);
+        }
+    }
+
+    @ContextCheck(compile = true)
+    public static void checkTableName(OperatorContextChecker checker) {
+        // Cannot specify both tableNameAttribute and a tableName
+        checker.checkExcludedParameters(TABLE_NAME_ATTRIBUTE, TABLE_PARAM_NAME);
+        checker.checkExcludedParameters(TABLE_PARAM_NAME, TABLE_NAME_ATTRIBUTE);
     }
+
+
     /**
      * Helper function to check that an attribute is the right type and return the index if so.
      *
@@ -271,8 +300,7 @@ protected int checkAndGetIndex(StreamSchema schema, String attrName, MetaType al
     public synchronized void initialize(OperatorContext context) throws Exception {
         // Must call super.initialize(context) to correctly setup an operator.
         super.initialize(context);
-        Logger.getLogger(this.getClass()).trace(
-                "Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId());
+        logger.trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId());
         ArrayList<String>libList = new ArrayList<>();
         String hadoopHome = System.getenv("HADOOP_HOME");
         String hbaseHome = System.getenv("HBASE_HOME");
@@ -289,7 +317,7 @@ public synchronized void initialize(OperatorContext context) throws Exception {
         try {
             context.addClassLibraries(libList.toArray(new String[0]));
         } catch (Exception e) {
-            Logger.getLogger(this.getClass()).error(Messages.getString("HBASE_OP_NO_CLASSPATH"));
+            logger.error(Messages.getString("HBASE_OP_NO_CLASSPATH"));
         }
 
@@ -307,22 +335,20 @@ public synchronized void initialize(OperatorContext context) throws Exception {
             fAuthKeytab = context.getPE().getApplicationDirectory().getAbsolutePath() + File.separator + fAuthKeytab;
         }
 
-        tableNameBytes = tableName.getBytes(charset);
+        if (tableName != null){
+            tableNameBytes = tableName.getBytes(charset);
+        }
         getConnection();
     }
-
-
-
     protected void getConnection() throws IOException {
-
-        System.out.println("hbaseSite:\t" + hbaseSite);
+        logger.info("hbaseSite:\t" + hbaseSite);
         Configuration conf = HBaseConfiguration.create();
         conf.addResource(new Path(hbaseSite));
 
         if ((fAuthPrincipal != null) && (fAuthKeytab != null)) {
             // kerberos authentication
-            System.out.println("fAuthKeytab:\t" + fAuthKeytab);
-            System.out.println("fAuthPrincipal:\t" + fAuthPrincipal);
+            logger.info("fAuthKeytab:\t" + fAuthKeytab);
+            logger.info("fAuthPrincipal:\t" + fAuthPrincipal);
             conf.set("hadoop.security.authentication", "kerberos");
             conf.set("hbase.security.authentication", "kerberos");
             UserGroupInformation.setConfiguration(conf);
@@ -332,7 +358,7 @@ protected void getConnection() throws IOException {
         connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
         Admin admin = connection.getAdmin();
         if (admin.getConnection() == null) {
-            Logger.getLogger(this.getClass()).error("HBase connection failed");
+            logger.error("HBase connection failed");
         }
 
     /*
@@ -354,12 +380,51 @@ protected void getConnection() throws IOException {
      *
      * @return HTable object.
      * @throws IOException
+     *
      */
-    protected Table getHTable() throws IOException {
-        final TableName tableName = TableName.valueOf(tableNameBytes);
-        return connection.getTable(tableName);
+
+    protected Table getHTable() throws TableNotFoundException, IOException {
+        if (tableName == null){
+            return null;
+        }
+
+        final TableName tableTableName = TableName.valueOf(tableNameBytes);
+        try (Admin admin = this.connection.getAdmin()) {
+            if (!admin.tableExists(tableTableName)) {
+                throw new TableNotFoundException("Table '" + tableTableName.getNameAsString()
+                        + "' does not exist.");
+            }
+        }
+        return connection.getTable(tableTableName);
+    }
+
+
+    protected Table getHTable(Tuple tuple) throws TableNotFoundException, IOException {
+        String TableNameStr = null;
+        if (tableName != null) {
+            TableNameStr = tableName;
+        } else {
+            TableNameStr = tuple.getString(tableNameAttribute.getAttribute().getIndex());
+        }
+
+        if (TableNameStr == null) return null;
+
+        if (TableNameStr.length() < 1 ) return null;
+
+        byte TableNameBytes[] = TableNameStr.getBytes(charset);
+        final TableName tableTableName = TableName.valueOf(TableNameBytes);
+
+        try (Admin admin = this.connection.getAdmin()) {
+            if (!admin.tableExists(tableTableName)) {
+                throw new TableNotFoundException("Table '" + tableTableName.getNameAsString()
+                        + "' does not exist.");
+            }
+        }
+
+        return connection.getTable(tableTableName);
     }
+
     protected Table getHTable(String sTableName) throws IOException {
         byte TableNameBytes[] = sTableName.getBytes(charset);
         final TableName tableName = TableName.valueOf(TableNameBytes);
@@ -386,7 +451,7 @@ protected TableName getTableName(String sTableName) throws IOException {
     @Override
     public synchronized void shutdown() throws Exception {
         OperatorContext context = getOperatorContext();
-        Logger.getLogger(this.getClass()).trace(
+        logger.trace(
                 "Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId());
         if (connection != null && !connection.isClosed()) {
             connection.close();
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java
index 460b69f..2db813f 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java
@@ -7,6 +7,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -189,7 +190,7 @@ public static void checkForBatchSizeParam(OperatorContextChecker checker) {
      * When enableBuffer is true and we are disabling autoflush,
      * keep a pointer to the hbase table. This value will be null otherwise
      */
-    private Table cachedTable;
+    private Table cachedTable = null;
 
     /**
@@ -208,15 +209,20 @@ public synchronized void initialize(OperatorContext context)
             putList = new ArrayList<Put>(batchSize);
         }
 
+        StreamingInput<Tuple> inputPort = context.getStreamingInputs().get(0);
+//      Tuple tuple = inputPort.
+        StreamSchema schema = inputPort.getStreamSchema();
+        Tuple tuple = schema.getTuple();
+
         if (bufferTransactions) {
             Logger.getLogger(this.getClass()).trace(Messages.getString("HBASE_PUT_DISABLING_FLUSH"));
-            cachedTable= getHTable();
+            cachedTable= getHTable(tuple);
             // cachedTable = connection.getTable(tableNameBytes);
             // cachedTable.setAutoFlush(false, true);
         }
-        StreamingInput<Tuple> inputPort = context.getStreamingInputs().get(0);
-        StreamSchema schema = inputPort.getStreamSchema();
+
+
         Attribute attr = schema.getAttribute(valueAttr);
 
         if (attr.getType().getMetaType() == MetaType.TUPLE) {
@@ -260,66 +266,72 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
         byte colF[] = getColumnFamily(tuple);
         boolean success = false;
         Put myPut = new Put(row);
-//      HTableInterface table = null;
-        Table table = null;
+        Table myTable = null;
         if (!bufferTransactions) {
-//          table = connection.getTable(tableNameBytes);
-            table = getHTable();
-        }
-        switch (putMode) {
-
-        case ENTRY:
-            byte colQ[] = getColumnQualifier(tuple);
-            byte value[] = getBytes(tuple, valueAttrIndex, valueAttrType);
-            myPut.addColumn(colF, colQ, value);
-////        myPut.add(colF, colQ, value);
-            break;
-        case RECORD:
-            Tuple values = tuple.getTuple(valueAttr);
-            for (int i = 0; i < qualifierArray.length; i++) {
-                myPut.addColumn(colF, qualifierArray[i],
-//              myPut.add(colF, qualifierArray[i],
-                        getBytes(values, i, attrType[i]));
-            }
-            break;
-        default:
-            // It should be impossible to get here.
-            throw new Exception("Unsupported Put type");
+            try {
+                myTable = getHTable(tuple);
+            } catch (TableNotFoundException e) {
+                e.printStackTrace();
+                logger.error(e.getMessage());
+            }
         }
-        if (checkAttr != null) {
-            Tuple checkTuple = tuple.getTuple(checkAttrIndex);
-
-            // the row attribute and the check row attribute have to match, so
-            // don't even look
-            // in the check attribute for hte row.
-            byte checkRow[] = getRow(tuple);
-            byte checkColF[] = getBytes(checkTuple, checkColFIndex,
-                    checkColFType);
-            byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
-                    checkColQType);
-            byte checkValue[] = getCheckValue(checkTuple);
-
-            success = table.checkAndPut(checkRow, checkColF, checkColQ,
-                    checkValue, myPut);
-            logger.debug(Messages.getString("HBASE_PUT_RESULT", success));
-        } else if (!bufferTransactions && batchSize == 0) {
-            table.put(myPut);
-        } else if (bufferTransactions){
-            safePut(myPut);
-        } else {
-            synchronized (listLock) {
-                putList.add(myPut);
-                if (putList.size() >= batchSize) {
-                    logger.debug(Messages.getString("HBASE_PUT_SUBMITTING_BATCH"));
-                    table.put(putList);
-                    putList.clear();
+        if (myTable != null) {
+            switch (putMode) {
+
+            case ENTRY:
+                byte colQ[] = getColumnQualifier(tuple);
+                byte value[] = getBytes(tuple, valueAttrIndex, valueAttrType);
+                myPut.addColumn(colF, colQ, value);
+                //// myPut.add(colF, colQ, value);
+                break;
+            case RECORD:
+                Tuple values = tuple.getTuple(valueAttr);
+                for (int i = 0; i < qualifierArray.length; i++) {
+                    myPut.addColumn(colF, qualifierArray[i],
+                    // myPut.add(colF, qualifierArray[i],
+                            getBytes(values, i, attrType[i]));
                 }
+                break;
+            default:
+                // It should be impossible to get here.
+                throw new Exception("Unsupported Put type");
             }
-        }
-        if (!bufferTransactions){
-            table.close();
-        }
+
+            if (checkAttr != null) {
+                Tuple checkTuple = tuple.getTuple(checkAttrIndex);
+
+                // the row attribute and the check row attribute have to match, so
+                // don't even look
+                // in the check attribute for the row.
+                byte checkRow[] = getRow(tuple);
+                byte checkColF[] = getBytes(checkTuple, checkColFIndex,
+                        checkColFType);
+                byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
+                        checkColQType);
+                byte checkValue[] = getCheckValue(checkTuple);
+
+                success = myTable.checkAndPut(checkRow, checkColF, checkColQ,
+                        checkValue, myPut);
+                logger.debug(Messages.getString("HBASE_PUT_RESULT", success));
+            } else if (!bufferTransactions && batchSize == 0) {
+                myTable.put(myPut);
+            } else if (bufferTransactions){
+                safePut(myPut);
+            } else {
+                synchronized (listLock) {
+                    putList.add(myPut);
+                    if (putList.size() >= batchSize) {
+                        logger.debug(Messages.getString("HBASE_PUT_SUBMITTING_BATCH"));
+                        myTable.put(putList);
+                        putList.clear();
+                    }
+                }
+            }
+            if (!bufferTransactions){
+                myTable.close();
+            }
+        }
         // Checks to see if an output tuple is necessary, and if so,
         // submits it.
         submitOutputTuple(tuple, success);
@@ -330,8 +342,10 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
      */
     protected void safePut(Put p) throws IOException {
         synchronized (tableLock) {
-            cachedTable.put(p);
-        }
+            if (cachedTable != null) {
+                cachedTable.put(p);
+            }
+        }
     }
 
     @Override
@@ -385,7 +399,6 @@ protected void flushBuffer() throws IOException {
      * Flush the internal list of puts we keep when using our own batch puts instead of autoflush
      */
     protected synchronized void flushInternalBuffer() throws IOException {
-//      HTableInterface table = connection.getTable(tableNameBytes);
         Table table = getHTable();
         synchronized (listLock) {
             if (table != null && putList != null && putList.size() > 0) {
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java
index 541f615..8cac45a 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java
@@ -7,6 +7,7 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.log4j.Logger;
 
@@ -48,6 +49,9 @@ public abstract class HBASEPutDelete extends HBASEOperatorWithInput implements
     protected MetaType checkColFType = null, checkColQType = null,
             checkValueType = null;
 
+    Logger logger = Logger.getLogger(this.getClass());
+
+
     final protected Object listLock = new Object();
 
     protected int batchSize = 0;
@@ -160,13 +164,30 @@ public synchronized void initialize(OperatorContext context)
         // Must call super.initialize(context) to correctly setup an operator.
         super.initialize(context);
 
-        Table table = getHTable();
+        Table myTable = null;
+
+        StreamingInput<Tuple> inputPort = context.getStreamingInputs().get(0);
+//      Tuple tuple = inputPort.
+        StreamSchema schema = inputPort.getStreamSchema();
+        Tuple tuple = schema.getTuple();
+
+
+        try {
+            myTable = getHTable(tuple);
+        } catch (TableNotFoundException e) {
+            e.printStackTrace();
+            logger.error(e.getMessage());
         }
 
-        if (null == table) {
+/*
+        if (null == myTable) {
             Logger.getLogger(this.getClass()).error(Messages.getString("HBASE_PUT_DEL_NO_TABLE_ACCESS"));
             throw new Exception("Cannot access table. Check configuration");
         }
+*/
+        if (myTable != null) {
+
         StreamingInput<Tuple> input = context.getStreamingInputs().get(0);
         StreamSchema inputSchema = input.getStreamSchema();
         if (checkAttr != null) {
@@ -204,7 +225,8 @@ public synchronized void initialize(OperatorContext context)
             }
             successAttrIndex = attr.getIndex();
         }
-        table.close();
+        myTable.close();
+        }
         context.registerStateHandler(this);
     }
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEScan.java b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEScan.java
index 010727c..73bd5af 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEScan.java
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEScan.java
@@ -16,6 +16,7 @@
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -918,11 +919,20 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
             throw new Exception("Internal error. Unknown input mode " + inputMode);
         }
 
-//      HTableInterface myTable = connection.getTable(tableNameBytes);
-        Table myTable = getHTable();
-        ResultScanner resultScanner = startScan(myTable, myScan);
-        submitResults(tuple, resultScanner, (long) -1);
-        myTable.close();
+
+        Table myTable = null;
+        try {
+            myTable = getHTable(tuple);
+        } catch (TableNotFoundException e) {
+            e.printStackTrace();
+        }
+
+        if (myTable != null ){
+
+            ResultScanner resultScanner = startScan(myTable, myScan);
+            submitResults(tuple, resultScanner, (long) -1);
+            myTable.close();
+        }
 
         out.punctuate(Punctuation.WINDOW_MARKER);
     }
diff --git a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/messages/messages_en_US.properties b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/messages/messages_en_US.properties
index d034cf1..0ba31ff 100644
--- a/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/messages/messages_en_US.properties
+++ b/com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/messages/messages_en_US.properties
@@ -12,7 +12,7 @@ HBASE_GET_INVALID_OUTPUT=CDIST2903E Wrong number of outputs; expected 1 found ''
 HBASE_GET_SET_TIME_RANGE=CDIST2904E Setting time range to ''{0}'' .
 
 HBASE_OP_NO_CONSISTENT_REGION=CDIST2905E ERROR: The following operator cannot be the start of a consistent region: ''{0}'' .
-HBASE_OP_NO_HBASE_HOME=CDIST2906E ERROR: If ''{0}'' not specified, then HBASE_HOME must be set in runtime environment.
+HBASE_OP_NO_HBASE_HOME=CDIST2906E ERROR: The hbase configuration file must either be specified by parameter ''{0}'', or the HBASE_HOME environment variable must be set.
 HBASE_OP_INVALID_ATTR=CDIST2907E Attribute ''{0}'' has invalid type ''{1}'' .
 HBASE_OP_NO_CLASSPATH=CDIST2908E ERROR: adding libraries to classpath.
diff --git a/com.ibm.streamsx.hbase/impl/lib/.gitignore b/com.ibm.streamsx.hbase/impl/lib/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/com.ibm.streamsx.hbase/impl/lib/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/com.ibm.streamsx.hbase/info.xml b/com.ibm.streamsx.hbase/info.xml
index 880cf4f..4b0885e 100644
--- a/com.ibm.streamsx.hbase/info.xml
+++ b/com.ibm.streamsx.hbase/info.xml
@@ -112,6 +112,11 @@ Alternatively, you can fully qualify the operators that are provided by toolkit
 
 7. Run the application. You can submit the application as a job by using the **streamtool submitjob** command or by using Streams Studio.
 
++ What's new
+This is an overview of changes for major and minor version upgrades. For details see the Releases in the public GitHub repository.
+
+++ What is new in version 3.1.0
+
 **Kerberos Authentication**
 
 Kerberos authentication is a network protocol to provide strong authentication for client/server applications.
@@ -124,8 +129,6 @@ The **authPrincipal** parameter specifies the Kerberos principal, which is typic
 
 If not done already, enable Kerberos authentication on your Hadoop cluster using the following links to find out "how to enable the kerberos authentication".
-
-
 [https://hortonworks.com/blog/ambari-kerberos-support-hbase-1/]
 
 [https://www.cloudera.com/documentation/enterprise/latest/topics/cm_sg_s4_kerb_wizard.html]
@@ -151,9 +154,15 @@ Here is an example to connect to the HBase server with Kerberos Authentication:
             valueAttrName : "bookData" ;
     }
 
+++ What is new in version 3.2.0
+
+* The HBASE operators provide a new parameter 'tableNameAttribute'.
+  Use this parameter to pass the table name to the operator via the input port. It cannot be used together with 'tableName'.
+
+* The parameters 'tableName' and 'tableNameAttribute' are both optional, but exactly one of them must be set to define the name of the table.
 </info:description>
-    <info:version>3.1.0</info:version>
+    <info:version>3.2.0</info:version>
     <info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
   </info:identity>
   <info:dependencies/>
diff --git a/com.ibm.streamsx.hbase/pom.xml b/com.ibm.streamsx.hbase/pom.xml
index 988b448..0e7bcea 100644
--- a/com.ibm.streamsx.hbase/pom.xml
+++ b/com.ibm.streamsx.hbase/pom.xml
@@ -15,7 +15,7 @@
     <groupId>com.ibm.streamsx.hbase</groupId>
     <artifactId>streamsx.hbase</artifactId>
     <packaging>jar</packaging>
-    <version>3.1.0</version>
+    <version>3.2.0</version>
     <name>com.ibm.streamsx.hbase</name>
     <repositories>
         <repository>
diff --git a/extraSamples/DeleteSample/.gitignore b/extraSamples/DeleteSample/.gitignore
new file mode 100644
index 0000000..16be8f2
--- /dev/null
+++ b/extraSamples/DeleteSample/.gitignore
@@ -0,0 +1 @@
+/output/
diff --git a/extraSamples/IncrementSample/.gitignore b/extraSamples/IncrementSample/.gitignore
new file mode 100644
index 0000000..16be8f2
--- /dev/null
+++ b/extraSamples/IncrementSample/.gitignore
@@ -0,0 +1 @@
+/output/
diff --git a/extraSamples/ScanBooks/.gitignore b/extraSamples/ScanBooks/.gitignore
new file mode 100644
index 0000000..16be8f2
--- /dev/null
+++ b/extraSamples/ScanBooks/.gitignore
@@ -0,0 +1 @@
+/output/
diff --git a/extraSamples/TableScan/.gitignore b/extraSamples/TableScan/.gitignore
new file mode 100644
index 0000000..16be8f2
--- /dev/null
+++ b/extraSamples/TableScan/.gitignore
@@ -0,0 +1 @@
+/output/
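To accompany the 3.2.0 release note in the info.xml changes above, a sketch of how the table name can be supplied through the input port. The composite, stream, attribute, and table names are hypothetical; only parameter names that appear in this changeset (hbaseSite, rowAttrName, tableNameAttribute) are used:

    // A Functor tags each tuple with the table it belongs to; HBASEDelete then
    // removes that row, resolving the table name per tuple via tableNameAttribute.
    stream<rstring who, rstring tableName> TaggedDeletes = Functor(DeleteRequests)
    {
        output TaggedDeletes :
            tableName = (who == "archived") ? "streamsSample_booksArchive" : "streamsSample_books" ;
    }

    () as DeleteSink = HBASEDelete(TaggedDeletes)
    {
        param
            hbaseSite : "etc/hbase-site.xml" ;
            rowAttrName : "who" ;
            tableNameAttribute : tableName ;
    }

With the process() changes above, a tuple that names a missing table causes a logged TableNotFoundException and is skipped, rather than the operator failing at startup as it did when only the fixed tableName parameter existed.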