Skip to content

Commit

Permalink
Merge pull request #100 from IBMStreams/develop
Browse files Browse the repository at this point in the history
merge develop to master #99
  • Loading branch information
anouri authored Jan 21, 2019
2 parents 5f7796d + afc2dcf commit 9f5fbb7
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public abstract class HBASEOperatorWithInput extends HBASEOperator {
protected int colQualifierIndex = -1;

protected MetaType rowAttrType = null;
protected int valueAttrIndex = -1;
protected MetaType valueAttrType = null;

protected MetaType colQualifierType = null, colFamilyType = null;

Expand Down Expand Up @@ -96,6 +98,12 @@ protected byte[] getColumnQualifier(Tuple tuple) throws Exception {
}
}

protected byte[] getValue(Tuple tuple) throws Exception {

return getBytes(tuple, valueAttrIndex, valueAttrType);
}


/**
* For {rowAttrName,columnFamilyAttrName,columnQualifierAttrName}, if
* specified, ensures the attribute exists, and stores the index in class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ private enum PutMode {
final static String VALUE_NAME = "valueAttrName";
protected byte[][] qualifierArray = null;
protected MetaType[] attrType = null;
private int valueAttrIndex = -1;
private MetaType valueAttrType = null;
protected boolean bufferTransactions = false;

protected Object tableLock = new Object();
Expand Down Expand Up @@ -283,39 +281,43 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
byte colQ[] = getColumnQualifier(tuple);
byte value[] = getBytes(tuple, valueAttrIndex, valueAttrType);
myPut.addColumn(colF, colQ, value);
//// myPut.add(colF, colQ, value);
break;
case RECORD:
Tuple values = tuple.getTuple(valueAttr);
for (int i = 0; i < qualifierArray.length; i++) {
myPut.addColumn(colF, qualifierArray[i],
// myPut.add(colF, qualifierArray[i],
getBytes(values, i, attrType[i]));
}
break;
default:
// It should be impossible to get here.
throw new Exception("Unsupported Put type");
}

if (checkAttr != null) {
Tuple checkTuple = tuple.getTuple(checkAttrIndex);

// the row attribute and the check row attribute have to match, so
// don't even look
// in the check attribute for hte row.

if (successAttrName != null) {
byte checkRow[] = getRow(tuple);
byte checkColF[] = getBytes(checkTuple, checkColFIndex,
checkColFType);
byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
checkColQType);
byte checkValue[] = getCheckValue(checkTuple);

success = myTable.checkAndPut(checkRow, checkColF, checkColQ,
checkValue, myPut);
if (checkAttr != null) {
Tuple checkTuple = tuple.getTuple(checkAttrIndex);
// the row attribute and the check row attribute have to match, so
// don't even look
// in the check attribute for the row.
byte checkColF[] = getBytes(checkTuple, checkColFIndex, checkColFType);
byte checkColQ[] = getBytes(checkTuple, checkColQIndex, checkColQType);
byte checkValue[] = getCheckValue(checkTuple);

success = myTable.checkAndPut(checkRow, checkColF, checkColQ, checkValue, myPut);
}else{
// set the success value without checkTuple
byte checkColQ[] = getColumnQualifier(tuple);
byte checkColF[] = getColumnFamily(tuple);
byte checkValue[] = getValue(tuple);
success = myTable.checkAndPut(checkRow,checkColF, checkColQ, checkValue, myPut);
}
logger.debug(Messages.getString("HBASE_PUT_RESULT", success));

} else if (!bufferTransactions && batchSize == 0) {
myTable.put(myPut);

myTable.put(myPut);
} else if (bufferTransactions){
safePut(myPut);
} else {
Expand All @@ -331,7 +333,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
if (!bufferTransactions){
myTable.close();
}
}
}
// Checks to see if an output tuple is necessary, and if so,
// submits it.
submitOutputTuple(tuple, success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class HBASEPutDelete extends HBASEOperatorWithInput implements
static final String CHECK_ATTR_PARAM = "checkAttrName";
protected int checkAttrIndex = -1;
static final String SUCCESS_PARAM = "successAttr";
private String successAttrName = null;
public String successAttrName = null;
private int successAttrIndex = -1;
StreamingOutput<OutputTuple> outStream = null;

Expand All @@ -85,7 +85,7 @@ public void setCheckAttr(String name) {
protected static void successRequiresOutput(OperatorContextChecker checker) {
OperatorContext context = checker.getOperatorContext();
Set<String> params = context.getParameterNames();
if (params.contains(SUCCESS_PARAM)) {
if (params.contains(CHECK_ATTR_PARAM)) {

if (context.getStreamingOutputs().size() == 0) {
checker.setInvalidContext(Messages.getString("HBASE_PUT_DEL_INVALID_OUT_PARAM", SUCCESS_PARAM ), null);
Expand All @@ -105,9 +105,9 @@ protected static void successRequiresOutput(OperatorContextChecker checker) {
*/
protected static void compileTimeChecks(OperatorContextChecker checker,
String operatorName) {
// If successAttr is set, then we must be using checkAttrParam
successRequiresOutput(checker);
checker.checkDependentParameters(SUCCESS_PARAM, CHECK_ATTR_PARAM);
// If checkAttrParam is set, then we must be using successAttr
checker.checkDependentParameters(CHECK_ATTR_PARAM, SUCCESS_PARAM);
checkConsistentRegionSource(checker, operatorName);
if (!checker.checkExcludedParameters(CHECK_ATTR_PARAM, BATCHSIZE_NAME)){
checker.setInvalidContext(Messages.getString("HBASE_PUT_DEL_INVALID_PARAM", CHECK_ATTR_PARAM, BATCHSIZE_NAME), null);
Expand Down Expand Up @@ -210,15 +210,16 @@ public synchronized void initialize(OperatorContext context)
}

if (successAttrName != null) {
if (checkAttrIndex < 0) {
// if (checkAttrIndex < 0) {
// TODO do context check the right way.
throw new Exception(SUCCESS_PARAM + " only valid if "
+ CHECK_ATTR_PARAM + " exists");
}
// throw new Exception(SUCCESS_PARAM + " only valid if "
// + CHECK_ATTR_PARAM + " exists");
// }
// TODO also check that success attribute is only used if there's an
// output port
StreamSchema outSchema = outStream.getStreamSchema();
Attribute attr = outSchema.getAttribute(successAttrName);

if (attr == null) {
throw new Exception(
"passed in success attribute, but no attribute found");
Expand Down
10 changes: 9 additions & 1 deletion com.ibm.streamsx.hbase/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,16 @@ Here is an example to connect to the HBase server with Kerberos Authentication:

* The parameters 'tableNme' and 'tableNameAttribute' are optional, but only one of them must be set to define the name of table.

++ What is new in version 3.3.0

* The HBABSPut parameter 'successAttr' is not depending to the parameter 'checkAttrName'
* It returns in output parameter a boolean results after a successfully insert data into table.




</info:description>
<info:version>3.2.0</info:version>
<info:version>3.3.0</info:version>
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down

0 comments on commit 9f5fbb7

Please sign in to comment.