Skip to content


Cleaning up docs and style
Browse files Browse the repository at this point in the history
  • Loading branch information
mdemoret-nv committed Jun 17, 2022
1 parent d663206 commit 811eb9d
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 115 deletions.
117 changes: 66 additions & 51 deletions docs/source/developer_guide/guides/
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ class PythonNode : ...

Both the `PythonSource` and `PythonNode` classes are defined in the `pysrf/node.hpp` header.

Note: `SourceT` and `SinkT` types are typically `shared_ptr`s to a Morpheus message type. For example, `std::shared_ptr<MessageMeta>`.
Note: `SourceT` and `SinkT` types are typically `shared_ptr`s to a Morpheus message type. For example, `std::shared_ptr<MessageMeta>`. This allows the reference counting mechanisms used in Python and C++ to share the same count, properly cleaning up the objects when they are no longer referenced.

Note: The C++ implementation of a stage must receive and emit the same message types as the Python implementation.

Note: The "Python" in the `PythonSource` & `PythonNode` class names refers to the fact that these classes contain Python interfaces, not the implementation language.
Note: The "Python" in the `PythonSource` & `PythonNode` class names refers to the fact that these classes read and write
objects registered with python, not the implementation language.

## A Simple Pass Through Stage

Expand All @@ -77,7 +78,7 @@ To start with, we have our Morpheus and SRF-specific includes:

#include <morpheus/messages/multi.hpp> // for MultiMessage
#include <srf/core/segment.hpp> //for Segment
#include <srf/segment/builder.hpp> //for Segment
#include <pysrf/node.hpp> // for PythonNode

Expand All @@ -94,34 +95,34 @@ class PassThruStage : public srf::pysrf::PythonNode<std::shared_ptr<MultiMessage
using base_t = srf::pysrf::PythonNode<std::shared_ptr<MultiMessage>, std::shared_ptr<MultiMessage>>;
using base_t::operator_fn_t;
using base_t::reader_type_t;
using base_t::writer_type_t;
using base_t::subscribe_fn_t;
using base_t::sink_type_t;
using base_t::source_type_t;

PassThruStage(const srf::Segment &seg, const std::string &name);

operator_fn_t build_operator();
subscribe_fn_t build_operator();

