Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Dealer Actions Observable #47

Merged
merged 1 commit into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion main/TrafficPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ int main(int argc, char *argv[])
auto employer = Thread::Employer(NUM_OF_THREADS);
auto dealerPtr = std::make_shared<Dealer>(queuePtr, interface);
auto dealerFuturePtr = employer.Submit(dealerPtr);
auto reporterFuturePtr = employer.Submit(std::make_shared<DealReporter>(*dealerPtr, reportIntervalMsec));
auto reporterPtr = std::make_shared<DealReporter>(reportIntervalMsec);
reporterPtr->RegisterDealer(dealerPtr);
auto reporterFuturePtr = employer.Submit(reporterPtr);

// Create producers
auto producerFuturePtrs = CreateProducers(employer, reserveTimeQueuePtr, queuePtr);
Expand Down
3 changes: 3 additions & 0 deletions src/Dealer/DealReport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define DEALER__DEAL_REPORT_HPP

#include <chrono>
#include <memory>

class DealReport
{
Expand All @@ -19,4 +20,6 @@ class DealReport
std::size_t _PacketSize;
};

using DealReportPtr = std::shared_ptr<DealReport>;

#endif
21 changes: 13 additions & 8 deletions src/Dealer/DealReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <numeric>
#include <spdlog/spdlog.h>

DealReporter::DealReporter(Dealer &dealer, std::chrono::milliseconds interval)
: _Dealer(dealer), _Interval(interval), _UnitConverter(std::make_unique<SiPrefixConversion>()),
_IsRequestedToTerminate(false)
DealReporter::DealReporter(std::chrono::milliseconds interval)
: _Interval(interval), _UnitConverter(std::make_unique<SiPrefixConversion>()), _IsRequestedToTerminate(false),
_Reports(std::make_shared<ThreadSafeQueue<DealReportPtr>>())
{
}

Expand All @@ -30,11 +30,11 @@ void DealReporter::Task()
// Reserve the next show time before processing the reports to avoid the delay.
_ShowReportsReservation = std::chrono::system_clock::now() + _Interval;

