Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rtp dev #200

Merged
merged 5 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?xml version="1.0" ?>
<operatorModel
xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator"
xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
<cppOperatorModel>
<context>
<description>
The RtpDecode Operator converts an input blob with 8 bit mylaw encoded PCM samples into uncompressed 16 bit samples.
The output stream attribute with the 16 bit samples must be of type `blob` and the 16 bit samples are stored in little
endian signed integer format.
</description>
<customOutputFunctions>
<customOutputFunction>
<name>DefaultFunctions</name>
<function>
<description>The default function for output attributes. By default, this function assigns the output attribute to the value of the input attribute with the same name.</description>
<prototype><![CDATA[<any T> T AsIs(T)]]></prototype>
</function>
<function>
<description>Return the number of converted PCM samples in the current tuple.</description>
<prototype><![CDATA[uint64 getNumberOfSamples()]]></prototype>
</function>
<function>
<description>Return the converted PCM payload (uncompressed 16 bit little endian integer samples).</description>
<prototype><![CDATA[blob getPcmPayload()]]></prototype>
</function>
</customOutputFunction>
</customOutputFunctions>
<providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
<parameters>
<allowAny>false</allowAny>
<parameter>
<name>payload</name>
<description>The input stream attribute with the 8 bit mulaw encoded PCM samples to convert.</description>
<optional>false</optional>
<rewriteAllowed>false</rewriteAllowed>
<expressionMode>Attribute</expressionMode>
<type>blob</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
<inputPortSet>
<description>
The input port with an blob attribute containing the PCM samples.
</description>
<tupleMutationAllowed>false</tupleMutationAllowed>
<windowingMode>NonWindowed</windowingMode>
<windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
<cardinality>1</cardinality>
<optional>false</optional>
</inputPortSet>
</inputPorts>
<outputPorts>
<outputPortSet>
<description>
The output port with an blob attribute containing the converted PCM samples. The other attributes are auto
assigned from input port to output port.
</description>
<expressionMode>Expression</expressionMode>
<autoAssignment>true</autoAssignment>
<completeAssignment>false</completeAssignment>
<rewriteAllowed>false</rewriteAllowed>
<outputFunctions>
<default>AsIs</default>
<type>DefaultFunctions</type>
</outputFunctions>
<windowPunctuationOutputMode>Preserving</windowPunctuationOutputMode>
<tupleMutationAllowed>true</tupleMutationAllowed>
<cardinality>1</cardinality>
<optional>false</optional>
</outputPortSet>
</outputPorts>
</cppOperatorModel>
</operatorModel>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*******************************************************************************
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/
<%
my $payload = $model->getParameterByName("payload")->getValueAt(0)->getCppExpression();
%>

/* Additional includes go here */

<%SPL::CodeGen::implementationPrologue($model);%>

// Constructor
MY_OPERATOR::MY_OPERATOR() {
// Initialization code goes here
}

// Destructor
MY_OPERATOR::~MY_OPERATOR() {
// Finalization code goes here
}

int16_t MY_OPERATOR::MuLaw_Decode(int8_t number)
{
const uint16_t MULAW_BIAS = 33;
uint8_t sign = 0, position = 0;
int16_t decoded = 0;
number = ~number;
if (number & 0x80)
{
number &= ~(1 << 7);
sign = -1;
}
position = ((number & 0xF0) >> 4) + 5;
decoded = ((1 << position) | ((number & 0x0F) << (position - 4))
| (1 << (position - 5))) - MULAW_BIAS;
return (sign == 0) ? (decoded) : (-(decoded));
}

unsigned int MY_OPERATOR::getNumberOfSamples(unsigned int length) {
return length;
}

SPL::blob MY_OPERATOR::getPcmPayload(unsigned char * decompressed, unsigned int length) {
return blob(decompressed, length * sizeof(int16_t));
}

// Tuple processing for non-mutating ports
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
<%
my $inputPort = $model->getInputPortAt(0);
my $inputTuple = $inputPort->getCppTupleName();
%>
IPort0Type const & <%=$inputTuple%> = static_cast<IPort0Type const&> (tuple);

SPL::blob const & inputblob = <%=$payload%>;
char * incomingPayload = (char *)inputblob.getData();
const uint64_t length = inputblob.getSize();
unsigned char * decompressed = (unsigned char *)malloc(length*2);

for ( int i = 0; i < length; i++ ) {
int8_t oneByte = (int8_t)incomingPayload[i];
int16_t twoBytes = MuLaw_Decode(oneByte);
*(int16_t *)(decompressed+(i*2)) = twoBytes;
}