We explicitly set the visibility for the stage object in the namespace to default. This is due to a pybind11 requirement for module implementations to default symbol visibility to hidden (`-fvisibility=hidden`). More details about this can be found in the [pybind11 documentation](

For simplicity, we defined `base_t` as an alias for our base class type because the definition can be quite long. Our base class type also defines a few additional type aliases for us: `operator_fn_t`, `reader_type_t` and `writer_type_t`. The `reader_type_t` and `writer_type_t` aliases are shortcuts for specifying that we are a reader and writer of `std::shared_ptr<MultiMessage>`, respectively. `operator_fn_t` (read as "operator function type") is an alias for:
For simplicity, we defined `base_t` as an alias for our base class type because the definition can be quite long. Our base class type also defines a few additional type aliases for us: `subscribe_fn_t`, `sink_type_t` and `source_type_t`. The `sink_type_t` and `source_type_t` aliases are shortcuts for the sink and source types that this stage will be reading and writing. In this case both the `sink_type_t` and `source_type_t` resolve to `std::shared_ptr<MultiMessage>`. `subscribe_fn_t` (read as "subscribe function type") is an alias for:

std::function<Observable<R>(const Observable<T>& source)>
std::function<rxcpp::subscription(rxcpp::observable<T>, rxcpp::subscriber<R>)>

This means that a SRF operator function accepts an `Observable` of type `T` and returns an observable of type `R`. In our case, both `T` and `R` are `std::shared_ptr<MultiMessage>`.
This means that a SRF subscribe function accepts an `rxcpp::observable` of type `T` and `rxcpp::subscriber` of type `R` and returns a subscription. In our case, both `T` and `R` are `std::shared_ptr<MultiMessage>`.

All Morpheus C++ stages receive an instance of a SRF Segment and a name. Typically this is the Python class' `unique_name` property. Note that C++ segments don't receive an instance of the Morpheus config. Therefore, if there are any attributes in the config needed by the C++ class, it is the responsibility of the Python class to extract them and pass them in as parameters to the C++ class.
All Morpheus C++ stages receive an instance of a SRF Segment Builder and a name (Typically this is the Python class' `unique_name` property) when constructed from Python. Note that C++ segments don't receive an instance of the Morpheus config. Therefore, if there are any attributes in the config needed by the C++ class, it is the responsibility of the Python class to extract them and pass them in as parameters to the C++ class.

We will also define an interface proxy object to keep the class definition separated from the Python interface. This isn't strictly required, but it is a convention used internally by Morpheus. Our proxy object will define a static method named `init` which is responsible for constructing a `PassThruStage` instance and returning it wrapped in a `shared_ptr`. There are many common Python types that pybind11 [automatically converts]( to their associated C++ types. The SRF `Segment` is a C++ object with Python bindings. The proxy interface object is used to help insulate Python bindings from internal implementation details.
We will also define an interface proxy object to keep the class definition separated from the Python interface. This isn't strictly required, but it is a convention used internally by Morpheus. Our proxy object will define a static method named `init` which is responsible for constructing a `PassThruStage` instance and returning it wrapped in a `shared_ptr`. There are many common Python types that pybind11 [automatically converts]( to their associated C++ types. The SRF `Builder` is a C++ object with Python bindings. The proxy interface object is used to help insulate Python bindings from internal implementation details.

struct PassThruStageInterfaceProxy
static std::shared_ptr<PassThruStage> init(srf::Segment &seg, const std::string &name);
static std::shared_ptr<srf::segment::Object<PassThruStage>> init(srf::segment::Builder &builder, const std::string &name);

Expand All @@ -133,7 +134,7 @@ Putting it all together, our header file looks like this:
#pragma once

#include <morpheus/messages/multi.hpp> // for MultiMessage
#include <srf/core/segment.hpp> //for Segment
#include <srf/segment/builder.hpp> //for Segment
#include <pysrf/node.hpp> // for PythonNode

#include <memory>
Expand All @@ -150,18 +151,18 @@ class PassThruStage : public srf::pysrf::PythonNode<std::shared_ptr<MultiMessage
using base_t = srf::pysrf::PythonNode<std::shared_ptr<MultiMessage>, std::shared_ptr<MultiMessage>>;
using base_t::operator_fn_t;
using base_t::reader_type_t;
using base_t::writer_type_t;
using base_t::subscribe_fn_t;
using base_t::sink_type_t;
using base_t::source_type_t;

PassThruStage(const srf::Segment &seg, const std::string &name);

operator_fn_t build_operator();
subscribe_fn_t build_operator();

struct PassThruStageInterfaceProxy
static std::shared_ptr<PassThruStage> init(srf::Segment &seg, const std::string &name);
static std::shared_ptr<srf::segment::Object<PassThruStage>> init(srf::segment::Builder &builder, const std::string &name);

#pragma GCC visibility pop
Expand All @@ -180,24 +181,38 @@ Our includes section looks like:
#include <exception>

The constructor for our class is responsible for passing the output of `build_operator` to our base class, as well as calling the constructor for `srf::SegmentObject`:
The constructor for our class is responsible for passing the output of `build_operator` to our base class, as well as calling the constructor for `PythonNode`:

PassThruStage::PassThruStage(const srf::Segment& seg, const std::string& name) :
srf::SegmentObject(seg, name),
PythonNode(seg, name, build_operator())
PassThruStage::PassThruStage() :
The `build_operator` method defines an observer who is subscribed to our input `Observable`. The observer consists of three functions that are typically lambdas: `on_next`, `on_error`, and `on_completed`. Typically, these three functions call the associated methods on the output subscriber.
We can see that the output of `build_operator()` is not passed directly to the `PythonNode` constructor and instead gets passed to `base_t::op_factory_from_sub_fn()`. This is because reactive operators can be defined two ways:
1. Using the short form `std::function<rxcpp::observable<T>(rxcpp::observable<R>)` which is good when you can use an existing `rxcpp` operator
2. Using the long form `std::function<rxcpp::subscription(rxcpp::observable<T>, rxcpp::subscriber<R>)>` which allows for more customization and better control over the lifetime of objects.
It's possible to convert between the two signatures which is exactly what `base_t::op_factory_from_sub_fn()` does. If you wanted to use the short form, you could define the constructor of `PassThruStage` using:
PassThruStage::operator_fn_t PassThruStage::build_operator()
PassThruStage::PassThruStage() :
PythonNode([](rxcpp::observable<sink_type_t> obs){ return obs; })

However, this doesnt illustrate well how to customize a stage. So we will be using the long form signature for our examples.

The `build_operator` method defines an observer who is subscribed to our input `rxcpp::observable`. The observer consists of three functions that are typically lambdas: `on_next`, `on_error`, and `on_completed`. Typically, these three functions call the associated methods on the output subscriber.

PassThruStage::subscribe_fn_t PassThruStage::build_operator()
return [this](srf::Observable<reader_type_t>& input, srf::Subscriber<writer_type_t>& output) {
return [this](rxcpp::observable<sink_type_t>& input, rxcpp::subscriber<source_type_t>& output) {
return input.subscribe(
[this, &output](reader_type_t&& x) { output.on_next(std::move(x)); },
[this, &output](sink_type_t&& x) { output.on_next(std::move(x)); },
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
Expand All @@ -209,7 +224,7 @@ Note the use of `std::move` in the `on_next` function. In Morpheus, our messages
There are situations in which a C++ stage does need to interact with Python, and therefore acquiring the GIL is a requirement. In these situations, it is important to ensure that the GIL is released before calling the `on_next` method. This is typically accomplished using pybind11's [gil_scoped_acquire]( RAII class inside of a code block. Consider the following `on_next` lambda function from Morpheus' `SerializeStage`:

[this, &output](reader_type_t &&msg) {
[this, &output](sink_type_t &&msg) {
auto table_info = this->get_meta(msg);
std::shared_ptr<MessageMeta> meta;
Expand All @@ -224,16 +239,16 @@ We scoped the acquisition of the GIL such that it is held only for the parts of

## Python Proxy and Interface

The three things that all proxy interfaces need to do are:
1. Construct the stage wrapped in a `shared_ptr`
1. Register the stage with the SRF segment
1. Return a `shared_ptr` to the stage
The things that all proxy interfaces need to do are:
1. Construct the stage using the `srf::segment::Builder::construct_object` method
2. Return a `shared_ptr` to the stage wrapped in a `srf::segment::Object`

std::shared_ptr<PassThruStage> PassThruStageInterfaceProxy::init(srf::Segment& seg, const std::string& name)
PassThruStageInterfaceProxy::init(srf::segment::Builder& builder, const std::string& name)
auto stage = std::make_shared<PassThruStage>(seg, name);
auto stage = builder.construct_object<PassThruStage>(name);

return stage;
Expand All @@ -246,9 +261,9 @@ namespace py = pybind11;
// Define the pybind11 module m.
PYBIND11_MODULE(morpheus_example, m)
py::class_<PassThruStage, srf::SegmentObject, std::shared_ptr<PassThruStage>>(
py::class_<PassThruStage, srf::segment::ObjectProperties, std::shared_ptr<srf::segment::Object<PassThruStage>>>(
m, "PassThruStage", py::multiple_inheritance())
.def(py::init<>(&PassThruStageInterfaceProxy::init), py::arg("segment"), py::arg("name"));
.def(py::init<>(&PassThruStageInterfaceProxy::init), py::arg("builder"), py::arg("name"));

Expand All @@ -263,25 +278,25 @@ PYBIND11_MODULE(morpheus_example, m)

namespace morpheus_example {

PassThruStage::PassThruStage(const srf::Segment& seg, const std::string& name) :
srf::SegmentObject(seg, name),
PythonNode(seg, name, build_operator())
PassThruStage::PassThruStage() :

PassThruStage::operator_fn_t PassThruStage::build_operator()
PassThruStage::subscribe_fn_t PassThruStage::build_operator()
return [this](srf::Observable<reader_type_t>& input, srf::Subscriber<writer_type_t>& output) {
return [this](rxcpp::observable<sink_type_t>& input, rxcpp::subscriber<source_type_t>& output) {
return input.subscribe(
srf::make_observer<reader_type_t>([this, &output](reader_type_t&& x) { output.on_next(std::move(x)); },
rxcpp::make_observer<sink_type_t>([this, &output](sink_type_t&& x) { output.on_next(std::move(x)); },
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));

std::shared_ptr<PassThruStage> PassThruStageInterfaceProxy::init(srf::Segment& seg, const std::string& name)
PassThruStageInterfaceProxy::init(srf::segment::Builder& builder, const std::string& name)
auto stage = std::make_shared<PassThruStage>(seg, name);
auto stage = builder.construct_object<PassThruStage>(name);

return stage;

Expand All @@ -290,7 +305,7 @@ namespace py = pybind11;
// Define the pybind11 module m.
PYBIND11_MODULE(morpheus_example, m)
py::class_<PassThruStage, srf::SegmentObject, std::shared_ptr<PassThruStage>>(
py::class_<PassThruStage, srf::segment::ObjectProperties, std::shared_ptr<srf::segment::Object<PassThruStage>>>(
m, "PassThruStage", py::multiple_inheritance())
.def(py::init<>(&PassThruStageInterfaceProxy::init), py::arg("segment"), py::arg("name"));
Expand All @@ -317,7 +332,7 @@ def supports_cpp_node(self):
def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
if self._build_cpp_node():
print("building cpp")
node = morpheus_example_cpp.PassThruStage(seg, self.unique_name)
node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
node = builder.make_node(self.unique_name, self.on_data)

Expand Down

0 comments on commit 811eb9d

Please sign in to comment.