Skip to content

Commit

Permalink
Support ControlMessage output in the C++ impl of DeserializeStage (#1478
Browse files Browse the repository at this point in the history
)

* C++ impl of `DeserializeStage` now templates the output message type
* Add the ability to cast a Python instance of a `MessageMeta` to the C++ instance of `MessageMeta`. This removes the need to explicitly import the C++ impl of `MessageMeta` in order to set the payload of a `ControMessage`.
* Use C++ mode by default for LLM examples
* Support cudf DataFrames for both `WebScraperStage` and `RSSController`
* utility method `show_warning_message` now marked visible in lib
* Move `tests/test_deserialize_stage_pipe.py` -> `tests/stages/test_deserialize_stage_pipe.py` & `tests/test_message_meta.py` -> `tests/messages/test_message_meta.py`
* Update docstrings

Closes #1328
Closes #1480
Closes #1481 
Closes #1342

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1478
  • Loading branch information
dagardner-nv authored Feb 12, 2024
1 parent 33751a5 commit 77cc0e5
Show file tree
Hide file tree
Showing 34 changed files with 378 additions and 209 deletions.
2 changes: 0 additions & 2 deletions ci/iwyu/mappings.imp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
{ "symbol": ["nlohmann::json", "private", "<nlohmann/json_fwd.hpp>", "public"] },

# pybind11
{ "symbol": ["pybind11", "private", "<pybind11/cast.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/embed.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/pytypes.h>", "public"] },
Expand All @@ -133,7 +132,6 @@
{ "symbol": ["PyObject", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["PySequence_GetItem", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::arg", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::detail::get_type_info", "private", "<pybind11/cast.h>", "public"] },
{ "symbol": ["pybind11::detail::key_error", "private", "<pybind11/pytypes.h>", "public"] },
{ "symbol": ["pybind11::detail::overload_cast_impl", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::detail::str_attr_accessor", "private", "<pybind11/pybind11.h>", "public"] },
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
callback=parse_log_level,
help="Specify the logging level to use.")
@click.option('--use_cpp',
default=False,
default=True,
type=bool,
help=("Whether or not to use C++ node and message types or to prefer python. "
"Only use as a last resort if bugs are encountered"))
Expand Down
6 changes: 1 addition & 5 deletions examples/llm/vdb_upload/module/web_scraper_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@

import mrc
import mrc.core.operators as ops
import pandas as pd
import requests
import requests_cache
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import BaseModel
from pydantic import BaseModel # pylint: disable=no-name-in-module
from pydantic import ValidationError

import cudf
Expand Down Expand Up @@ -60,9 +59,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->

df = msg.copy_dataframe()

if isinstance(df, cudf.DataFrame):
df: pd.DataFrame = df.to_pandas()

# Convert the dataframe into a list of dictionaries
df_dicts = df.to_dict(orient="records")

Expand Down
10 changes: 7 additions & 3 deletions morpheus/_lib/include/morpheus/messages/control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#pragma once

#include "morpheus/messages/memory/tensor_memory.hpp"
#include "morpheus/messages/meta.hpp"

#include <nlohmann/json.hpp>
Expand All @@ -29,7 +28,6 @@
#include <string>

namespace morpheus {
class MessageMeta;

#pragma GCC visibility push(default)
enum class ControlMessageType
Expand Down Expand Up @@ -246,7 +244,7 @@ class ControlMessage
const nlohmann::json list_metadata() const;

/**
* @brief Set the payload object for the control message.
* @brief Get the payload object for the control message.
* @param payload
* A shared pointer to the message payload.
*/
Expand Down Expand Up @@ -308,6 +306,12 @@ struct ControlMessageProxy
static pybind11::object get_metadata(ControlMessage& self, std::optional<std::string> const& key);

static pybind11::dict list_metadata(ControlMessage& self);

/**
* @brief Set the payload object given a Python instance of MessageMeta
* @param meta
*/
static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta);
};

#pragma GCC visibility pop
Expand Down
9 changes: 9 additions & 0 deletions morpheus/_lib/include/morpheus/messages/meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ struct MessageMetaInterfaceProxy
*/
static std::shared_ptr<MessageMeta> init_python(pybind11::object&& data_frame);

/**
* @brief Initialize MessageMeta cpp object with a given a MessageMeta python objectand returns shared pointer as
* the result
*
* @param meta : Python MesageMeta object
* @return std::shared_ptr<MessageMeta>
*/
static std::shared_ptr<MessageMeta> init_python_meta(const pybind11::object& meta);

/**
* @brief Get messages count
*
Expand Down
147 changes: 116 additions & 31 deletions morpheus/_lib/include/morpheus/stages/deserialize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,31 @@

#pragma once

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp"
#include "morpheus/messages/multi.hpp"
#include "morpheus/types.hpp" // for TensorIndex

#include <boost/fiber/context.hpp>
#include <boost/fiber/future/future.hpp>
#include <mrc/node/rx_sink_base.hpp>
#include <mrc/node/rx_source_base.hpp>
#include <mrc/node/sink_properties.hpp>
#include <mrc/node/source_properties.hpp>
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/python_util.hpp" // for show_warning_message
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR

#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp>
#include <nlohmann/json.hpp>
#include <pybind11/pytypes.h> // for object
#include <pyerrors.h> // for PyExc_RuntimeWarning
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>
// IWYU pragma: no_include "rxcpp/sources/rx-iterate.hpp"

#include <map>
#include <algorithm> // IWYU pragma: keep for std::min
#include <exception> // for exception_ptr
#include <memory>
#include <sstream> // IWYU pragma: keep for glog
#include <string>
#include <thread>
#include <vector>
#include <utility> // for pair

namespace morpheus {
/****** Component public implementations *******************/
/****** DeserializationStage********************************/

/**
* @addtogroup stages
Expand All @@ -51,15 +50,26 @@ namespace morpheus {
*/

#pragma GCC visibility push(default)
/**
* @brief Slices incoming Dataframes into smaller `batch_size`'d chunks. This stage accepts the `MessageMeta` output
* from `FileSourceStage`/`KafkaSourceStage` stages breaking them up into into `MultiMessage`'s. This should be one of
* the first stages after the `Source` object.
*/
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>
using cm_task_t = std::pair<std::string, nlohmann::json>;

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<MultiMessage>& windowed_message);

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message);

