Skip to content

Commit

Permalink
New operator com.ibm.streamsx.network.rtp::RtpDecode
Browse files Browse the repository at this point in the history
  • Loading branch information
joergboe committed Apr 9, 2020
1 parent c9cc110 commit f713578
Show file tree
Hide file tree
Showing 22 changed files with 426 additions and 829 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,75 @@
xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
<cppOperatorModel>
<cppOperatorModel>
<context>
<customOutputFunctions>
<customOutputFunction>
<name>DefaultFunctions</name>
<function>
<description>The default function for output attributes. By default, this function assigns the output attribute to the value of the input attribute with the same name.</description>
<prototype><![CDATA[<any T> T AsIs(T)]]></prototype>
</function>
<function>
<description>Return number of PCM samples.</description>
<prototype><![CDATA[uint32 getNumberOfSamples()]]></prototype>
</function>
<function>
<description>Return PCM payload.</description>
<prototype><![CDATA[blob getPcmPayload()]]></prototype>
</function>
</customOutputFunction>
<description>
The RtpDecode Operator converts an input blob with 8 bit mylaw encoded PCM samples into uncompressed 16 bit samples.
The output stream attribute with the 16 bit samples must be of type `blob` and the 16 bit samples are stored in little
endian signed integer format.
</description>
<customOutputFunctions>
<customOutputFunction>
<name>DefaultFunctions</name>
<function>
<description>The default function for output attributes. By default, this function assigns the output attribute to the value of the input attribute with the same name.</description>
<prototype><![CDATA[<any T> T AsIs(T)]]></prototype>
</function>
<function>
<description>Return the number of converted PCM samples in the current tuple.</description>
<prototype><![CDATA[uint64 getNumberOfSamples()]]></prototype>
</function>
<function>
<description>Return the converted PCM payload (uncompressed 16 bit little endian integer samples).</description>
<prototype><![CDATA[blob getPcmPayload()]]></prototype>
</function>
</customOutputFunction>
</customOutputFunctions>
<providesSingleThreadedContext>Never</providesSingleThreadedContext>
</context>
<providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
<parameters>
<allowAny>false</allowAny>
<!-- some optional elements
<parameter>
<name>paramName</name>
<description>brief description of the parameter</description>
<optional>true</optional>
<rewriteAllowed>true</rewriteAllowed>
<expressionMode>AttributeFree</expressionMode>
<type>rstring</type>
<name>payload</name>
<description>The input stream attribute with the 8 bit mulaw encoded PCM samples to convert.</description>
<optional>false</optional>
<rewriteAllowed>false</rewriteAllowed>
<expressionMode>Attribute</expressionMode>
<type>blob</type>
<cardinality>1</cardinality>
</parameter>
-->
</parameters>
<inputPorts>
<!-- some optional elements
<inputPortSet>
<description>
The input port with an blob attribute containing the PCM samples.
</description>
<tupleMutationAllowed>false</tupleMutationAllowed>
<windowingMode>NonWindowed</windowingMode>
<windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
<cardinality>2</cardinality>
<cardinality>1</cardinality>
<optional>false</optional>
</inputPortSet>
-->
<inputPortOpenSet>
<tupleMutationAllowed>true</tupleMutationAllowed>
<windowingMode>OptionallyWindowed</windowingMode>
<windowPunctuationInputMode>WindowBound</windowPunctuationInputMode>
</inputPortOpenSet>
</inputPortSet>
</inputPorts>
<outputPorts>
<outputPortSet>
<description>
The output port with an blob attribute containing the converted PCM samples. The other attributes are auto
assigned from input port to output port.
</description>
<expressionMode>Expression</expressionMode>
<autoAssignment>true</autoAssignment>
<completeAssignment>true</completeAssignment>
<rewriteAllowed>true</rewriteAllowed>
<completeAssignment>false</completeAssignment>
<rewriteAllowed>false</rewriteAllowed>
<outputFunctions>
<default>AsIs</default>
<type>DefaultFunctions</type>
</outputFunctions>
<windowPunctuationOutputMode>Free</windowPunctuationOutputMode>
<default>AsIs</default>
<type>DefaultFunctions</type>
</outputFunctions>
<windowPunctuationOutputMode>Preserving</windowPunctuationOutputMode>
<tupleMutationAllowed>true</tupleMutationAllowed>
<cardinality>1</cardinality>
<optional>false</optional>
</outputPortSet>
<outputPortOpenSet>
<expressionMode>Expression</expressionMode>
<autoAssignment>false</autoAssignment>
<completeAssignment>false</completeAssignment>
<rewriteAllowed>true</rewriteAllowed>
<windowPunctuationOutputMode>Generating</windowPunctuationOutputMode>
<tupleMutationAllowed>true</tupleMutationAllowed>
</outputPortOpenSet>
</outputPorts>
</cppOperatorModel>
</operatorModel>
Original file line number Diff line number Diff line change
@@ -1,51 +1,25 @@
/*******************************************************************************
* Copyright (C) 2016, International Business Machines Corporation
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/
<%
my $payload = $model->getParameterByName("payload")->getValueAt(0)->getCppExpression();
%>

/* Additional includes go here */