auto reports = std::vector<DealReport>();
while (!_Dealer.ReportsPtr->Empty())
auto reports = std::vector<DealReportPtr>();
while (!_Reports->Empty())
{
auto timeout = std::chrono::milliseconds(1000);
auto report = _Dealer.ReportsPtr->Dequeue(timeout);
auto report = _Reports->Dequeue(timeout);
if (report.has_value())
{
reports.push_back(report.value());
Expand All @@ -55,15 +55,20 @@ void DealReporter::PostTask()
spdlog::debug("Deal reporter is terminated.");
}

void DealReporter::ShowReports(const std::vector<DealReport> &reports,
void DealReporter::RegisterDealer(std::shared_ptr<Dealer> dealer)
{
dealer->RegisterDealedCallback([this](DealReportPtr report) { this->_Reports->Enqueue(report); });
}

void DealReporter::ShowReports(const std::vector<DealReportPtr> &reports,
std::chrono::time_point<std::chrono::system_clock> rangeStart,
std::chrono::time_point<std::chrono::system_clock> rangeEnd,
const UnitConverter &_UnitConverter)
{
auto range = std::chrono::duration_cast<std::chrono::nanoseconds>(rangeEnd - rangeStart).count();
auto rangeSecondsAsDouble = range / 1'000'000'000.0;
auto totalSize = std::accumulate(reports.begin(), reports.end(), 0,
[](int sum, const DealReport &report) { return sum + report.PacketSize(); });
[](int sum, const DealReportPtr &report) { return sum + report->PacketSize(); });
auto totalSizeBits = totalSize * 8;
auto throughput = totalSizeBits / rangeSecondsAsDouble;
auto packetPerSecond = reports.size() / rangeSecondsAsDouble;
Expand Down
8 changes: 5 additions & 3 deletions src/Dealer/DealReporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@
#include <Dealer/Dealer.hpp>
#include <Thread/Runnable.hpp>
#include <UnitConverter/UnitConverter.hpp>
#include <vector>

class DealReporter : public Thread::Runnable
{
public:
DealReporter(Dealer &dealer, std::chrono::milliseconds interval);
DealReporter(std::chrono::milliseconds interval);
virtual ~DealReporter() = default;
virtual void PreTask() override;
virtual void Task() override;
virtual void PostTask() override;
virtual void RegisterDealer(std::shared_ptr<Dealer> dealer);

private:
Dealer &_Dealer;
std::chrono::milliseconds _Interval;
static void ShowReports(const std::vector<DealReport> &reports,
static void ShowReports(const std::vector<DealReportPtr> &reports,
std::chrono::time_point<std::chrono::system_clock> rangeStart,
std::chrono::time_point<std::chrono::system_clock> rangeEnd,
const UnitConverter &_UnitConverter);
UnitConverter _UnitConverter;
bool _IsRequestedToTerminate;
std::chrono::system_clock::time_point _ShowReportsReservation;
std::shared_ptr<ThreadSafeQueue<DealReportPtr>> _Reports;
};

#endif
39 changes: 25 additions & 14 deletions src/Dealer/Dealer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ void Dealer::Task()

if (result.has_value())
{
try
{
Send(result.value().Data());
}
catch (const std::exception &e)
{
spdlog::error(e.what());
}
auto report = Send(result.value().Data());

// Notify callbacks
// TODO Callbacks should be executed in another thread because we don't know how long it will take
NotifyDealedCallbacks(report);
}
else
{
Expand All @@ -46,19 +43,33 @@ void Dealer::Task()
spdlog::debug("Dealer is terminated.");
}

void Dealer::Send(const std::vector<uint8_t> &data)
void Dealer::RegisterDealedCallback(std::function<void(DealReportPtr)> callback)
{
_DealedCallbacks.push_back(callback);
}

void Dealer::NotifyDealedCallbacks(DealReportPtr &report)
{
std::for_each(_DealedCallbacks.begin(), _DealedCallbacks.end(),
[&report](const std::function<void(DealReportPtr)> &callback) { callback(report); });
}

std::shared_ptr<DealReport> Dealer::Send(const std::vector<uint8_t> &data)
{
auto ready_time = std::chrono::system_clock::now();
// データ送信
auto sent_time = std::chrono::system_clock::from_time_t(0);
// Send data
if (sendto(sockfd, data.data(), data.size(), 0, (struct sockaddr *)&device, sizeof(device)) < 0)
{
auto error = strerror(errno);
auto fmt = boost::format("Failed to send data: %1%");
throw std::runtime_error(boost::str(fmt % error));
spdlog::warn(boost::str(fmt % error));
}
else
{
sent_time = std::chrono::system_clock::now();
}
auto sent_time = std::chrono::system_clock::now();
auto report = DealReport(ready_time, sent_time, data.size());
ReportsPtr->Enqueue(report);
return std::make_shared<DealReport>(ready_time, std::chrono::system_clock::from_time_t(0), data.size());
}

void Dealer::PrepareSocket()
Expand Down
8 changes: 7 additions & 1 deletion src/Dealer/Dealer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@ class Dealer : public Thread::Runnable
{
public:
Dealer(std::shared_ptr<ThreadSafeQueue<TrafficRecord>> queue, const std::string &device_name);

virtual ~Dealer();

virtual void Task();

std::shared_ptr<ThreadSafeQueue<DealReport>> ReportsPtr;

void RegisterDealedCallback(std::function<void(DealReportPtr)> callback);

private:
std::shared_ptr<ThreadSafeQueue<TrafficRecord>> queue;

int sockfd;
struct ifreq if_idx;
struct sockaddr_ll device;
const std::string device_name;
std::vector<std::function<void(DealReportPtr)>> _DealedCallbacks;
void NotifyDealedCallbacks(DealReportPtr &report);

void Send(const std::vector<uint8_t> &data);
std::shared_ptr<DealReport> Send(const std::vector<uint8_t> &data);
void PrepareSocket();
};

Expand Down
Loading