/****** DeserializationStage********************************/
template <typename OutputT>
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>;
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;
Expand All @@ -69,17 +79,22 @@ class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMe
*
* @param batch_size Number of messages to be divided into each batch
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @param task Optional task to be added to all outgoing `ControlMessage`s, ignored when `OutputT` is `MultiMessage`
*/
DeserializeStage(TensorIndex batch_size, bool ensure_sliceable_index = true);
DeserializeStage(TensorIndex batch_size,
bool ensure_sliceable_index = true,
std::unique_ptr<cm_task_t> task = nullptr) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_batch_size(batch_size),
m_ensure_sliceable_index(ensure_sliceable_index),
m_task(std::move(task)){};

private:
/**
* TODO(Documentation)
*/
subscribe_fn_t build_operator();

TensorIndex m_batch_size;
bool m_ensure_sliceable_index{true};
std::unique_ptr<cm_task_t> m_task{nullptr};
};

/****** DeserializationStageInterfaceProxy******************/
Expand All @@ -89,18 +104,88 @@ class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMe
struct DeserializeStageInterfaceProxy
{
/**
* @brief Create and initialize a DeserializationStage, and return the result
* @brief Create and initialize a DeserializationStage that emits MultiMessage's, and return the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param batch_size : Number of messages to be divided into each batch
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage>>
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage<MultiMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<DeserializeStage>> init(mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index);
static std::shared_ptr<mrc::segment::Object<DeserializeStage<MultiMessage>>> init_multi(
mrc::segment::Builder& builder, const std::string& name, TensorIndex batch_size, bool ensure_sliceable_index);

/**
* @brief Create and initialize a DeserializationStage that emits ControlMessage's, and return the result.
* If `task_type` is not None, `task_payload` must also be not None, and vice versa.
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param batch_size : Number of messages to be divided into each batch
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @param task_type : Optional task type to be added to all outgoing messages
* @param task_payload : Optional json object describing the task to be added to all outgoing messages
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage<ControlMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<DeserializeStage<ControlMessage>>> init_cm(
mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index,
const pybind11::object& task_type,
const pybind11::object& task_payload);
};

template <typename OutputT>
typename DeserializeStage<OutputT>::subscribe_fn_t DeserializeStage<OutputT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t incoming_message) {
if (!incoming_message->has_sliceable_index())
{
if (m_ensure_sliceable_index)
{
auto old_index_name = incoming_message->ensure_sliceable_index();

if (old_index_name.has_value())
{
// Generate a warning
LOG(WARNING) << MORPHEUS_CONCAT_STR(
"Incoming MessageMeta does not have a unique and monotonic index. Updating index "
"to be unique. Existing index will be retained in column '"
<< *old_index_name << "'");
}
}
else
{
utilities::show_warning_message(
"Detected a non-sliceable index on an incoming MessageMeta. Performance when taking slices "
"of messages may be degraded. Consider setting `ensure_sliceable_index==True`",
PyExc_RuntimeWarning);
}
}
// Loop over the MessageMeta and create sub-batches
for (TensorIndex i = 0; i < incoming_message->count(); i += this->m_batch_size)
{
std::shared_ptr<OutputT> windowed_message{nullptr};
make_output_message(incoming_message,
i,
std::min(i + this->m_batch_size, incoming_message->count()),
m_task.get(),
windowed_message);
output.on_next(std::move(windowed_message));
}
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

#pragma GCC visibility pop
/** @} */ // end of group
} // namespace morpheus
2 changes: 2 additions & 0 deletions morpheus/_lib/include/morpheus/utilities/python_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using PyObject = _object; // NOLINT(readability-identifier-naming)

