Skip to content

Commit

Permalink
Producer を Runnable 化 (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
idofront authored Dec 7, 2024
1 parent 3880534 commit 2b49059
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 72 deletions.
24 changes: 17 additions & 7 deletions main/TrafficPlayer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Dealer/DealReporter.hpp>
#include <Dealer/Dealer.hpp>
#include <ParseOptions.hpp>
#include <Producer/Producer.hpp>
#include <Queue/ThreadSafeQueue.hpp>
#include <Thread/Employer.hpp>
#include <TimingAdjuster/ReserveTimingAdjuster.hpp>
Expand Down Expand Up @@ -74,13 +75,23 @@ int main(int argc, char *argv[])

auto dealerThread = std::thread([&dealer]() { dealer.Run(); });

auto producer = ReserveTimingAdjuster(queuePtr);
auto producerThread = std::thread([&producer]() { producer.Run(); });
auto reserveTimeQueuePtr = std::make_shared<BoundedThreadSafeQueue<ReserveTimeRecord>>(1024);

// Create reserve timing adjuster
auto reserveTimingAdjuster = ReserveTimingAdjuster(reserveTimeQueuePtr);

// Create producers
auto producerFuturePtrs = std::vector<std::shared_ptr<Thread::Future>>();
for (auto i = 0; i < NUM_OF_PRODUCERS; i++)
{
auto producerFuturePtr = employer.Submit(std::make_shared<Producer>(reserveTimeQueuePtr, queuePtr));
producerFuturePtrs.push_back(producerFuturePtr);
}

auto repeatCount = options.RepeatCount();
uint64_t repeat = 0;

// Read pcap file
// Read pcap file and make traffic records to be reserved and sent later by the producer.
auto trafficRecords = trafficMakerPtr->Make();
while (++repeat)
{
Expand All @@ -101,7 +112,9 @@ int main(int argc, char *argv[])
}

std::for_each(trafficRecords.begin(), trafficRecords.end(),
[&producer](const TrafficRecord &trafficRecord) { producer.Produce(trafficRecord); });
[&reserveTimingAdjuster](const TrafficRecord &trafficRecord) {
reserveTimingAdjuster.Adjust(trafficRecord);
});
}

