Skip to content

Commit

Permalink
Merge pull request #201 from IBMStreams/develop
Browse files Browse the repository at this point in the history
3.4.0
  • Loading branch information
markheger authored Apr 14, 2020
2 parents bd102e1 + 012a5e6 commit fff77cf
Show file tree
Hide file tree
Showing 16 changed files with 537 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?xml version="1.0" ?>
<operatorModel
xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator"
xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
<cppOperatorModel>
<context>
<description>
The RtpDecode Operator converts an input blob with 8 bit mylaw encoded PCM samples into uncompressed 16 bit samples.
The output stream attribute with the 16 bit samples must be of type `blob` and the 16 bit samples are stored in little
endian signed integer format.
</description>
<customOutputFunctions>
<customOutputFunction>
<name>DefaultFunctions</name>
<function>
<description>The default function for output attributes. By default, this function assigns the output attribute to the value of the input attribute with the same name.</description>
<prototype><![CDATA[<any T> T AsIs(T)]]></prototype>
</function>
<function>
<description>Return the number of converted PCM samples in the current tuple.</description>
<prototype><![CDATA[uint64 getNumberOfSamples()]]></prototype>
</function>
<function>
<description>Return the converted PCM payload (uncompressed 16 bit little endian integer samples).</description>
<prototype><![CDATA[blob getPcmPayload()]]></prototype>
</function>
</customOutputFunction>
</customOutputFunctions>
<providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
<parameters>
<allowAny>false</allowAny>
<parameter>
<name>payload</name>
<description>The input stream attribute with the 8 bit mulaw encoded PCM samples to convert.</description>
<optional>false</optional>
<rewriteAllowed>false</rewriteAllowed>
<expressionMode>Attribute</expressionMode>
<type>blob</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
<inputPortSet>
<description>
The input port with an blob attribute containing the PCM samples.
</description>
<tupleMutationAllowed>false</tupleMutationAllowed>
<windowingMode>NonWindowed</windowingMode>
<windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
<cardinality>1</cardinality>
<optional>false</optional>
</inputPortSet>
</inputPorts>
<outputPorts>
<outputPortSet>
<description>
The output port with an blob attribute containing the converted PCM samples. The other attributes are auto
assigned from input port to output port.
</description>
<expressionMode>Expression</expressionMode>
<autoAssignment>true</autoAssignment>
<completeAssignment>false</completeAssignment>
<rewriteAllowed>false</rewriteAllowed>
<outputFunctions>
<default>AsIs</default>
<type>DefaultFunctions</type>
</outputFunctions>
<windowPunctuationOutputMode>Preserving</windowPunctuationOutputMode>
<tupleMutationAllowed>true</tupleMutationAllowed>
<cardinality>1</cardinality>
<optional>false</optional>
</outputPortSet>
</outputPorts>
</cppOperatorModel>
</operatorModel>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*******************************************************************************
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/
<%
my $payload = $model->getParameterByName("payload")->getValueAt(0)->getCppExpression();
%>

/* Additional includes go here */

<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR() {
// Initialization code goes here
}

// Destructor
MY_OPERATOR::~MY_OPERATOR() {
// Finalization code goes here
}

int16_t MY_OPERATOR::MuLaw_Decode(int8_t number)
{
const uint16_t MULAW_BIAS = 33;
uint8_t sign = 0, position = 0;
int16_t decoded = 0;
number = ~number;
if (number & 0x80)
{
number &= ~(1 << 7);
sign = -1;
}
position = ((number & 0xF0) >> 4) + 5;
decoded = ((1 << position) | ((number & 0x0F) << (position - 4))
| (1 << (position - 5))) - MULAW_BIAS;
return (sign == 0) ? (decoded) : (-(decoded));
}

unsigned int MY_OPERATOR::getNumberOfSamples(unsigned int length) {
return length;
}

SPL::blob MY_OPERATOR::getPcmPayload(unsigned char * decompressed, unsigned int length) {
return blob(decompressed, length * sizeof(int16_t));
}

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
<%
my $inputPort = $model->getInputPortAt(0);
my $inputTuple = $inputPort->getCppTupleName();
%>
IPort0Type const & <%=$inputTuple%> = static_cast<IPort0Type const&> (tuple);

SPL::blob const & inputblob = <%=$payload%>;
char * incomingPayload = (char *)inputblob.getData();
const uint64_t length = inputblob.getSize();
unsigned char * decompressed = (unsigned char *)malloc(length*2);

for ( int i = 0; i < length; i++ ) {
int8_t oneByte = (int8_t)incomingPayload[i];
int16_t twoBytes = MuLaw_Decode(oneByte);
*(int16_t *)(decompressed+(i*2)) = twoBytes;
}

