diff --git a/CMakeLists.txt b/CMakeLists.txt index e93b90c..be7c81b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,7 +80,7 @@ message(STATUS "XEUS_ZMQ_BUILD_TESTS: ${XEUS_ZMQ_BUILD_TESTS}") # ============ set(xeus_REQUIRED_VERSION 3.2.0) -set(nlohmann_json_REQUIRED_VERSION 3.2.0) +set(nlohmann_json_REQUIRED_VERSION 3.11.2) set(cppzmq_REQUIRED_VERSION 4.8.1) set(zeromq_REQUIRED_VERSION 4.3.2) @@ -132,6 +132,7 @@ set(XEUS_ZMQ_HEADERS ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xthread.hpp ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xzmq_context.hpp ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xzmq_serializer.hpp + ${XEUS_ZMQ_INCLUDE_DIR}/xeus-zmq/xclient_zmq.hpp ) set(XEUS_ZMQ_SOURCES @@ -164,6 +165,15 @@ set(XEUS_ZMQ_SOURCES ${XEUS_ZMQ_SOURCE_DIR}/xzmq_messenger.hpp ${XEUS_ZMQ_SOURCE_DIR}/xzmq_messenger.cpp ${XEUS_ZMQ_SOURCE_DIR}/xzmq_serializer.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_messenger.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_messenger.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq_impl.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xclient_zmq_impl.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.cpp + ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.hpp + ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.cpp ) # Targets and link diff --git a/README.md b/README.md index b3cf513..15ad4a5 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ See the [documentation](http://xeus.readthedocs.io/) for an exhaustive list of t We have packaged all these dependencies on conda-forge. The simplest way to install them is to run: ```bash -mamba install cmake pkg-config zeromq cppzmq OpenSSL nlohmann_json xtl xeus -c conda-forge +mamba install cmake pkg-config zeromq cppzmq OpenSSL nlohmann_json=3.11.2 xtl xeus -c conda-forge ``` Once you have installed the dependencies, you can build and install `xeus-zmq`: diff --git a/environment-dev.yml b/environment-dev.yml index 77576c1..77d5403 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -15,7 +15,7 @@ dependencies: - xeus>=3.2.0,<4.0 - OpenSSL=1 - libopenssl-static=1 - - nlohmann_json + - nlohmann_json=3.11.2 # Test dependencies - doctest >= 2.4.6 - pytest diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp new file mode 100644 index 0000000..9eed125 --- /dev/null +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -0,0 +1,67 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_ZMQ_HPP +#define XEUS_CLIENT_ZMQ_HPP + +#include + +#include + +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" +#include "xeus/xmessage.hpp" + +#include "xeus-zmq.hpp" + +namespace xeus +{ + class xclient_zmq_impl; + + class XEUS_ZMQ_API xclient_zmq + { + public: + + using listener = std::function; + + explicit xclient_zmq(std::unique_ptr impl); + ~xclient_zmq(); + + void send_on_shell(xmessage msg); + void send_on_control(xmessage msg); + + std::optional check_shell_answer(); + std::optional check_control_answer(); + + void register_shell_listener(const listener& l); + void register_control_listener(const listener& l); + void register_iopub_listener(const listener& l); + + void notify_shell_listener(xmessage msg); + void notify_control_listener(xmessage msg); + void notify_iopub_listener(xmessage msg); + + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + void connect(); + void stop_channels(); + void start(); + void wait_for_message(); + + private: + std::unique_ptr p_client_impl; + }; + + XEUS_ZMQ_API + std::unique_ptr make_xclient_zmq(xcontext& context, + const xconfiguration& config, + nl::json::error_handler_t eh = nl::json::error_handler_t::strict); +} + +#endif \ No newline at end of file diff --git a/src/xclient_messenger.cpp b/src/xclient_messenger.cpp new file mode 100644 index 0000000..bb2220f --- /dev/null +++ b/src/xclient_messenger.cpp @@ -0,0 +1,42 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "nlohmann/json.hpp" +#include "xeus-zmq/xmiddleware.hpp" +#include "xclient_messenger.hpp" + +namespace nl = nlohmann; + +namespace xeus +{ + xclient_messenger::xclient_messenger(zmq::context_t& context) + : m_iopub_controller(context, zmq::socket_type::req) + { + } + + xclient_messenger::~xclient_messenger() + { + } + + void xclient_messenger::connect() + { + m_iopub_controller.set(zmq::sockopt::linger, get_socket_linger()); + m_iopub_controller.connect(get_controller_end_point("iopub")); + } + + void xclient_messenger::stop_channels() + { + zmq::message_t stop_msg("stop", 4); + zmq::message_t response; + + // Wait for iopub answer + m_iopub_controller.send(stop_msg, zmq::send_flags::none); + (void)m_iopub_controller.recv(response); + } +} \ No newline at end of file diff --git a/src/xclient_messenger.hpp b/src/xclient_messenger.hpp new file mode 100644 index 0000000..7975436 --- /dev/null +++ b/src/xclient_messenger.hpp @@ -0,0 +1,31 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_MESSENGER_HPP +#define XEUS_CLIENT_MESSENGER_HPP + +#include + +namespace xeus +{ + class xclient_messenger + { + public: + explicit xclient_messenger(zmq::context_t& context); + virtual ~xclient_messenger(); + + void connect(); + void stop_channels(); + + private: + zmq::socket_t m_iopub_controller; + }; +} + +#endif \ No newline at end of file diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp new file mode 100644 index 0000000..ad9dc54 --- /dev/null +++ b/src/xclient_zmq.cpp @@ -0,0 +1,113 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xeus-zmq/xclient_zmq.hpp" +#include "xclient_zmq_impl.hpp" + +namespace xeus +{ + + xclient_zmq::xclient_zmq(std::unique_ptr impl) + : p_client_impl(std::move(impl)) + { + } + + // Has to be in the cpp because incomplete + // types are used in unique_ptr in the header + xclient_zmq::~xclient_zmq() = default; + + + void xclient_zmq::send_on_shell(xmessage msg) + { + p_client_impl->send_on_shell(std::move(msg)); + } + + void xclient_zmq::send_on_control(xmessage msg) + { + p_client_impl->send_on_control(std::move(msg)); + } + + std::optional xclient_zmq::check_shell_answer() + { + return p_client_impl->receive_on_shell(-1); + } + + std::optional xclient_zmq::check_control_answer() + { + return p_client_impl->receive_on_control(-1); + } + + void xclient_zmq::register_shell_listener(const listener& l) + { + p_client_impl->register_shell_listener(l); + } + + void xclient_zmq::register_control_listener(const listener& l) + { + p_client_impl->register_control_listener(l); + } + + void xclient_zmq::register_iopub_listener(const listener& l) + { + p_client_impl->register_iopub_listener(l); + } + + void xclient_zmq::notify_shell_listener(xmessage msg) + { + p_client_impl->notify_shell_listener(std::move(msg)); + } + + void xclient_zmq::notify_control_listener(xmessage msg) + { + p_client_impl->notify_control_listener(std::move(msg)); + } + + void xclient_zmq::notify_iopub_listener(xmessage msg) + { + p_client_impl->notify_iopub_listener(std::move(msg)); + } + + std::size_t xclient_zmq::iopub_queue_size() const + { + return p_client_impl->iopub_queue_size(); + } + + std::optional xclient_zmq::pop_iopub_message() + { + return p_client_impl->pop_iopub_message(); + } + + void xclient_zmq::connect() + { + p_client_impl->connect(); + } + + void xclient_zmq::stop_channels() + { + p_client_impl->stop_channels(); + } + + void xclient_zmq::start() + { + p_client_impl->start(); + } + + void xclient_zmq::wait_for_message() + { + p_client_impl->wait_for_message(); + } + + std::unique_ptr make_xclient_zmq(xcontext& context, + const xconfiguration& config, + nl::json::error_handler_t eh) + { + auto impl = std::make_unique(context.get_wrapped_context(), config, eh); + return std::make_unique(std::move(impl)); + } +} diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp new file mode 100644 index 0000000..699ade0 --- /dev/null +++ b/src/xclient_zmq_impl.cpp @@ -0,0 +1,182 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include + +#include "xeus-zmq/xauthentication.hpp" +#include "xclient_zmq_impl.hpp" +#include "xeus-zmq/xzmq_serializer.hpp" + +namespace xeus +{ + + xclient_zmq_impl::xclient_zmq_impl(zmq::context_t& context, + const xeus::xconfiguration& config, + nl::json::error_handler_t eh) + : p_auth(make_xauthentication(config.m_signature_scheme, config.m_key)) + , m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port) + , m_control_client(context, config.m_transport, config.m_ip, config.m_control_port) + , m_iopub_client(context, config) + , p_messenger(context) + , m_error_handler(eh) + { + } + + // Has to be in the cpp because incomplete + // types are used in unique_ptr in the header + xclient_zmq_impl::~xclient_zmq_impl() = default; + + void xclient_zmq_impl::send_on_shell(xmessage msg) + { + zmq::multipart_t wire_msg = xzmq_serializer::serialize(std::move(msg), *p_auth, m_error_handler); + m_shell_client.send_message(wire_msg); + } + + void xclient_zmq_impl::send_on_control(xmessage msg) + { + zmq::multipart_t wire_msg = xzmq_serializer::serialize(std::move(msg), *p_auth, m_error_handler); + m_control_client.send_message(wire_msg); + } + + std::optional xclient_zmq_impl::receive_on_shell(long timeout) + { + std::optional wire_msg = m_shell_client.receive_message(timeout); + + if (wire_msg.has_value()) + { + return deserialize(wire_msg.value()); + } else { + return std::nullopt; + } + } + + std::optional xclient_zmq_impl::receive_on_control(long timeout) + { + std::optional wire_msg = m_control_client.receive_message(timeout); + + if (wire_msg.has_value()) + { + return deserialize(wire_msg.value()); + } else { + return std::nullopt; + } + } + + void xclient_zmq_impl::register_shell_listener(const listener& l) + { + m_shell_listener = l; + } + + void xclient_zmq_impl::register_control_listener(const listener& l) + { + m_control_listener = l; + } + + std::size_t xclient_zmq_impl::iopub_queue_size() const + { + return m_iopub_client.iopub_queue_size(); + } + + std::optional xclient_zmq_impl::pop_iopub_message() + { + return m_iopub_client.pop_iopub_message(); + } + + void xclient_zmq_impl::register_iopub_listener(const listener& l) + { + m_iopub_listener = l; + } + + void xclient_zmq_impl::connect() + { + p_messenger.connect(); + } + + void xclient_zmq_impl::stop_channels() + { + p_messenger.stop_channels(); + } + + void xclient_zmq_impl::notify_shell_listener(xmessage msg) + { + m_shell_listener(std::move(msg)); + } + + void xclient_zmq_impl::notify_control_listener(xmessage msg) + { + m_control_listener(std::move(msg)); + } + + void xclient_zmq_impl::notify_iopub_listener(xmessage msg) + { + m_iopub_listener(std::move(msg)); + } + + void xclient_zmq_impl::poll(long timeout) + { + zmq::multipart_t wire_msg; + zmq::pollitem_t items[] + = { { m_shell_client.get_socket(), 0, ZMQ_POLLIN, 0 }, { m_control_client.get_socket(), 0, ZMQ_POLLIN, 0 } }; + + while (true) + { + zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout)); + try + { + if (items[0].revents & ZMQ_POLLIN) + { + wire_msg.recv(m_shell_client.get_socket()); + xmessage msg = deserialize(wire_msg); + notify_shell_listener(std::move(msg)); + return; + } + if (items[1].revents & ZMQ_POLLIN) + { + wire_msg.recv(m_control_client.get_socket()); + xmessage msg = deserialize(wire_msg); + notify_control_listener(std::move(msg)); + return; + } + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } + } + } + + void xclient_zmq_impl::wait_for_message() + { + std::optional pending_message = pop_iopub_message(); + + if (pending_message.has_value()) + { + notify_iopub_listener(std::move(*pending_message)); + } else { + poll(-1); + } + } + + void xclient_zmq_impl::start() + { + start_iopub_thread(); + // TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread. + } + + void xclient_zmq_impl::start_iopub_thread() + { + m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get())); + } + + xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const + { + return xzmq_serializer::deserialize(wire_msg, *p_auth); + } + +} \ No newline at end of file diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp new file mode 100644 index 0000000..30cfdcb --- /dev/null +++ b/src/xclient_zmq_impl.hpp @@ -0,0 +1,104 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_CLIENT_ZMQ_IMPL_HPP +#define XEUS_CLIENT_ZMQ_IMPL_HPP + +#include +#include "zmq.hpp" + +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" +#include "xeus/xmessage.hpp" + +#include "xeus-zmq/xthread.hpp" + +#include "xdealer_channel.hpp" +#include "xiopub_client.hpp" +#include "xclient_messenger.hpp" + +namespace xeus +{ + class xauthentication; + + class xclient_zmq_impl + { + public: + using iopub_client_ptr = std::unique_ptr; + using listener = std::function; + + xclient_zmq_impl(zmq::context_t& context, + const xconfiguration& config, + nl::json::error_handler_t eh); + + ~xclient_zmq_impl(); + + xclient_zmq_impl(const xclient_zmq_impl&) = delete; + xclient_zmq_impl& operator=(const xclient_zmq_impl&) = delete; + + xclient_zmq_impl(xclient_zmq_impl&&) = delete; + xclient_zmq_impl& operator=(xclient_zmq_impl&&) = delete; + + // shell channel + void send_on_shell(xmessage msg); + std::optional receive_on_shell(long timeout); + void register_shell_listener(const listener& l); + + // control channel + void send_on_control(xmessage msg); + std::optional receive_on_control(long timeout); + void register_control_listener(const listener& l); + + // iopub channel + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + void register_iopub_listener(const listener& l); + + // hearbeat channel + // TODO + + // client messenger + void connect(); + void stop_channels(); + + void notify_shell_listener(xmessage msg); + void notify_control_listener(xmessage msg); + void notify_iopub_listener(xmessage msg); + + void wait_for_message(); + void start(); + + xmessage deserialize(zmq::multipart_t& wire_msg) const; + + private: + void start_iopub_thread(); + void poll(long timeout); + + using authentication_ptr = std::unique_ptr; + authentication_ptr p_auth; + + xdealer_channel m_shell_client; + xdealer_channel m_control_client; + xiopub_client m_iopub_client; + + xclient_messenger p_messenger; + + nl::json::error_handler_t m_error_handler; + + listener m_shell_listener; + listener m_control_listener; + listener m_iopub_listener; + + iopub_client_ptr p_iopub_client; + + xthread m_iopub_thread; + }; +} + +#endif \ No newline at end of file diff --git a/src/xdealer_channel.cpp b/src/xdealer_channel.cpp new file mode 100644 index 0000000..81c986c --- /dev/null +++ b/src/xdealer_channel.cpp @@ -0,0 +1,51 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include "xeus-zmq/xmiddleware.hpp" + +#include "xdealer_channel.hpp" + +namespace xeus +{ + + xdealer_channel::xdealer_channel(zmq::context_t& context, + const std::string& transport, + const std::string& ip, + const std::string& port) + : m_socket(context, zmq::socket_type::dealer) + { + m_socket.connect(get_end_point(transport, ip, port)); + } + + xdealer_channel::~xdealer_channel() + { + } + + void xdealer_channel::send_message(zmq::multipart_t& message) + { + message.send(m_socket); + } + + std::optional xdealer_channel::receive_message(long timeout) + { + zmq::multipart_t wire_msg; + m_socket.set(zmq::sockopt::linger, static_cast(timeout)); + if (wire_msg.recv(m_socket)) + { + return wire_msg; + } else { + return std::nullopt; + } + } + + zmq::socket_t& xdealer_channel::get_socket() + { + return m_socket; + } +} \ No newline at end of file diff --git a/src/xdealer_channel.hpp b/src/xdealer_channel.hpp new file mode 100644 index 0000000..4893dc2 --- /dev/null +++ b/src/xdealer_channel.hpp @@ -0,0 +1,43 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_DEALER_CHANNEL_HPP +#define XEUS_DEALER_CHANNEL_HPP + +#include "zmq.hpp" +#include "zmq_addon.hpp" + +#include "nlohmann/json.hpp" +#include "xeus/xkernel_configuration.hpp" + +namespace xeus +{ + + class xdealer_channel + { + public: + + xdealer_channel(zmq::context_t& context, + const std::string& transport, + const std::string& ip, + const std::string& port); + + ~xdealer_channel(); + + void send_message(zmq::multipart_t& message); + std::optional receive_message(long timeout); + + zmq::socket_t& get_socket(); + + private: + zmq::socket_t m_socket; + }; +} + +#endif \ No newline at end of file diff --git a/src/xiopub_client.cpp b/src/xiopub_client.cpp new file mode 100644 index 0000000..3519766 --- /dev/null +++ b/src/xiopub_client.cpp @@ -0,0 +1,83 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#include + +#include "xiopub_client.hpp" +#include "xclient_zmq_impl.hpp" + +#include "xeus-zmq/xzmq_serializer.hpp" +#include "xeus-zmq/xmiddleware.hpp" + +namespace xeus +{ + + xiopub_client::xiopub_client(zmq::context_t& context, + const xeus::xconfiguration& config) + : m_iopub(context, zmq::socket_type::sub) + , m_controller(context, zmq::socket_type::rep) + { + m_iopub.connect(get_end_point(config.m_transport, config.m_ip, config.m_iopub_port)); + init_socket(m_controller, get_controller_end_point("iopub")); + } + + xiopub_client::~xiopub_client() + { + } + + std::size_t xiopub_client::iopub_queue_size() const + { + std::lock_guard guard(m_queue_mutex); + return m_message_queue.size(); + } + + std::optional xiopub_client::pop_iopub_message() + { + std::lock_guard guard(m_queue_mutex); + if (!m_message_queue.empty()) + { + xmessage msg = std::move(m_message_queue.back()); + m_message_queue.pop(); + return msg; + } else { + return std::nullopt; + } + } + + void xiopub_client::run() + { + zmq::pollitem_t items[] = { + { m_iopub, 0, ZMQ_POLLIN, 0 } + }; + + while (true) + { + zmq::poll(&items[0], 1, std::chrono::milliseconds(-1)); + + if (items[0].revents & ZMQ_POLLIN) + { + zmq::multipart_t wire_msg; + wire_msg.recv(m_iopub); + try + { + xmessage msg = p_client_impl->deserialize(wire_msg); + { + std::lock_guard guard(m_queue_mutex); + m_message_queue.push(std::move(msg)); + } + p_client_impl->notify_shell_listener(std::move(msg)); + } + catch(std::exception& e) + { + std::cerr << e.what() << std::endl; + } + } + } + } +} \ No newline at end of file diff --git a/src/xiopub_client.hpp b/src/xiopub_client.hpp new file mode 100644 index 0000000..f374e76 --- /dev/null +++ b/src/xiopub_client.hpp @@ -0,0 +1,55 @@ +/*************************************************************************** +* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou * +* Copyright (c) 2016, QuantStack * +* * +* Distributed under the terms of the BSD 3-Clause License. * +* * +* The full license is in the file LICENSE, distributed with this software. * +****************************************************************************/ + +#ifndef XEUS_IOPUB_CLIENT_HPP +#define XEUS_IOPUB_CLIENT_HPP + +#include +#include + +#include "zmq.hpp" +#include "nlohmann/json.hpp" + +#include "xeus/xmessage.hpp" +#include "xeus/xeus_context.hpp" +#include "xeus/xkernel_configuration.hpp" + +#include "xeus-zmq/xthread.hpp" +#include "xeus-zmq/xmiddleware.hpp" + +namespace xeus +{ + class xclient_zmq_impl; + + class xiopub_client + { + public: + + xiopub_client(zmq::context_t& context, + const xeus::xconfiguration& config); + + ~xiopub_client(); + + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + + void run(); + + private: + zmq::socket_t m_iopub; + zmq::socket_t m_controller; + + std::queue m_message_queue; + mutable std::mutex m_queue_mutex; + + xclient_zmq_impl* p_client_impl; + }; +} + +#endif \ No newline at end of file diff --git a/xeus-zmqConfig.cmake.in b/xeus-zmqConfig.cmake.in index 3f8761d..66bc431 100644 --- a/xeus-zmqConfig.cmake.in +++ b/xeus-zmqConfig.cmake.in @@ -22,7 +22,7 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR};${CMAKE_MODULE_PATH}") @XEUS_ZMQ_CONFIG_CODE@ include(CMakeFindDependencyMacro) -find_dependency(nlohmann_json @nlohmann_json_REQUIRED_VERSION@) +find_dependency(nlohmann_json @nlohmann_json_VERSION@ EXACT) find_dependency(xeus @xeus_REQUIRED_VERSION@) # On Unix platforms, ZeroMQ is built with autotools and pkg-config is