Skip to content

Commit

Permalink
Simplify Python impl for KafkaSourceStage (#300)
Browse files Browse the repository at this point in the history
Current impl is overly complicated making use of undocumented methods.
Removes dependency on `cudf_kafka`.

Fixes #294

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #300
  • Loading branch information
dagardner-nv authored Nov 15, 2022
1 parent 8ed1836 commit e580adc
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 245 deletions.
3 changes: 2 additions & 1 deletion docker/conda/environments/cuda11.5_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ dependencies:
- cuda-python<=11.7.0 # Remove when Issue #251 is closed
- cudatoolkit=11.5
- cudf 22.08
- cudf_kafka 22.08.*
- cupy=9.5.0
- cython=0.29.24
- datacompy=0.8
Expand All @@ -57,6 +56,7 @@ dependencies:
- gxx_linux-64=9.4
- include-what-you-use=0.18
- isort
- librdkafka=1.7.0
- mlflow>=1.23
- myst-parser==0.17
- networkx=2.8
Expand All @@ -74,6 +74,7 @@ dependencies:
- pytest
- pytest-benchmark>=4.0
- pytest-cov
- python-confluent-kafka=1.7.0
- python-graphviz
- python=3.8
- rapidjson=1.1.0
Expand Down
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/kafka_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<Message
std::map<std::string, std::string> config,
bool disable_commit = false,
bool disable_pre_filtering = false,
size_t stop_after = 0);
size_t stop_after = 0,
bool async_commits = true);

~KafkaSourceStage() override = default;

Expand Down Expand Up @@ -107,6 +108,7 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<Message
bool m_disable_commit{false};
bool m_disable_pre_filtering{false};
bool m_requires_commit{false}; // Whether or not manual committing is required
bool m_async_commits{true};
size_t m_stop_after{0};

void *m_rebalancer;
Expand All @@ -129,7 +131,8 @@ struct KafkaSourceStageInterfaceProxy
std::map<std::string, std::string> config,
bool disable_commits,
bool disable_pre_filtering,
size_t stop_after = 0);
size_t stop_after = 0,
bool async_commits = true);
};
#pragma GCC visibility pop
} // namespace morpheus
3 changes: 2 additions & 1 deletion morpheus/_lib/src/python_modules/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ PYBIND11_MODULE(stages, m)
py::arg("config"),
py::arg("disable_commits") = false,
py::arg("disable_pre_filtering") = false,
py::arg("stop_after") = 0);
py::arg("stop_after") = 0,
py::arg("async_commits") = true);

py::class_<srf::segment::Object<PreprocessFILStage>,
srf::segment::ObjectProperties,
Expand Down
29 changes: 23 additions & 6 deletions morpheus/_lib/src/stages/kafka_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,17 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size,
std::map<std::string, std::string> config,
bool disable_commit,
bool disable_pre_filtering,
size_t stop_after) :
size_t stop_after,
bool async_commits) :
PythonSource(build()),
m_max_batch_size(max_batch_size),
m_topic(std::move(topic)),
m_batch_timeout_ms(batch_timeout_ms),
m_config(std::move(config)),
m_disable_commit(disable_commit),
m_disable_pre_filtering(disable_pre_filtering),
m_stop_after{stop_after}
m_stop_after{stop_after},
m_async_commits(async_commits)
{}

KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
Expand Down Expand Up @@ -334,7 +336,14 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()

if (should_commit)
{
CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
if (m_async_commits)
{
CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
}
else
{
CHECK_KAFKA(consumer->commitSync(), RdKafka::ERR_NO_ERROR, "Error during commit");
}
}
}

Expand Down Expand Up @@ -571,10 +580,18 @@ std::shared_ptr<srf::segment::Object<KafkaSourceStage>> KafkaSourceStageInterfac
std::map<std::string, std::string> config,
bool disable_commits,
bool disable_pre_filtering,
size_t stop_after)
size_t stop_after,
bool async_commits)
{
auto stage = builder.construct_object<KafkaSourceStage>(
name, max_batch_size, topic, batch_timeout_ms, config, disable_commits, disable_pre_filtering, stop_after);
auto stage = builder.construct_object<KafkaSourceStage>(name,
max_batch_size,
topic,
batch_timeout_ms,
config,
disable_commits,
disable_pre_filtering,
stop_after,
async_commits);

return stage;
}
Expand Down
Loading

0 comments on commit e580adc

Please sign in to comment.