<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR()
{
MY_OPERATOR::MY_OPERATOR() {
// Initialization code goes here
}

// Destructor
MY_OPERATOR::~MY_OPERATOR()
{
MY_OPERATOR::~MY_OPERATOR() {
// Finalization code goes here
}

// Notify port readiness
void MY_OPERATOR::allPortsReady()
{
// Notifies that all ports are ready. No tuples should be submitted before
// this. Source operators can use this method to spawn threads.

/*
createThreads(1); // Create source thread
*/
}

// Notify pending shutdown
void MY_OPERATOR::prepareToShutdown()
{
// This is an asynchronous call
}

// Processing for source and threaded operators
void MY_OPERATOR::process(uint32_t idx)
{
// A typical implementation will loop until shutdown
/*
while(!getPE().getShutdownRequested()) {
// do work ...
}
*/
}

int16_t MY_OPERATOR::MuLaw_Decode(int8_t number)
{
const uint16_t MULAW_BIAS = 33;
Expand All @@ -63,84 +37,62 @@ int16_t MY_OPERATOR::MuLaw_Decode(int8_t number)
return (sign == 0) ? (decoded) : (-(decoded));
}

unsigned int MY_OPERATOR::getNumberOfSamples(unsigned int length){
unsigned int MY_OPERATOR::getNumberOfSamples(unsigned int length) {
return length;
}

SPL::blob MY_OPERATOR::getPcmPayload(unsigned char * decompressed, unsigned int length){
SPL::blob MY_OPERATOR::getPcmPayload(unsigned char * decompressed, unsigned int length) {
return blob(decompressed, length * sizeof(int16_t));
}

// Tuple processing for mutating ports
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
<%
my $inputPort = $model->getInputPortAt(0);
my $inputTuple = $inputPort->getCppTupleName();
%>
IPort0Type const & <%=$inputTuple%> = static_cast<IPort0Type const&> (tuple);
IPort0Type const & <%=$inputTuple%> = static_cast<IPort0Type const&> (tuple);

char * incomingPayload = (char *)<%=$inputTuple%>.get_payload().getData();
unsigned char * decompressed = (unsigned char *)malloc(5000);
int length = <%=$inputTuple%>.get_payloadLength();
for ( int i = 0; i < length; i++ )
{
int8_t oneByte = (int8_t)incomingPayload[i];
int16_t twoBytes = MuLaw_Decode(oneByte);
*(int16_t *)(decompressed+(i*2)) = twoBytes;
SPL::blob const & inputblob = <%=$payload%>;
char * incomingPayload = (char *)inputblob.getData();
const uint64_t length = inputblob.getSize();
unsigned char * decompressed = (unsigned char *)malloc(length*2);

for ( int i = 0; i < length; i++ ) {
int8_t oneByte = (int8_t)incomingPayload[i];
int16_t twoBytes = MuLaw_Decode(oneByte);
*(int16_t *)(decompressed+(i*2)) = twoBytes;
}
OPort0Type otuple;

// Forward Tuples we have chosen
<%
my $oport = $model->getOutputPortAt(0);
foreach my $attribute (@{$oport->getAttributes()}) {
my $name = $attribute->getName();
my $operation = $attribute->getAssignmentOutputFunctionName();

if ($operation eq "AsIs") {

my $init = $attribute->getAssignmentOutputFunctionParameterValueAt(0)->getCppExpression();%>
otuple.set_<%=$name%>(<%=$init%>);
<%} elsif ($operation eq "getPcmPayload") { %>
otuple.set_<%=$name%>(
MY_OPERATOR::<%=$operation%>(decompressed, length));
<%} elsif ($operation eq "getNumberOfSamples") { %>
otuple.set_<%=$name%>(
MY_OPERATOR::<%=$operation%>(length));
<%} else { %>
otuple.set_<%=$name%>(
MY_OPERATOR::<%=$operation%>());
<%}
}%>

submit(otuple, 0);
free(decompressed);

}
// assign results to output and auto assign
OPort0Type otuple;
<%
my $oport = $model->getOutputPortAt(0);
foreach my $attribute (@{$oport->getAttributes()}) {
my $name = $attribute->getName();
my $operation = $attribute->getAssignmentOutputFunctionName();

if ($operation eq "AsIs") {
my $init = $attribute->getAssignmentOutputFunctionParameterValueAt(0)->getCppExpression();
%>
otuple.set_<%=$name%>(<%=$init%>);
<% } elsif ($operation eq "getPcmPayload") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(decompressed, length));
<% } elsif ($operation eq "getNumberOfSamples") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(length));
<% }
}
%>

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
// Sample submit code
/*
OPort0Type otuple;
submit(otuple, 0); // submit to output port 0
*/
submit(otuple, 0);
free(decompressed);
}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port)
{
/*
if(punct==Punctuation::WindowMarker) {
// ...;
} else if(punct==Punctuation::FinalMarker) {
// ...;
}
*/
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
if(punct==Punctuation::WindowMarker) {
submit(Punctuation::WindowMarker, 0);
}
}

<%SPL::CodeGen::implementationEpilogue($model);%>

Original file line number Diff line number Diff line change
@@ -1,31 +1,18 @@
/*******************************************************************************
* Copyright (C) 2016, International Business Machines Corporation
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/

<%SPL::CodeGen::headerPrologue($model);%>

class MY_OPERATOR : public MY_BASE_OPERATOR
{
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
// Constructor
MY_OPERATOR();

// Destructor
virtual ~MY_OPERATOR();
virtual ~MY_OPERATOR();

// Notify port readiness
void allPortsReady();

// Notify pending shutdown
void prepareToShutdown();

// Processing for source and threaded operators
void process(uint32_t idx);

// Tuple processing for mutating ports
void process(Tuple & tuple, uint32_t port);

// Tuple processing for non-mutating ports
void process(Tuple const & tuple, uint32_t port);

Expand All @@ -37,8 +24,8 @@ public:
private:
// Members
int16_t MuLaw_Decode(int8_t number);
int recNumber;
};
int recNumber;
};

<%SPL::CodeGen::headerEpilogue($model);%>

Loading

0 comments on commit f713578

Please sign in to comment.