namespace morpheus::utilities {

#pragma GCC visibility push(default)
/**
* @brief Shows a python warning using the `warnings.warn` module. These warnings can be suppressed and work different
* than `logger.warn()`
Expand All @@ -38,5 +39,6 @@ namespace morpheus::utilities {
void show_warning_message(const std::string& deprecation_message,
PyObject* category = nullptr,
ssize_t stack_level = 1);
#pragma GCC visibility pop

} // namespace morpheus::utilities
2 changes: 2 additions & 0 deletions morpheus/_lib/messages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ControlMessage():
def payload(self) -> MessageMeta: ...
@typing.overload
def payload(self, arg0: MessageMeta) -> None: ...
@typing.overload
def payload(self, meta: object) -> None: ...
def remove_task(self, task_type: str) -> dict: ...
def set_metadata(self, key: str, value: object) -> None: ...
@typing.overload
Expand Down
14 changes: 4 additions & 10 deletions morpheus/_lib/messages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,11 @@
#include "morpheus/messages/multi_tensor.hpp"
#include "morpheus/objects/data_table.hpp"
#include "morpheus/objects/mutable_table_ctx_mgr.hpp"
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/cudf_util.hpp"
#include "morpheus/utilities/string_util.hpp"
#include "morpheus/version.hpp"

#include <boost/fiber/future/future.hpp>
#include <mrc/channel/status.hpp> // for Status
#include <mrc/edge/edge_connector.hpp>
#include <mrc/node/rx_sink_base.hpp>
#include <mrc/node/rx_source_base.hpp>
#include <mrc/types.hpp>
#include <nlohmann/json.hpp>
#include <pybind11/cast.h>
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
Expand All @@ -57,9 +49,7 @@
#include <pymrc/utils.hpp> // for pymrc::import
#include <rxcpp/rx.hpp>

#include <array>
#include <filesystem>
#include <map>
#include <memory>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -386,6 +376,10 @@ PYBIND11_MODULE(messages, _module)
.def("list_metadata", &ControlMessageProxy::list_metadata)
.def("payload", pybind11::overload_cast<>(&ControlMessage::payload), py::return_value_policy::move)
.def("payload", pybind11::overload_cast<const std::shared_ptr<MessageMeta>&>(&ControlMessage::payload))
.def(
"payload",
pybind11::overload_cast<ControlMessage&, const py::object&>(&ControlMessageProxy::payload_from_python_meta),
py::arg("meta"))
.def("remove_task", &ControlMessageProxy::remove_task, py::arg("task_type"))
.def("set_metadata", &ControlMessageProxy::set_metadata, py::arg("key"), py::arg("value"))
.def("task_type", pybind11::overload_cast<>(&ControlMessage::task_type))
Expand Down
7 changes: 7 additions & 0 deletions morpheus/_lib/src/messages/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "morpheus/messages/control.hpp"

#include "morpheus/messages/meta.hpp"

#include <glog/logging.h>
#include <pybind11/pytypes.h>
#include <pymrc/utils.hpp>
Expand Down Expand Up @@ -261,4 +263,9 @@ void ControlMessageProxy::config(ControlMessage& self, py::dict& config)
self.config(mrc::pymrc::cast_from_pyobject(config));
}

void ControlMessageProxy::payload_from_python_meta(ControlMessage& self, const pybind11::object& meta)
{
self.payload(MessageMetaInterfaceProxy::init_python_meta(meta));
}

} // namespace morpheus
Loading

0 comments on commit 77cc0e5

Please sign in to comment.