Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ControlMessage output in the C++ impl of DeserializeStage #1478

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
b4143f3
Relocate test under tests/stages
dagardner-nv Jan 24, 2024
a821405
Update tests to parametarize over the message_type
dagardner-nv Jan 24, 2024
1e4d948
Remove task_type and task_payload arguments
dagardner-nv Jan 24, 2024
40b37a2
Fix tests
dagardner-nv Jan 24, 2024
323b34f
First pass at C++ impl, need to consolidate logic into base
dagardner-nv Jan 25, 2024
035c4c9
Not working: Consolidate logic
dagardner-nv Jan 25, 2024
e6d1cf6
Just use a pair, fix creation of null shared_ptr
dagardner-nv Jan 25, 2024
4d3625f
Update CR
dagardner-nv Jan 25, 2024
2f08e41
Rename methods
dagardner-nv Jan 25, 2024
f8b7f17
Enable C++ mode by default in LLM examples
dagardner-nv Jan 26, 2024
38ba5fd
Remove use_python restriction
dagardner-nv Jan 26, 2024
70797bc
Update CR Year
dagardner-nv Jan 26, 2024
456d794
Remove unused imports
dagardner-nv Jan 26, 2024
d3bb42d
Fix comment
dagardner-nv Jan 26, 2024
888e16d
Don't expose the templates to the python module, instead just expose …
dagardner-nv Jan 26, 2024
4bda0a4
Remove symbol mappins, fix for #1480
dagardner-nv Jan 26, 2024
9ac1ec4
IWYU fixes
dagardner-nv Jan 26, 2024
baf3e21
Update docstrings
dagardner-nv Jan 26, 2024
d442584
Use slices rather than copies
dagardner-nv Jan 26, 2024
9de089a
Add the ability to cast a python MessageMeta to a cpp impl
dagardner-nv Jan 26, 2024
329f92b
Add the ability to cast a python MessageMeta to a C++ MessageMeta, an…
dagardner-nv Jan 26, 2024
7dcc098
Add docstring
dagardner-nv Jan 26, 2024
b5fafeb
Relocate message meta test
dagardner-nv Jan 26, 2024
92c358c
Update CR year
dagardner-nv Jan 26, 2024
7086deb
Test new message meta casts
dagardner-nv Jan 26, 2024
af3c386
IWYU fixes
dagardner-nv Jan 26, 2024
33bf5ff
No longer need to explicitly import the C++ impl
dagardner-nv Jan 26, 2024
dbd7e1c
Update CR year
dagardner-nv Jan 26, 2024
e28fa33
Release gil prior to calling get_column_names()
dagardner-nv Jan 27, 2024
c440f24
Add test for get_column_names
dagardner-nv Jan 29, 2024
ec2e105
Emit cudf dataframes rather than pandas
dagardner-nv Jan 29, 2024
0120ba1
Emit cudf DFs instread of pandas
dagardner-nv Jan 29, 2024
e597c8e
Remove disabling of C++ mode
dagardner-nv Jan 29, 2024
3bf302f
update tests to expect cudf
dagardner-nv Jan 29, 2024
eaebd0e
Remove unused imports
dagardner-nv Jan 29, 2024
c22f6a3
Update CR year
dagardner-nv Jan 29, 2024
e0173bc
Pin pytest version to 7.4
dagardner-nv Jan 29, 2024
fd484a4
Update deps
dagardner-nv Jan 29, 2024
11b63c3
Merge branch 'branch-24.03' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Feb 8, 2024
b173230
Remove redundant call to pybind11::gil_scoped_release, this triggers …
dagardner-nv Feb 8, 2024
b83a9dd
No need to cast to pandas anymore
dagardner-nv Feb 8, 2024
4d8ca60
No need to explicitly use the C++ impl for messagemeta
dagardner-nv Feb 8, 2024
e735d84
Add docstrings, remove out of date comment
dagardner-nv Feb 8, 2024
a27fa91
Remove un-needed import of C++ impl
dagardner-nv Feb 8, 2024
280017a
Merge branch 'branch-24.03' into david-deserialize-control-message-1328
dagardner-nv Feb 8, 2024
f8c22a8
Remove unused import
dagardner-nv Feb 9, 2024
25052a0
pylint fixes
dagardner-nv Feb 9, 2024
396cf08
Merge branch 'david-deserialize-control-message-1328' of github.com:d…
dagardner-nv Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/llm/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,7 @@
callback=parse_log_level,
help="Specify the logging level to use.")
@click.option('--use_cpp',
default=False,
default=True,
type=bool,
help=("Whether or not to use C++ node and message types or to prefer python. "
"Only use as a last resort if bugs are encountered"))
Expand Down
5 changes: 4 additions & 1 deletion examples/llm/vdb_upload/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
import time

from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
Expand Down Expand Up @@ -48,6 +49,8 @@ def pipeline(num_threads: int,
vector_db_resource_name: str,
triton_server_url: str):

# WebScraperStage requires C++ mode to be disabled
CppConfig.set_should_use_cpp(False)
config = Config()
config.mode = PipelineModes.NLP