// assign results to output and auto assign
OPort0Type otuple;
<%
my $oport = $model->getOutputPortAt(0);
foreach my $attribute (@{$oport->getAttributes()}) {
my $name = $attribute->getName();
my $operation = $attribute->getAssignmentOutputFunctionName();

if ($operation eq "AsIs") {
my $init = $attribute->getAssignmentOutputFunctionParameterValueAt(0)->getCppExpression();
%>
otuple.set_<%=$name%>(<%=$init%>);
<% } elsif ($operation eq "getPcmPayload") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(decompressed, length));
<% } elsif ($operation eq "getNumberOfSamples") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(length));
<% }
}
%>

submit(otuple, 0);
free(decompressed);
}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
if(punct==Punctuation::WindowMarker) {
submit(Punctuation::WindowMarker, 0);
}
}

<%SPL::CodeGen::implementationEpilogue($model);%>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/

<%SPL::CodeGen::headerPrologue($model);%>

class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
// Constructor
MY_OPERATOR();

// Destructor
virtual ~MY_OPERATOR();

// Tuple processing for non-mutating ports
void process(Tuple const & tuple, uint32_t port);

// Punctuation processing
void process(Punctuation const & punct, uint32_t port);

unsigned int getNumberOfSamples(unsigned int length);
SPL::blob getPcmPayload(unsigned char * decompressed, unsigned int length);
private:
// Members
int16_t MuLaw_Decode(int8_t number);
int recNumber;
};

<%SPL::CodeGen::headerEpilogue($model);%>

6 changes: 5 additions & 1 deletion com.ibm.streamsx.network/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ Operators and functions for ingesting and parsing raw network data at the packet

This is an overview of changes for major and minor version upgrades.

++ What is new in version 3.4.0

* New operator com.ibm.streamsx.network.rtp::RtpDecode

++ What is changed in version 3.3.1

* Globalization support: Translated messages updated
Expand Down Expand Up @@ -36,7 +40,7 @@ This is an overview of changes for major and minor version upgrades.
* IPFilter operator: Changed traces with WARN trace level to TRACE level.

</info:description>
<info:version>3.3.1</info:version>
<info:version>3.4.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
Binary file not shown.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion samples/SampleNetworkToolkitData/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<info:identity>
<info:name>SampleNetworkToolkitData</info:name>
<info:description>Small packet capture (PCAP) recordings of network traffic used by the sample applications.</info:description>
<info:version>2.0.0</info:version>
<info:version>2.1.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
12 changes: 12 additions & 0 deletions samples/SampleRtpDecode/.classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="impl/java/bin" path="impl/java/src"/>
<classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="src" path=".apt_generated">
<attributes>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="impl/java/bin"/>
</classpath>
29 changes: 29 additions & 0 deletions samples/SampleRtpDecode/.project
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>SampleRtpDecode</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
20 changes: 20 additions & 0 deletions samples/SampleRtpDecode/info.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
<info:identity>
<info:name>SampleRtpDecode</info:name>
<info:description></info:description>
<info:version>1.0.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.network</common:name>
<common:version>3.4.0</common:version>
</info:toolkit>
<info:toolkit>
<common:name>SampleNetworkToolkitData</common:name>
<common:version>2.1.0</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
Empty file.
51 changes: 51 additions & 0 deletions samples/SampleRtpDecode/sample/TestRtpDecodeBasic.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
** Copyright (C) 2019, 2020 International Business Machines Corporation
** All Rights Reserved
*/

namespace sample;

use com.ibm.streamsx.network.rtp::*;
use spl.math::*;

composite TestRtpDecodeBasic {
param
expression<rstring> $filename: getSubmissionTimeValue("filename", "../../SampleNetworkToolkitData/data/sample_rtp_decode_basic_ulaw.gsm" );
graph
stream<rstring filename, blob mulawspeech> MuLawSamplesStream as O = FileSource() {
param
file: $filename;
format: block;
blockSize: 128u;
output O:
filename = $filename;
}

stream<rstring filename, blob mulawspeech, blob decoded, uint64 noSamples, rstring conversationId> OutStream as O = RtpDecode(MuLawSamplesStream) {
param
payload: mulawspeech;
output O:
conversationId = filename,
decoded = getPcmPayload(),
noSamples = getNumberOfSamples();
}

() as Sink1 = Custom(OutStream as I) {
logic
onTuple I: printStringLn("filename=" + I.filename + " noSamples=" + (rstring)noSamples);
onPunct I: println(currentPunct());
}
() as Sink2 = FileSink(OutStream) {
param
file: "debug.TestRtpDecodeBasic.OutStreamText.out";
format: txt;
flush: 1u;
suppress: mulawspeech, decoded;
}
() as Sink3 = FileSink(OutStream) {
param
file: "debug.TestRtpDecodeBasic.OutStreamSamples.out";
format: block;
suppress: filename, mulawspeech, noSamples, conversationId;
}
}
Loading

0 comments on commit fff77cf

Please sign in to comment.