do
Expand All @@ -113,9 +126,6 @@ int main(int argc, char *argv[])
// This implementation is forceful and may cause packet loss.
try
{
producer.TryTerminate();
producerThread.join();

dealer.TryTerminate();
dealerThread.join();

Expand Down
34 changes: 34 additions & 0 deletions src/Producer/Producer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <Producer/Producer.hpp>

Producer::Producer(std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> inputQueue,
std::shared_ptr<IThreadSafeQueue<TrafficRecord>> outputQueue)
: _InputQueue(inputQueue), _OutputQueue(outputQueue)
{
}

void Producer::Task()
{
while (IsContinue())
{
auto dequeueTimeout = std::chrono::milliseconds(1);
auto reserveTimeRecordOpt = _InputQueue->Dequeue(dequeueTimeout);
if (!reserveTimeRecordOpt.has_value())
{
continue;
}

{
auto reserveTimeRecord = reserveTimeRecordOpt.value();
auto reservationTime = reserveTimeRecord.ReservationTime();
SleepUntil(reservationTime);
// When woken up, check this task should continue.
if (!IsContinue())
{
continue;
}

auto trafficRecord = TrafficRecord(reserveTimeRecord.Data(), std::chrono::nanoseconds(0));
_OutputQueue->Enqueue(trafficRecord);
}
}
}
21 changes: 21 additions & 0 deletions src/Producer/Producer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef PRODUCER__PRODUCER_HPP
#define PRODUCER__PRODUCER_HPP

#include <Queue/IThreadSafeQueue.hpp>
#include <Thread/Runnable.hpp>
#include <TimingAdjuster/ReserveTimeRecord.hpp>
#include <TrafficRecord.hpp>

class Producer : public Thread::Runnable
{
public:
Producer(std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> inputQueue,
std::shared_ptr<IThreadSafeQueue<TrafficRecord>> outputQueue);
virtual void Task() override;

private:
std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> _InputQueue;
std::shared_ptr<IThreadSafeQueue<TrafficRecord>> _OutputQueue;
};

#endif
53 changes: 3 additions & 50 deletions src/TimingAdjuster/ReserveTimingAdjuster.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#include <TimingAdjuster/ReserveTimingAdjuster.hpp>

ReserveTimingAdjuster::ReserveTimingAdjuster(std::shared_ptr<ThreadSafeQueue<TrafficRecord>> queue)
: _Queue(queue), _ReserveQueue(std::make_shared<BoundedThreadSafeQueue<ReserveTimeRecord>>(128)),
_LatestReservationTime(std::chrono::system_clock::now()), _ThreadPool(), _IsRequestedToTerminate(false)
ReserveTimingAdjuster::ReserveTimingAdjuster(std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> queue)
: _ReserveQueue(queue), _LatestReservationTime(std::chrono::system_clock::now())
{
}

void ReserveTimingAdjuster::Produce(const TrafficRecord &trafficRecord)
void ReserveTimingAdjuster::Adjust(const TrafficRecord &trafficRecord)
{
auto now = std::chrono::system_clock::now();
if (_LatestReservationTime > now)
Expand All @@ -24,49 +23,3 @@ void ReserveTimingAdjuster::Produce(const TrafficRecord &trafficRecord)
_ReserveQueue->Enqueue(reserveTimeRecord);
_LatestReservationTime = reserveSendTime;
}

void ReserveTimingAdjuster::Run()
{
auto threadCount = std::thread::hardware_concurrency();

for (size_t i = 0; i < threadCount; i++)
{
_ThreadPool.push_back(std::thread([this]() {
while (_IsRequestedToTerminate == false)
{
auto dequeueTimeout = std::chrono::milliseconds(1);
auto reserveTimeRecord = _ReserveQueue->Dequeue(dequeueTimeout);
if (!reserveTimeRecord.has_value())
{
continue;
}
auto task = MakeTask(reserveTimeRecord.value());
task(_Queue);
}
}));
}

for (auto &thread : _ThreadPool)
{
thread.join();
}

spdlog::debug("Reserve timing adjuster is terminated.");
}

void ReserveTimingAdjuster::TryTerminate()
{
spdlog::debug("Reserve timing adjuster is requested to terminate.");
_IsRequestedToTerminate = true;
}

std::function<void(std::shared_ptr<ThreadSafeQueue<TrafficRecord>>)> ReserveTimingAdjuster::MakeTask(
const ReserveTimeRecord &reserveTimeRecord)
{
return [reserveTimeRecord](std::shared_ptr<ThreadSafeQueue<TrafficRecord>> _Queue) {
auto reservationTime = reserveTimeRecord.ReservationTime();
std::this_thread::sleep_until(reservationTime);
auto trafficRecord = TrafficRecord(reserveTimeRecord.Data(), std::chrono::nanoseconds(0));
_Queue->Enqueue(trafficRecord);
};
}
18 changes: 3 additions & 15 deletions src/TimingAdjuster/ReserveTimingAdjuster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,12 @@
class ReserveTimingAdjuster
{
public:
ReserveTimingAdjuster(std::shared_ptr<ThreadSafeQueue<TrafficRecord>> queue);
virtual void Produce(const TrafficRecord &trafficRecord);
virtual void Run();
virtual void TryTerminate();
ReserveTimingAdjuster(std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> queue);
void Adjust(const TrafficRecord &trafficRecord);

private:
std::shared_ptr<ThreadSafeQueue<TrafficRecord>> _Queue;
std::shared_ptr<BoundedThreadSafeQueue<ReserveTimeRecord>> _ReserveQueue;
std::shared_ptr<IThreadSafeQueue<ReserveTimeRecord>> _ReserveQueue;
std::chrono::time_point<std::chrono::system_clock> _LatestReservationTime;

/// @brief Thread pool to send data
std::vector<std::thread> _ThreadPool;

/// @brief Thread function to send data
std::function<void(std::shared_ptr<ThreadSafeQueue<TrafficRecord>>)> MakeTask(
const ReserveTimeRecord &reserveTimeRecord);

bool _IsRequestedToTerminate;
};

#endif

0 comments on commit 2b49059

Please sign in to comment.