Support ControlMessage output in the C++ impl of DeserializeStage #1478

Merged
48 commits
b4143f3
Relocate test under tests/stages
dagardner-nv Jan 24, 2024
a821405
Update tests to parameterize over the message_type
dagardner-nv Jan 24, 2024
1e4d948
Remove task_type and task_payload arguments
dagardner-nv Jan 24, 2024
40b37a2
Fix tests
dagardner-nv Jan 24, 2024
323b34f
First pass at C++ impl, need to consolidate logic into base
dagardner-nv Jan 25, 2024
035c4c9
Not working: Consolidate logic
dagardner-nv Jan 25, 2024
e6d1cf6
Just use a pair, fix creation of null shared_ptr
dagardner-nv Jan 25, 2024
4d3625f
Update CR
dagardner-nv Jan 25, 2024
2f08e41
Rename methods
dagardner-nv Jan 25, 2024
f8b7f17
Enable C++ mode by default in LLM examples
dagardner-nv Jan 26, 2024
38ba5fd
Remove use_python restriction
dagardner-nv Jan 26, 2024
70797bc
Update CR Year
dagardner-nv Jan 26, 2024
456d794
Remove unused imports
dagardner-nv Jan 26, 2024
d3bb42d
Fix comment
dagardner-nv Jan 26, 2024
888e16d
Don't expose the templates to the python module, instead just expose …
dagardner-nv Jan 26, 2024
4bda0a4
Remove symbol mappings, fix for #1480
dagardner-nv Jan 26, 2024
9ac1ec4
IWYU fixes
dagardner-nv Jan 26, 2024
baf3e21
Update docstrings
dagardner-nv Jan 26, 2024
d442584
Use slices rather than copies
dagardner-nv Jan 26, 2024
9de089a
Add the ability to cast a python MessageMeta to a cpp impl
dagardner-nv Jan 26, 2024
329f92b
Add the ability to cast a python MessageMeta to a C++ MessageMeta, an…
dagardner-nv Jan 26, 2024
7dcc098
Add docstring
dagardner-nv Jan 26, 2024
b5fafeb
Relocate message meta test
dagardner-nv Jan 26, 2024
92c358c
Update CR year
dagardner-nv Jan 26, 2024
7086deb
Test new message meta casts
dagardner-nv Jan 26, 2024
af3c386
IWYU fixes
dagardner-nv Jan 26, 2024
33bf5ff
No longer need to explicitly import the C++ impl
dagardner-nv Jan 26, 2024
dbd7e1c
Update CR year
dagardner-nv Jan 26, 2024
e28fa33
Release gil prior to calling get_column_names()
dagardner-nv Jan 27, 2024
c440f24
Add test for get_column_names
dagardner-nv Jan 29, 2024
ec2e105
Emit cudf dataframes rather than pandas
dagardner-nv Jan 29, 2024
0120ba1
Emit cudf DFs instead of pandas
dagardner-nv Jan 29, 2024
e597c8e
Remove disabling of C++ mode
dagardner-nv Jan 29, 2024
3bf302f
update tests to expect cudf
dagardner-nv Jan 29, 2024
eaebd0e
Remove unused imports
dagardner-nv Jan 29, 2024
c22f6a3
Update CR year
dagardner-nv Jan 29, 2024
e0173bc
Pin pytest version to 7.4
dagardner-nv Jan 29, 2024
fd484a4
Update deps
dagardner-nv Jan 29, 2024
11b63c3
Merge branch 'branch-24.03' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Feb 8, 2024
b173230
Remove redundant call to pybind11::gil_scoped_release, this triggers …
dagardner-nv Feb 8, 2024
b83a9dd
No need to cast to pandas anymore
dagardner-nv Feb 8, 2024
4d8ca60
No need to explicitly use the C++ impl for messagemeta
dagardner-nv Feb 8, 2024
e735d84
Add docstrings, remove out of date comment
dagardner-nv Feb 8, 2024
a27fa91
Remove un-needed import of C++ impl
dagardner-nv Feb 8, 2024
280017a
Merge branch 'branch-24.03' into david-deserialize-control-message-1328
dagardner-nv Feb 8, 2024
f8c22a8
Remove unused import
dagardner-nv Feb 9, 2024
25052a0
pylint fixes
dagardner-nv Feb 9, 2024
396cf08
Merge branch 'david-deserialize-control-message-1328' of github.com:d…
dagardner-nv Feb 9, 2024
2 changes: 0 additions & 2 deletions ci/iwyu/mappings.imp
@@ -118,7 +118,6 @@
{ "symbol": ["nlohmann::json", "private", "<nlohmann/json_fwd.hpp>", "public"] },

# pybind11
{ "symbol": ["pybind11", "private", "<pybind11/cast.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/embed.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11", "private", "<pybind11/pytypes.h>", "public"] },
@@ -133,7 +132,6 @@
{ "symbol": ["PyObject", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["PySequence_GetItem", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::arg", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::detail::get_type_info", "private", "<pybind11/cast.h>", "public"] },
{ "symbol": ["pybind11::detail::key_error", "private", "<pybind11/pytypes.h>", "public"] },
{ "symbol": ["pybind11::detail::overload_cast_impl", "private", "<pybind11/pybind11.h>", "public"] },
{ "symbol": ["pybind11::detail::str_attr_accessor", "private", "<pybind11/pybind11.h>", "public"] },
2 changes: 1 addition & 1 deletion examples/llm/cli.py
@@ -32,7 +32,7 @@
callback=parse_log_level,
help="Specify the logging level to use.")
@click.option('--use_cpp',
default=False,
default=True,
type=bool,
help=("Whether or not to use C++ node and message types or to prefer python. "
"Only use as a last resort if bugs are encountered"))
6 changes: 1 addition & 5 deletions examples/llm/vdb_upload/module/web_scraper_module.py
@@ -18,12 +18,11 @@

import mrc
import mrc.core.operators as ops
import pandas as pd
import requests
import requests_cache
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import BaseModel
from pydantic import BaseModel # pylint: disable=no-name-in-module
from pydantic import ValidationError

import cudf
@@ -60,9 +59,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->

df = msg.copy_dataframe()

if isinstance(df, cudf.DataFrame):
df: pd.DataFrame = df.to_pandas()

# Convert the dataframe into a list of dictionaries
df_dicts = df.to_dict(orient="records")

10 changes: 7 additions & 3 deletions morpheus/_lib/include/morpheus/messages/control.hpp
@@ -17,7 +17,6 @@

#pragma once

#include "morpheus/messages/memory/tensor_memory.hpp"
#include "morpheus/messages/meta.hpp"

#include <nlohmann/json.hpp>
@@ -29,7 +28,6 @@
#include <string>

namespace morpheus {
class MessageMeta;

#pragma GCC visibility push(default)
enum class ControlMessageType
@@ -246,7 +244,7 @@ class ControlMessage
const nlohmann::json list_metadata() const;

/**
* @brief Set the payload object for the control message.
* @brief Get the payload object for the control message.
* @param payload
* A shared pointer to the message payload.
*/
@@ -308,6 +306,12 @@ struct ControlMessageProxy
static pybind11::object get_metadata(ControlMessage& self, std::optional<std::string> const& key);

static pybind11::dict list_metadata(ControlMessage& self);

/**
* @brief Set the payload object given a Python instance of MessageMeta
* @param meta
*/
static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta);
};

#pragma GCC visibility pop
9 changes: 9 additions & 0 deletions morpheus/_lib/include/morpheus/messages/meta.hpp
@@ -171,6 +171,15 @@ struct MessageMetaInterfaceProxy
*/
static std::shared_ptr<MessageMeta> init_python(pybind11::object&& data_frame);

/**
* @brief Initialize a C++ MessageMeta object from a given Python MessageMeta object and return a shared pointer
* to the result
*
* @param meta : Python MessageMeta object
* @return std::shared_ptr<MessageMeta>
*/
static std::shared_ptr<MessageMeta> init_python_meta(const pybind11::object& meta);

/**
* @brief Get messages count
*
147 changes: 116 additions & 31 deletions morpheus/_lib/include/morpheus/stages/deserialize.hpp
@@ -17,32 +17,31 @@

#pragma once

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp"
#include "morpheus/messages/multi.hpp"
#include "morpheus/types.hpp" // for TensorIndex

#include <boost/fiber/context.hpp>
#include <boost/fiber/future/future.hpp>
#include <mrc/node/rx_sink_base.hpp>
#include <mrc/node/rx_source_base.hpp>
#include <mrc/node/sink_properties.hpp>
#include <mrc/node/source_properties.hpp>
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/python_util.hpp" // for show_warning_message
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR

#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp>
#include <nlohmann/json.hpp>
#include <pybind11/pytypes.h> // for object
#include <pyerrors.h> // for PyExc_RuntimeWarning
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>
// IWYU pragma: no_include "rxcpp/sources/rx-iterate.hpp"

#include <map>
#include <algorithm> // IWYU pragma: keep for std::min
#include <exception> // for exception_ptr
#include <memory>
#include <sstream> // IWYU pragma: keep for glog
#include <string>
#include <thread>
#include <vector>
#include <utility> // for pair

namespace morpheus {
/****** Component public implementations *******************/
/****** DeserializationStage********************************/

/**
* @addtogroup stages
@@ -51,15 +50,26 @@ namespace morpheus {
*/

#pragma GCC visibility push(default)
/**
* @brief Slices incoming Dataframes into smaller `batch_size`'d chunks. This stage accepts the `MessageMeta` output
* from `FileSourceStage`/`KafkaSourceStage` stages breaking them up into `MultiMessage`'s. This should be one of
* the first stages after the `Source` object.
*/
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>
using cm_task_t = std::pair<std::string, nlohmann::json>;

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<MultiMessage>& windowed_message);

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message);

/****** DeserializationStage********************************/
template <typename OutputT>
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>;
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;
@@ -69,17 +79,22 @@ class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMe
*
* @param batch_size Number of messages to be divided into each batch
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @param task Optional task to be added to all outgoing `ControlMessage`s, ignored when `OutputT` is `MultiMessage`
*/
DeserializeStage(TensorIndex batch_size, bool ensure_sliceable_index = true);
DeserializeStage(TensorIndex batch_size,
bool ensure_sliceable_index = true,
std::unique_ptr<cm_task_t> task = nullptr) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_batch_size(batch_size),
m_ensure_sliceable_index(ensure_sliceable_index),
m_task(std::move(task)){};

private:
/**
* TODO(Documentation)
*/
subscribe_fn_t build_operator();

TensorIndex m_batch_size;
bool m_ensure_sliceable_index{true};
std::unique_ptr<cm_task_t> m_task{nullptr};
};

/****** DeserializationStageInterfaceProxy******************/
@@ -89,18 +104,88 @@ class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMe
struct DeserializeStageInterfaceProxy
{
/**
* @brief Create and initialize a DeserializationStage, and return the result
* @brief Create and initialize a DeserializationStage that emits MultiMessage's, and return the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param batch_size : Number of messages to be divided into each batch
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage>>
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage<MultiMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<DeserializeStage>> init(mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index);
static std::shared_ptr<mrc::segment::Object<DeserializeStage<MultiMessage>>> init_multi(
mrc::segment::Builder& builder, const std::string& name, TensorIndex batch_size, bool ensure_sliceable_index);

/**
* @brief Create and initialize a DeserializationStage that emits ControlMessage's, and return the result.
* If `task_type` is not None, `task_payload` must also be not None, and vice versa.
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param batch_size : Number of messages to be divided into each batch
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
* @param task_type : Optional task type to be added to all outgoing messages
* @param task_payload : Optional json object describing the task to be added to all outgoing messages
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage<ControlMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<DeserializeStage<ControlMessage>>> init_cm(
mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index,
const pybind11::object& task_type,
const pybind11::object& task_payload);
};

template <typename OutputT>
typename DeserializeStage<OutputT>::subscribe_fn_t DeserializeStage<OutputT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t incoming_message) {
if (!incoming_message->has_sliceable_index())
{
if (m_ensure_sliceable_index)
{
auto old_index_name = incoming_message->ensure_sliceable_index();

if (old_index_name.has_value())
{
// Generate a warning
LOG(WARNING) << MORPHEUS_CONCAT_STR(
"Incoming MessageMeta does not have a unique and monotonic index. Updating index "
"to be unique. Existing index will be retained in column '"
<< *old_index_name << "'");
}
}
else
{
utilities::show_warning_message(
"Detected a non-sliceable index on an incoming MessageMeta. Performance when taking slices "
"of messages may be degraded. Consider setting `ensure_sliceable_index==True`",
PyExc_RuntimeWarning);
}
}
// Loop over the MessageMeta and create sub-batches
for (TensorIndex i = 0; i < incoming_message->count(); i += this->m_batch_size)
{
std::shared_ptr<OutputT> windowed_message{nullptr};
make_output_message(incoming_message,
i,
std::min(i + this->m_batch_size, incoming_message->count()),
m_task.get(),
windowed_message);
output.on_next(std::move(windowed_message));
}
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

#pragma GCC visibility pop
/** @} */ // end of group
} // namespace morpheus
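
The templated `build_operator()` above relies on two things: a loop that walks the incoming message in `batch_size` steps, clamping the final window with `std::min`, and overload resolution on the out-parameter type to pick the right `make_output_message`. The following is a minimal, standalone sketch of that pattern only. `MultiLike`, `ControlLike`, `make_output` and `split_into_batches` are hypothetical stand-ins invented for illustration, not Morpheus types or functions, and the guard against a non-positive `batch_size` is an assumption that does not appear in the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for morpheus::TensorIndex.
using TensorIndex = std::int64_t;

// Hypothetical stand-ins for the two output message types.
struct MultiLike
{
    TensorIndex start;
    TensorIndex stop;
};

struct ControlLike
{
    TensorIndex start;
    TensorIndex stop;
    std::string task;
};

// Overload chosen when the out-parameter is a MultiLike; the task is ignored.
inline void make_output(TensorIndex start, TensorIndex stop, const std::string* /*task*/, MultiLike& out)
{
    out = MultiLike{start, stop};
}

// Overload chosen when the out-parameter is a ControlLike; the optional task is attached.
inline void make_output(TensorIndex start, TensorIndex stop, const std::string* task, ControlLike& out)
{
    out = ControlLike{start, stop, task != nullptr ? *task : std::string{}};
}

// Splits the half-open range [0, count) into windows of at most batch_size rows.
// The compiler selects the make_output overload from OutputT.
template <typename OutputT>
std::vector<OutputT> split_into_batches(TensorIndex count, TensorIndex batch_size, const std::string* task = nullptr)
{
    std::vector<OutputT> batches;
    if (batch_size <= 0)
    {
        // Assumption for this sketch: avoid an endless loop on a non-positive batch size.
        return batches;
    }

    for (TensorIndex i = 0; i < count; i += batch_size)
    {
        OutputT out{};
        // The last window is clamped so it never runs past count.
        make_output(i, std::min(i + batch_size, count), task, out);
        batches.push_back(std::move(out));
    }
    return batches;
}
```

Under these assumptions, `split_into_batches<MultiLike>(10, 4)` produces the windows [0, 4), [4, 8) and [8, 10), while `split_into_batches<ControlLike>(5, 5, &task)` produces a single window carrying a copy of the task string.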
2 changes: 2 additions & 0 deletions morpheus/_lib/include/morpheus/utilities/python_util.hpp
@@ -27,6 +27,7 @@ using PyObject = _object; // NOLINT(readability-identifier-naming)

namespace morpheus::utilities {

#pragma GCC visibility push(default)
/**
* @brief Shows a python warning using the `warnings.warn` module. These warnings can be suppressed and work differently
* than `logger.warn()`
@@ -38,5 +39,6 @@ namespace morpheus::utilities {
void show_warning_message(const std::string& deprecation_message,
PyObject* category = nullptr,
ssize_t stack_level = 1);
#pragma GCC visibility pop

} // namespace morpheus::utilities
2 changes: 2 additions & 0 deletions morpheus/_lib/messages/__init__.pyi
@@ -58,6 +58,8 @@ class ControlMessage():
def payload(self) -> MessageMeta: ...
@typing.overload
def payload(self, arg0: MessageMeta) -> None: ...
@typing.overload
def payload(self, meta: object) -> None: ...
def remove_task(self, task_type: str) -> dict: ...
def set_metadata(self, key: str, value: object) -> None: ...
@typing.overload
14 changes: 4 additions & 10 deletions morpheus/_lib/messages/module.cpp
@@ -35,19 +35,11 @@
#include "morpheus/messages/multi_tensor.hpp"
#include "morpheus/objects/data_table.hpp"
#include "morpheus/objects/mutable_table_ctx_mgr.hpp"
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/cudf_util.hpp"
#include "morpheus/utilities/string_util.hpp"
#include "morpheus/version.hpp"

#include <boost/fiber/future/future.hpp>
#include <mrc/channel/status.hpp> // for Status
#include <mrc/edge/edge_connector.hpp>
#include <mrc/node/rx_sink_base.hpp>
#include <mrc/node/rx_source_base.hpp>
#include <mrc/types.hpp>
#include <nlohmann/json.hpp>
#include <pybind11/cast.h>
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
@@ -57,9 +49,7 @@
#include <pymrc/utils.hpp> // for pymrc::import
#include <rxcpp/rx.hpp>

#include <array>
#include <filesystem>
#include <map>
#include <memory>
#include <sstream>
#include <string>
@@ -386,6 +376,10 @@ PYBIND11_MODULE(messages, _module)
.def("list_metadata", &ControlMessageProxy::list_metadata)
.def("payload", pybind11::overload_cast<>(&ControlMessage::payload), py::return_value_policy::move)
.def("payload", pybind11::overload_cast<const std::shared_ptr<MessageMeta>&>(&ControlMessage::payload))
.def(
"payload",
pybind11::overload_cast<ControlMessage&, const py::object&>(&ControlMessageProxy::payload_from_python_meta),
py::arg("meta"))
.def("remove_task", &ControlMessageProxy::remove_task, py::arg("task_type"))
.def("set_metadata", &ControlMessageProxy::set_metadata, py::arg("key"), py::arg("value"))
.def("task_type", pybind11::overload_cast<>(&ControlMessage::task_type))
7 changes: 7 additions & 0 deletions morpheus/_lib/src/messages/control.cpp
@@ -17,6 +17,8 @@

#include "morpheus/messages/control.hpp"

#include "morpheus/messages/meta.hpp"

#include <glog/logging.h>
#include <pybind11/pytypes.h>
#include <pymrc/utils.hpp>
@@ -261,4 +263,9 @@ void ControlMessageProxy::config(ControlMessage& self, py::dict& config)
self.config(mrc::pymrc::cast_from_pyobject(config));
}

void ControlMessageProxy::payload_from_python_meta(ControlMessage& self, const pybind11::object& meta)
{
self.payload(MessageMetaInterfaceProxy::init_python_meta(meta));
}

} // namespace morpheus