// assign results to output and auto assign
OPort0Type otuple;
<%
my $oport = $model->getOutputPortAt(0);
foreach my $attribute (@{$oport->getAttributes()}) {
my $name = $attribute->getName();
my $operation = $attribute->getAssignmentOutputFunctionName();

if ($operation eq "AsIs") {
my $init = $attribute->getAssignmentOutputFunctionParameterValueAt(0)->getCppExpression();
%>
otuple.set_<%=$name%>(<%=$init%>);
<% } elsif ($operation eq "getPcmPayload") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(decompressed, length));
<% } elsif ($operation eq "getNumberOfSamples") { %>
otuple.set_<%=$name%>(MY_OPERATOR::<%=$operation%>(length));
<% }
}
%>

submit(otuple, 0);
free(decompressed);
}

// Punctuation processing
void MY_OPERATOR::process(Punctuation const & punct, uint32_t port) {
if(punct==Punctuation::WindowMarker) {
submit(Punctuation::WindowMarker, 0);
}
}

<%SPL::CodeGen::implementationEpilogue($model);%>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (C) 2020, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/

<%SPL::CodeGen::headerPrologue($model);%>

class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
// Constructor
MY_OPERATOR();

// Destructor
virtual ~MY_OPERATOR();

// Tuple processing for non-mutating ports
void process(Tuple const & tuple, uint32_t port);

// Punctuation processing
void process(Punctuation const & punct, uint32_t port);

unsigned int getNumberOfSamples(unsigned int length);
SPL::blob getPcmPayload(unsigned char * decompressed, unsigned int length);
private:
// Members
int16_t MuLaw_Decode(int8_t number);
int recNumber;
};

<%SPL::CodeGen::headerEpilogue($model);%>

6 changes: 5 additions & 1 deletion com.ibm.streamsx.network/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ Operators and functions for ingesting and parsing raw network data at the packet

This is an overview of changes for major and minor version upgrades.

++ What is new in version 3.4.0

* New operator com.ibm.streamsx.network.rtp::RtpDecode

++ What is changed in version 3.3.1

* Globalization support: Translated messages updated
Expand Down Expand Up @@ -36,7 +40,7 @@ This is an overview of changes for major and minor version upgrades.
* IPFilter operator: Changed traces with WARN trace level to TRACE level.

</info:description>
<info:version>3.3.1</info:version>
<info:version>3.4.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
Binary file not shown.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion samples/SampleNetworkToolkitData/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<info:identity>
<info:name>SampleNetworkToolkitData</info:name>
<info:description>Small packet capture (PCAP) recordings of network traffic used by the sample applications.</info:description>
<info:version>2.0.0</info:version>
<info:version>2.1.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down
12 changes: 12 additions & 0 deletions samples/SampleRtpDecode/.classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="impl/java/bin" path="impl/java/src"/>
<classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="src" path=".apt_generated">
<attributes>
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="impl/java/bin"/>
</classpath>
29 changes: 29 additions & 0 deletions samples/SampleRtpDecode/.project
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>SampleRtpDecode</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
20 changes: 20 additions & 0 deletions samples/SampleRtpDecode/info.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
<info:identity>
<info:name>SampleRtpDecode</info:name>
<info:description></info:description>
<info:version>1.0.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
<common:name>com.ibm.streamsx.network</common:name>
<common:version>3.4.0</common:version>
</info:toolkit>
<info:toolkit>
<common:name>SampleNetworkToolkitData</common:name>
<common:version>2.1.0</common:version>
</info:toolkit>
</info:dependencies>
</info:toolkitInfoModel>
Empty file.
51 changes: 51 additions & 0 deletions samples/SampleRtpDecode/sample/TestRtpDecodeBasic.spl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
** Copyright (C) 2019, 2020 International Business Machines Corporation
** All Rights Reserved
*/

namespace sample;

use com.ibm.streamsx.network.rtp::*;
use spl.math::*;

composite TestRtpDecodeBasic {
param
expression<rstring> $filename: getSubmissionTimeValue("filename", "../../SampleNetworkToolkitData/data/sample_rtp_decode_basic_ulaw.gsm" );
graph
stream<rstring filename, blob mulawspeech> MuLawSamplesStream as O = FileSource() {
param
file: $filename;
format: block;
blockSize: 128u;
output O:
filename = $filename;
}

stream<rstring filename, blob mulawspeech, blob decoded, uint64 noSamples, rstring conversationId> OutStream as O = RtpDecode(MuLawSamplesStream) {
param
payload: mulawspeech;
output O:
conversationId = filename,
decoded = getPcmPayload(),
noSamples = getNumberOfSamples();
}

() as Sink1 = Custom(OutStream as I) {
logic
onTuple I: printStringLn("filename=" + I.filename + " noSamples=" + (rstring)noSamples);
onPunct I: println(currentPunct());
}
() as Sink2 = FileSink(OutStream) {
param
file: "debug.TestRtpDecodeBasic.OutStreamText.out";
format: txt;
flush: 1u;
suppress: mulawspeech, decoded;
}
() as Sink3 = FileSink(OutStream) {
param
file: "debug.TestRtpDecodeBasic.OutStreamSamples.out";
format: block;
suppress: filename, mulawspeech, noSamples, conversationId;
}
}
Loading