Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shm: Introduce first support for pools #484

Merged
merged 7 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/region/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
################################################################################

add_example(NAME region
DEVICE sampler sink keep-alive
DEVICE sampler processor sink keep-alive
SCRIPT region region-advanced
)
53 changes: 53 additions & 0 deletions examples/region/fairmq-start-ex-region-advanced.sh.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/bash

export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@

transport="shmem"
msgSize="1000000"

if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi

if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi

SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &

PROCESSOR="fairmq-ex-region-processor"
PROCESSOR+=" --id processor1"
PROCESSOR+=" --severity debug"
PROCESSOR+=" --transport $transport"
PROCESSOR+=" --shm-monitor true"
PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR &

SINK1="fairmq-ex-region-sink"
SINK1+=" --id sink1"
SINK1+=" --severity debug"
SINK1+=" --chan-name data2"
SINK1+=" --transport $transport"
SINK1+=" --shm-monitor true"
SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 &

SINK2="fairmq-ex-region-sink"
SINK2+=" --id sink2"
SINK2+=" --severity debug"
SINK2+=" --chan-name data3"
SINK2+=" --transport $transport"
SINK2+=" --shm-monitor true"
SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 &
74 changes: 74 additions & 0 deletions examples/region/processor.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/

#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>

namespace bpo = boost::program_options;
using namespace std;
using namespace fair::mq;

namespace {

struct Processor : Device
{
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
GetChannel("data1", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size
<< ", flags: " << info.flags;
});
}

void Run() override
{
Channel& dataIn = GetChannel("data1", 0);
Channel& dataOut1 = GetChannel("data2", 0);
Channel& dataOut2 = GetChannel("data3", 0);

while (!NewStatePending()) {
auto msg(dataIn.Transport()->CreateMessage());
dataIn.Receive(msg);

fair::mq::MessagePtr msgCopy1(NewMessage());
msgCopy1->Copy(*msg);
fair::mq::MessagePtr msgCopy2(NewMessage());
msgCopy2->Copy(*msg);

dataOut1.Send(msgCopy1);
dataOut2.Send(msgCopy2);

if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
}

void ResetTask() override
{
GetChannel("data1", 0).Transport()->UnsubscribeFromRegionEvents();
}

private:
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
};

} // namespace

void addCustomOptions(bpo::options_description& options)
{
options.add_options()("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}

unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Processor>(); }
23 changes: 18 additions & 5 deletions examples/region/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <fairmq/tools/RateLimit.h>

#include <cstdint>
#include <mutex>
Expand All @@ -23,8 +24,10 @@ struct Sampler : fair::mq::Device
fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChanName = fConfig->GetProperty<std::string>("chan-name");
fSamplingRate = fConfig->GetProperty<float>("sampling-rate");

GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
Expand All @@ -43,7 +46,7 @@ struct Sampler : fair::mq::Device
regionCfg.lock = !fExternalRegion; // mlock region after creation
regionCfg.zero = !fExternalRegion; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
"data", // region is created using the transport of this channel...
fChanName, // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
Expand All @@ -59,8 +62,11 @@ struct Sampler : fair::mq::Device

void Run() override
{

fair::mq::tools::RateLimiter rateLimiter(fSamplingRate);

while (!NewStatePending()) {
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
Expand All @@ -70,11 +76,14 @@ struct Sampler : fair::mq::Device

std::lock_guard<std::mutex> lock(fMtx);
++fNumUnackedMsgs;
if (Send(msg, "data", 0) > 0) {
if (Send(msg, fChanName, 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
break;
}
if (fSamplingRate > 0.001) {
rateLimiter.maybe_sleep();
}
}
}

Expand All @@ -99,7 +108,7 @@ struct Sampler : fair::mq::Device
void ResetTask() override
{
fRegion.reset();
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents();
}

private:
Expand All @@ -111,12 +120,16 @@ struct Sampler : fair::mq::Device
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0;
std::string fChanName;
float fSamplingRate = 0.;
};

void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the output channel")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
Expand Down
19 changes: 10 additions & 9 deletions examples/region/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ struct Sink : Device
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
fChanName = fConfig->GetProperty<std::string>("chan-name");
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size
Expand All @@ -32,11 +33,11 @@ struct Sink : Device

void Run() override
{
Channel& dataInChannel = GetChannel("data", 0);
Channel& dataIn = GetChannel(fChanName, 0);

while (!NewStatePending()) {
auto msg(dataInChannel.Transport()->CreateMessage());
dataInChannel.Receive(msg);
auto msg(dataIn.Transport()->CreateMessage());
dataIn.Receive(msg);

// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
Expand All @@ -51,22 +52,22 @@ struct Sink : Device

void ResetTask() override
{
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents();
}

private:
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
std::string fChanName;
};

} // namespace

void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"max-iterations",
bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the input channel")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}

unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }
1 change: 1 addition & 0 deletions fairmq/UnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ struct RegionConfig
int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
uint64_t size = 0; /// region size
uint64_t rcSegmentSize = 10000000; /// region size
std::string path = ""; /// file path, if the region is backed by a file
std::optional<uint16_t> id = std::nullopt; /// region id
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
Expand Down
Loading