Expand Down
130 changes: 113 additions & 17 deletions morpheus/_lib/include/morpheus/stages/deserialize.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,9 +17,11 @@

#pragma once

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp"
#include "morpheus/messages/multi.hpp"
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/python_util.hpp" // for show_warning_message

#include <boost/fiber/context.hpp>
#include <boost/fiber/future/future.hpp>
Expand All @@ -30,6 +32,7 @@
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp>
#include <nlohmann/json.hpp>
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>
// IWYU pragma: no_include "rxcpp/sources/rx-iterate.hpp"
Expand All @@ -38,11 +41,13 @@
#include <memory>
#include <string>
#include <thread>
#include <utility> // for pair
#include <vector>

namespace morpheus {
/****** Component public implementations *******************/
/****** DeserializationStage********************************/
using namespace std::literals::string_literals;

/**
* @addtogroup stages
Expand All @@ -51,15 +56,25 @@ namespace morpheus {
*/

#pragma GCC visibility push(default)
/**
* @brief Slices incoming Dataframes into smaller `batch_size`'d chunks. This stage accepts the `MessageMeta` output
* from `FileSourceStage`/`KafkaSourceStage` stages breaking them up into into `MultiMessage`'s. This should be one of
* the first stages after the `Source` object.
*/
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>
using cm_task_t = std::pair<std::string, nlohmann::json>;

void make_output_message(std::shared_ptr<MultiMessage>& full_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<MultiMessage>& windowed_message);

void make_output_message(std::shared_ptr<MultiMessage>& full_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message);

template <typename OutputT>
class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MultiMessage>>;
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<OutputT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;
Expand All @@ -70,22 +85,27 @@ class DeserializeStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMe
* @param batch_size Number of messages to be divided into each batch
* @param ensure_sliceable_index Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`
*/
DeserializeStage(TensorIndex batch_size, bool ensure_sliceable_index = true);
DeserializeStage(TensorIndex batch_size,
bool ensure_sliceable_index = true,
std::unique_ptr<cm_task_t> task = nullptr) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_batch_size(batch_size),
m_ensure_sliceable_index(ensure_sliceable_index),
m_task(std::move(task)){};

private:
/**
* TODO(Documentation)
*/
subscribe_fn_t build_operator();

TensorIndex m_batch_size;
bool m_ensure_sliceable_index{true};
std::unique_ptr<cm_task_t> m_task{nullptr};
};

/****** DeserializationStageInterfaceProxy******************/
/**
* @brief Interface proxy, used to insulate python bindings.
*/
template <typename OutputT>
struct DeserializeStageInterfaceProxy
{
/**
Expand All @@ -96,11 +116,87 @@ struct DeserializeStageInterfaceProxy
* @param batch_size : Number of messages to be divided into each batch
* @return std::shared_ptr<mrc::segment::Object<DeserializeStage>>
*/
static std::shared_ptr<mrc::segment::Object<DeserializeStage>> init(mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index);
static std::shared_ptr<mrc::segment::Object<DeserializeStage<OutputT>>> init(mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index,
const pybind11::object& task_type,
const pybind11::object& task_payload);
};

template <typename OutputT>
typename DeserializeStage<OutputT>::subscribe_fn_t DeserializeStage<OutputT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t x) {
if (!x->has_sliceable_index())
{
if (m_ensure_sliceable_index)
{
auto old_index_name = x->ensure_sliceable_index();

if (old_index_name.has_value())
{
// Generate a warning
LOG(WARNING) << MORPHEUS_CONCAT_STR(
"Incoming MessageMeta does not have a unique and monotonic index. Updating index "
"to be unique. Existing index will be retained in column '"
<< *old_index_name << "'");
}
}
else
{
utilities::show_warning_message(
"Detected a non-sliceable index on an incoming MessageMeta. Performance when taking slices "
"of messages may be degraded. Consider setting `ensure_sliceable_index==True`",
PyExc_RuntimeWarning);
}
}

// Make one large MultiMessage
auto full_message = std::make_shared<MultiMessage>(x, 0, x->count());

// Loop over the MessageMeta and create sub-batches
for (TensorIndex i = 0; i < x->count(); i += this->m_batch_size)
{
std::shared_ptr<OutputT> windowed_message{nullptr};
make_output_message(
full_message, i, std::min(i + this->m_batch_size, x->count()), m_task.get(), windowed_message);
output.on_next(std::move(windowed_message));
}
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

template <typename OutputT>
std::shared_ptr<mrc::segment::Object<DeserializeStage<OutputT>>> DeserializeStageInterfaceProxy<OutputT>::init(
mrc::segment::Builder& builder,
const std::string& name,
TensorIndex batch_size,
bool ensure_sliceable_index,
const pybind11::object& task_type,
const pybind11::object& task_payload)
{
std::unique_ptr<cm_task_t> task{nullptr};

if (!task_type.is_none() && !task_payload.is_none())
{
task = std::make_unique<cm_task_t>(pybind11::cast<std::string>(task_type),
mrc::pymrc::cast_from_pyobject(task_payload));
}

auto stage =
builder.construct_object<DeserializeStage<OutputT>>(name, batch_size, ensure_sliceable_index, std::move(task));

return stage;
}
#pragma GCC visibility pop
/** @} */ // end of group
} // namespace morpheus
4 changes: 3 additions & 1 deletion morpheus/_lib/include/morpheus/utilities/python_util.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -27,6 +27,7 @@ using PyObject = _object; // NOLINT(readability-identifier-naming)

namespace morpheus::utilities {

#pragma GCC visibility push(default)
/**
* @brief Shows a python warning using the `warnings.warn` module. These warnings can be suppressed and work different
* than `logger.warn()`
Expand All @@ -38,5 +39,6 @@ namespace morpheus::utilities {
void show_warning_message(const std::string& deprecation_message,
PyObject* category = nullptr,
ssize_t stack_level = 1);
#pragma GCC visibility pop

} // namespace morpheus::utilities
81 changes: 25 additions & 56 deletions morpheus/_lib/src/stages/deserialize.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,14 +24,15 @@
#include "mrc/segment/object.hpp"
#include "mrc/types.hpp"

#include "morpheus/messages/control.hpp"
#include "morpheus/types.hpp"
#include "morpheus/utilities/python_util.hpp"
#include "morpheus/utilities/string_util.hpp"

#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <pyerrors.h>
#include <pymrc/node.hpp>
#include <pymrc/utils.hpp> // for cast_from_pyobject
#include <rxcpp/rx.hpp>

#include <algorithm> // for min
Expand All @@ -43,65 +44,33 @@
#include <utility>

namespace morpheus {
// Component public implementations
// ************ DeserializationStage **************************** //
DeserializeStage::DeserializeStage(TensorIndex batch_size, bool ensure_sliceable_index) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
m_batch_size(batch_size),
m_ensure_sliceable_index(ensure_sliceable_index)
{}

DeserializeStage::subscribe_fn_t DeserializeStage::build_operator()
void make_output_message(std::shared_ptr<MultiMessage>& full_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<MultiMessage>& windowed_message)
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t x) {
if (!x->has_sliceable_index())
{
if (m_ensure_sliceable_index)
{
auto old_index_name = x->ensure_sliceable_index();

if (old_index_name.has_value())
{
// Generate a warning
LOG(WARNING) << MORPHEUS_CONCAT_STR(
"Incoming MessageMeta does not have a unique and monotonic index. Updating index "
"to be unique. Existing index will be retained in column '"
<< *old_index_name << "'");
}
}
else
{
utilities::show_warning_message(
"Detected a non-sliceable index on an incoming MessageMeta. Performance when taking slices "
"of messages may be degraded. Consider setting `ensure_sliceable_index==True`",
PyExc_RuntimeWarning);
}
}

// Make one large MultiMessage
auto full_message = std::make_shared<MultiMessage>(x, 0, x->count());

// Loop over the MessageMeta and create sub-batches
for (TensorIndex i = 0; i < x->count(); i += this->m_batch_size)
{
auto next = full_message->get_slice(i, std::min(i + this->m_batch_size, x->count()));

output.on_next(std::move(next));
}
},
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
};
DCHECK_EQ(task, nullptr) << "Task is not supported for MultiMessage";
auto sliced_msg = full_message->get_slice(start, stop);
windowed_message.swap(sliced_msg);
}

// ************ DeserializationStageInterfaceProxy ************* //
std::shared_ptr<mrc::segment::Object<DeserializeStage>> DeserializeStageInterfaceProxy::init(
mrc::segment::Builder& builder, const std::string& name, TensorIndex batch_size, bool ensure_sliceable_index)
void make_output_message(std::shared_ptr<MultiMessage>& full_message,
TensorIndex start,
TensorIndex stop,
cm_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message)
{
auto stage = builder.construct_object<DeserializeStage>(name, batch_size, ensure_sliceable_index);
auto window = full_message->copy_ranges({{start, stop}}, stop - start);
auto new_message = std::make_shared<ControlMessage>();
new_message->payload(window->meta);
if (task)
{
new_message->add_task(task->first, task->second);
}

return stage;
windowed_message.swap(new_message);
}

} // namespace morpheus
8 changes: 6 additions & 2 deletions morpheus/_lib/stages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import mrc.core.segment
__all__ = [
"AddClassificationsStage",
"AddScoresStage",
"DeserializeStage",
"DeserializeControlMessageStage",
"DeserializeMultiMessageStage",
"FileSourceStage",
"FilterDetectionsStage",
"FilterSource",
Expand All @@ -37,7 +38,10 @@ class AddClassificationsStage(mrc.core.segment.SegmentObject):
class AddScoresStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, idx2label: typing.Dict[int, str]) -> None: ...
pass
class DeserializeStage(mrc.core.segment.SegmentObject):
class DeserializeControlMessageStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True, task_type: object = None, task_payload: object = None) -> None: ...
pass
class DeserializeMultiMessageStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True) -> None: ...
pass
class FileSourceStage(mrc.core.segment.SegmentObject):
Expand Down
Loading
Loading