Skip to content

Commit

Permalink
Merge pull request #98 from IBMStreams/develop
Browse files Browse the repository at this point in the history
HBASE merge develop to master #97
  • Loading branch information
anouri authored Dec 10, 2018
2 parents 8dead02 + 686fa1c commit 5f7796d
Show file tree
Hide file tree
Showing 20 changed files with 319 additions and 146 deletions.
4 changes: 2 additions & 2 deletions com.ibm.streamsx.hbase/.classpath
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="opt/java/bin" path="opt/java/src"/>
<classpathentry kind="src" output="impl/java/bin" path="impl/java/src"/>
<classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="opt/downloaded/commons-cli-1.2.jar"/>
Expand Down Expand Up @@ -28,5 +28,5 @@
<classpathentry kind="lib" path="opt/downloaded/slf4j-api-1.7.10.jar"/>
<classpathentry kind="lib" path="opt/downloaded/slf4j-log4j12-1.7.10.jar"/>
<classpathentry kind="lib" path="opt/downloaded/zookeeper-3.4.6.jar"/>
<classpathentry kind="output" path="opt/java/bin"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions com.ibm.streamsx.hbase/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/output/
/bin/
2 changes: 2 additions & 0 deletions com.ibm.streamsx.hbase/.project
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>com.ibm.etools.systems.projects.core.remoteunixnature</nature>
</natures>
</projectDescription>

11 changes: 11 additions & 0 deletions com.ibm.streamsx.hbase/.settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.8
2 changes: 1 addition & 1 deletion com.ibm.streamsx.hbase/impl/java/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
/bin
/bin

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.log4j.Logger;
Expand All @@ -29,11 +30,11 @@
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.ConsistentRegionContext;

/**
/**
* Accepts tuples on input stream and makes the corresponding delete in the
* HBASE table. .
* <P>
*/
*/
@PrimitiveOperator(name = "HBASEDelete", namespace = "com.ibm.streamsx.hbase", description = "The `HBASEDelete` operator deletes an entry, an entire row, a columnFamily in a row, or a columnFamily, columnQualifier pair in a row from an HBase table. It can also optionally do a checkAndDelete operation."
+ HBASEOperator.DOC_BLANKLINE
+ "The behavior of the operator depends on its parameters:"
Expand Down Expand Up @@ -179,53 +180,65 @@ public synchronized void initialize(OperatorContext context)
@Override
public void process(StreamingInput<Tuple> stream, Tuple tuple)
throws Exception {
Table myTable = getHTable();
byte row[] = getRow(tuple);
Delete myDelete = new Delete(row);

if (DeleteMode.COLUMN_FAMILY == deleteMode) {
byte colF[] = getColumnFamily(tuple);
myDelete.addFamily(colF);
} else if (DeleteMode.COLUMN == deleteMode) {
byte colF[] = getColumnFamily(tuple);
byte colQ[] = getColumnQualifier(tuple);
if (deleteAll) {
myDelete.addColumns(colF, colQ);
} else {
myDelete.addColumn(colF, colQ);
}
Table myTable = null;


try {
myTable = getHTable(tuple);
} catch (TableNotFoundException e) {
e.printStackTrace();
logger.error(e.getMessage());
}

boolean success = false;
if (checkAttr != null) {
Tuple checkTuple = tuple.getTuple(checkAttrIndex);
// the check row and the row have to match, so don't use the
// checkRow.
byte checkRow[] = getRow(tuple);
byte checkColF[] = getBytes(checkTuple, checkColFIndex,
checkColFType);
byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
checkColQType);
byte checkValue[] = getCheckValue(checkTuple);
success = myTable.checkAndDelete(checkRow, checkColF, checkColQ,
checkValue, myDelete);
} else if (batchSize == 0) {
logger.debug(Messages.getString("HBASE_DEL_DELETING", myDelete));
myTable.delete(myDelete);
} else {
synchronized (listLock) {
deleteList.add(myDelete);
if (deleteList.size() >= batchSize) {
myTable.delete(deleteList);
deleteList.clear();
if ( myTable != null ){
byte row[] = getRow(tuple);
Delete myDelete = new Delete(row);

if (DeleteMode.COLUMN_FAMILY == deleteMode) {
byte colF[] = getColumnFamily(tuple);
myDelete.addFamily(colF);
} else if (DeleteMode.COLUMN == deleteMode) {
byte colF[] = getColumnFamily(tuple);
byte colQ[] = getColumnQualifier(tuple);
if (deleteAll) {
myDelete.addColumns(colF, colQ);
} else {
myDelete.addColumn(colF, colQ);
}
}

boolean success = false;
if (checkAttr != null) {
Tuple checkTuple = tuple.getTuple(checkAttrIndex);
// the check row and the row have to match, so don't use the
// checkRow.
byte checkRow[] = getRow(tuple);
byte checkColF[] = getBytes(checkTuple, checkColFIndex,
checkColFType);
byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
checkColQType);
byte checkValue[] = getCheckValue(checkTuple);
success = myTable.checkAndDelete(checkRow, checkColF, checkColQ,
checkValue, myDelete);
} else if (batchSize == 0) {
logger.debug(Messages.getString("HBASE_DEL_DELETING", myDelete));
myTable.delete(myDelete);
} else {
synchronized (listLock) {
deleteList.add(myDelete);
if (deleteList.size() >= batchSize) {
myTable.delete(deleteList);
deleteList.clear();
}
}
}

// Checks to see if an output tuple is necessary, and if so,
// submits it.
submitOutputTuple(tuple, success);
myTable.close();
}

// Checks to see if an output tuple is necessary, and if so,
// submits it.
submitOutputTuple(tuple, success);
myTable.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.log4j.Logger;
import com.ibm.streams.operator.Attribute;
import com.ibm.streams.operator.OperatorContext;
Expand Down Expand Up @@ -317,7 +318,18 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple)
myGet.addFamily(colF);
}
}
Table myTable = getHTable();

Table myTable = null;

try {
myTable = getHTable(tuple);
} catch (TableNotFoundException e) {
e.printStackTrace();
logger.error(e.getMessage());
}

if ( myTable != null) {

Result r = myTable.get(myGet);

int numResults = r.size();
Expand Down Expand Up @@ -350,6 +362,7 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple)
// Submit new tuple to output port 0
outStream.submit(outTuple);
myTable.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.ibm.streamsx.hbase;

import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -127,9 +128,17 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
incr = tuple.getLong(incrAttrIndex);
}
}
Table myTable = getHTable();
long newValue = myTable.incrementColumnValue(row, colF, colQ, incr);
myTable.close();
Table myTable = null;
try {
myTable = getHTable(tuple);
} catch (TableNotFoundException e) {
e.printStackTrace();
}

if (myTable != null ){
myTable.incrementColumnValue(row, colF, colQ, incr);
myTable.close();
}
}

}
Loading

0 comments on commit 5f7796d

Please sign in to comment.