From 2b49059a2163812187db8702f9a58cabcafa805c Mon Sep 17 00:00:00 2001 From: idofront <139764549+idofront@users.noreply.github.com> Date: Sat, 7 Dec 2024 20:36:29 +0900 Subject: [PATCH] =?UTF-8?q?Producer=20=E3=82=92=20Runnable=20=E5=8C=96=20(?= =?UTF-8?q?#32)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main/TrafficPlayer.cpp | 24 ++++++--- src/Producer/Producer.cpp | 34 +++++++++++++ src/Producer/Producer.hpp | 21 ++++++++ src/TimingAdjuster/ReserveTimingAdjuster.cpp | 53 ++------------------ src/TimingAdjuster/ReserveTimingAdjuster.hpp | 18 ++----- 5 files changed, 78 insertions(+), 72 deletions(-) create mode 100644 src/Producer/Producer.cpp create mode 100644 src/Producer/Producer.hpp diff --git a/main/TrafficPlayer.cpp b/main/TrafficPlayer.cpp index f258298..739705b 100644 --- a/main/TrafficPlayer.cpp +++ b/main/TrafficPlayer.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -74,13 +75,23 @@ int main(int argc, char *argv[]) auto dealerThread = std::thread([&dealer]() { dealer.Run(); }); - auto producer = ReserveTimingAdjuster(queuePtr); - auto producerThread = std::thread([&producer]() { producer.Run(); }); + auto reserveTimeQueuePtr = std::make_shared>(1024); + + // Create reserve timing adjuster + auto reserveTimingAdjuster = ReserveTimingAdjuster(reserveTimeQueuePtr); + + // Create producers + auto producerFuturePtrs = std::vector>(); + for (auto i = 0; i < NUM_OF_PRODUCERS; i++) + { + auto producerFuturePtr = employer.Submit(std::make_shared(reserveTimeQueuePtr, queuePtr)); + producerFuturePtrs.push_back(producerFuturePtr); + } auto repeatCount = options.RepeatCount(); uint64_t repeat = 0; - // Read pcap file + // Read pcap file and make traffic records to be reserved and sent later by the producer. auto trafficRecords = trafficMakerPtr->Make(); while (++repeat) { @@ -101,7 +112,9 @@ int main(int argc, char *argv[]) } std::for_each(trafficRecords.begin(), trafficRecords.end(), - [&producer](const TrafficRecord &trafficRecord) { producer.Produce(trafficRecord); }); + [&reserveTimingAdjuster](const TrafficRecord &trafficRecord) { + reserveTimingAdjuster.Adjust(trafficRecord); + }); } do @@ -113,9 +126,6 @@ int main(int argc, char *argv[]) // This implementation is forceful and may cause packet loss. try { - producer.TryTerminate(); - producerThread.join(); - dealer.TryTerminate(); dealerThread.join(); diff --git a/src/Producer/Producer.cpp b/src/Producer/Producer.cpp new file mode 100644 index 0000000..5fd9e52 --- /dev/null +++ b/src/Producer/Producer.cpp @@ -0,0 +1,34 @@ +#include + +Producer::Producer(std::shared_ptr> inputQueue, + std::shared_ptr> outputQueue) + : _InputQueue(inputQueue), _OutputQueue(outputQueue) +{ +} + +void Producer::Task() +{ + while (IsContinue()) + { + auto dequeueTimeout = std::chrono::milliseconds(1); + auto reserveTimeRecordOpt = _InputQueue->Dequeue(dequeueTimeout); + if (!reserveTimeRecordOpt.has_value()) + { + continue; + } + + { + auto reserveTimeRecord = reserveTimeRecordOpt.value(); + auto reservationTime = reserveTimeRecord.ReservationTime(); + SleepUntil(reservationTime); + // When woken up, check this task should continue. + if (!IsContinue()) + { + continue; + } + + auto trafficRecord = TrafficRecord(reserveTimeRecord.Data(), std::chrono::nanoseconds(0)); + _OutputQueue->Enqueue(trafficRecord); + } + } +} diff --git a/src/Producer/Producer.hpp b/src/Producer/Producer.hpp new file mode 100644 index 0000000..e0552a5 --- /dev/null +++ b/src/Producer/Producer.hpp @@ -0,0 +1,21 @@ +#ifndef PRODUCER__PRODUCER_HPP +#define PRODUCER__PRODUCER_HPP + +#include +#include +#include +#include + +class Producer : public Thread::Runnable +{ + public: + Producer(std::shared_ptr> inputQueue, + std::shared_ptr> outputQueue); + virtual void Task() override; + + private: + std::shared_ptr> _InputQueue; + std::shared_ptr> _OutputQueue; +}; + +#endif diff --git a/src/TimingAdjuster/ReserveTimingAdjuster.cpp b/src/TimingAdjuster/ReserveTimingAdjuster.cpp index 6a4a546..d3bfaf8 100644 --- a/src/TimingAdjuster/ReserveTimingAdjuster.cpp +++ b/src/TimingAdjuster/ReserveTimingAdjuster.cpp @@ -1,12 +1,11 @@ #include -ReserveTimingAdjuster::ReserveTimingAdjuster(std::shared_ptr> queue) - : _Queue(queue), _ReserveQueue(std::make_shared>(128)), - _LatestReservationTime(std::chrono::system_clock::now()), _ThreadPool(), _IsRequestedToTerminate(false) +ReserveTimingAdjuster::ReserveTimingAdjuster(std::shared_ptr> queue) + : _ReserveQueue(queue), _LatestReservationTime(std::chrono::system_clock::now()) { } -void ReserveTimingAdjuster::Produce(const TrafficRecord &trafficRecord) +void ReserveTimingAdjuster::Adjust(const TrafficRecord &trafficRecord) { auto now = std::chrono::system_clock::now(); if (_LatestReservationTime > now) @@ -24,49 +23,3 @@ void ReserveTimingAdjuster::Produce(const TrafficRecord &trafficRecord) _ReserveQueue->Enqueue(reserveTimeRecord); _LatestReservationTime = reserveSendTime; } - -void ReserveTimingAdjuster::Run() -{ - auto threadCount = std::thread::hardware_concurrency(); - - for (size_t i = 0; i < threadCount; i++) - { - _ThreadPool.push_back(std::thread([this]() { - while (_IsRequestedToTerminate == false) - { - auto dequeueTimeout = std::chrono::milliseconds(1); - auto reserveTimeRecord = _ReserveQueue->Dequeue(dequeueTimeout); - if (!reserveTimeRecord.has_value()) - { - continue; - } - auto task = MakeTask(reserveTimeRecord.value()); - task(_Queue); - } - })); - } - - for (auto &thread : _ThreadPool) - { - thread.join(); - } - - spdlog::debug("Reserve timing adjuster is terminated."); -} - -void ReserveTimingAdjuster::TryTerminate() -{ - spdlog::debug("Reserve timing adjuster is requested to terminate."); - _IsRequestedToTerminate = true; -} - -std::function>)> ReserveTimingAdjuster::MakeTask( - const ReserveTimeRecord &reserveTimeRecord) -{ - return [reserveTimeRecord](std::shared_ptr> _Queue) { - auto reservationTime = reserveTimeRecord.ReservationTime(); - std::this_thread::sleep_until(reservationTime); - auto trafficRecord = TrafficRecord(reserveTimeRecord.Data(), std::chrono::nanoseconds(0)); - _Queue->Enqueue(trafficRecord); - }; -} diff --git a/src/TimingAdjuster/ReserveTimingAdjuster.hpp b/src/TimingAdjuster/ReserveTimingAdjuster.hpp index 0f034ec..3772a39 100644 --- a/src/TimingAdjuster/ReserveTimingAdjuster.hpp +++ b/src/TimingAdjuster/ReserveTimingAdjuster.hpp @@ -13,24 +13,12 @@ class ReserveTimingAdjuster { public: - ReserveTimingAdjuster(std::shared_ptr> queue); - virtual void Produce(const TrafficRecord &trafficRecord); - virtual void Run(); - virtual void TryTerminate(); + ReserveTimingAdjuster(std::shared_ptr> queue); + void Adjust(const TrafficRecord &trafficRecord); private: - std::shared_ptr> _Queue; - std::shared_ptr> _ReserveQueue; + std::shared_ptr> _ReserveQueue; std::chrono::time_point _LatestReservationTime; - - /// @brief Thread pool to send data - std::vector _ThreadPool; - - /// @brief Thread function to send data - std::function>)> MakeTask( - const ReserveTimeRecord &reserveTimeRecord); - - bool _IsRequestedToTerminate; }; #endif