Skip to content

Commit

Permalink
Thread を管理するクラスを実装 (#24)
Browse files Browse the repository at this point in the history
* Runnable, Worker, Employer, ThreadPool, Future を実装
* FixedSizeVector の実装
* ThreadSample の実装
* ThreadSample 用 launch.json の記述
  • Loading branch information
idofront authored Dec 7, 2024
1 parent 92e9317 commit f443978
Show file tree
Hide file tree
Showing 11 changed files with 558 additions and 1 deletion.
23 changes: 22 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,25 @@
{
"version": "0.2.0",
"configurations": []
"configurations": [
{
"name": "Thread Sample",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/ThreadSample",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
],
"preLaunchTask": "CMake: Build (Debug)"
}
]
}
85 changes: 85 additions & 0 deletions main/ThreadSample.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <Thread/Employer.hpp>
#include <Thread/Runnable.hpp>
#include <spdlog/spdlog.h>

namespace Thread
{
class SampleRunner : public Runnable
{
public:
SampleRunner() : _Counter(0)
{
}

virtual void PreTask() override
{
spdlog::info("PreTask");
}

virtual void Task() override
{
Sleep(std::chrono::milliseconds(100));
spdlog::info("Task: {}", _Counter++);
}

virtual void PostTask() override
{
spdlog::info("PostTask");
}

int Counter() const
{
return _Counter;
}

private:
std::uint64_t _Counter;
};
} // namespace Thread

int main()
{
Thread::Employer employer;
auto futures = {employer.Submit(std::make_shared<Thread::SampleRunner>()),
employer.Submit(std::make_shared<Thread::SampleRunner>())};

// Check the submission
for (auto &future : futures)
{
if (!future)
{
spdlog::error("Failed to submit the task");
return 1;
}
}

// Wait for a while
std::this_thread::sleep_for(std::chrono::seconds(1));

// Get the result
for (auto &future : futures)
{
future->TryTerminate();
auto result = future->Get(std::chrono::milliseconds(1000));

if (result.has_value())
{
// In this implementation, could not get the result.
}
else
{
spdlog::error("Task did not finish");
}
}

auto activeThreadcount = employer.ActiveThreadCount();
spdlog::info("Active thread count: {}", activeThreadcount);

spdlog::info("Try to terminate all threads");
employer.TryTerminate();

spdlog::info("Wait for all threads to finish");
employer.Wait();

return 0;
}
8 changes: 8 additions & 0 deletions src/Thread/Employer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#include <Thread/Employer.hpp>

namespace Thread
{
Employer::Employer() : ThreadPool(std::thread::hardware_concurrency())
{
}
} // namespace Thread
17 changes: 17 additions & 0 deletions src/Thread/Employer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef THREAD__EMPLOYER_HPP
#define THREAD__EMPLOYER_HPP

#include <Thread/ThreadPool.hpp>
#include <thread>

namespace Thread
{
/// @brief Employer class to manage the thread pool and interface for submitting tasks
class Employer : public ThreadPool
{
public:
Employer();
};
} // namespace Thread

#endif
43 changes: 43 additions & 0 deletions src/Thread/Future.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include <Thread/Future.hpp>

namespace Thread
{
Future::Future(std::shared_ptr<Runnable> runnablePtr) : _RunnablePtr(runnablePtr)
{
runnablePtr->RegisterCallback([this]() { NotifyCallbacks(); });
}

bool Future::Wait(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(_Mutex);
return _ConditionVar.wait_for(lock, timeout, [this] { return !_RunnablePtr || !_RunnablePtr->IsContinue(); });
}

void Future::TryTerminate()
{
if (_RunnablePtr)
{
_RunnablePtr->TryTerminate();
}
}

std::optional<std::shared_ptr<Runnable>> Future::Get(std::chrono::milliseconds timeout)
{
// Wait for the thread to finish
bool wait = Wait(timeout);

// If the thread is not finished after the timeout, return nullopt
return wait ? std::make_optional(_RunnablePtr) : std::nullopt;
}

void Future::RegisterCallback(std::function<void()> callback)
{
std::unique_lock<std::mutex> lock(_Mutex);
_Callbacks.push_back(callback);
}

void Future::NotifyCallbacks()
{
std::for_each(_Callbacks.begin(), _Callbacks.end(), [](const std::function<void()> &callback) { callback(); });
}
} // namespace Thread
46 changes: 46 additions & 0 deletions src/Thread/Future.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#ifndef THREAD__FUTURE_HPP
#define THREAD__FUTURE_HPP

#include <Thread/Runnable.hpp>
#include <functional>
#include <memory>
#include <optional>

namespace Thread
{
/// @brief Future class for managing Runnable tasks
class Future
{
public:
Future(std::shared_ptr<Runnable> runnablePtr);

/// @brief Wait for the thread to finish
/// @param timeout
/// @return If the thread is not finished after the timeout, return false.
virtual bool Wait(std::chrono::milliseconds timeout);

/// @brief Try to terminate the thread
virtual void TryTerminate();

/// @brief Get the runnable object.
/// @return The runnable object if the thread is finished, otherwise return nullopt.
virtual std::optional<std::shared_ptr<Runnable>> Get(std::chrono::milliseconds timeout);

/// @brief Register the callback function to be called when the thread is finished.
/// @param callback The callback function to be called.
virtual void RegisterCallback(std::function<void()> callback);

private:
std::shared_ptr<Runnable> _RunnablePtr;
std::mutex _Mutex;
std::condition_variable _ConditionVar;

/// @brief Callback functions to be called when the thread is finished.
std::vector<std::function<void()>> _Callbacks;

/// @brief Notify the callback functions
void NotifyCallbacks();
};
} // namespace Thread

#endif
83 changes: 83 additions & 0 deletions src/Thread/Runnable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include <Thread/Runnable.hpp>

namespace Thread
{
Runnable::Runnable() : _IsRequestedToTerminate(false), _IsTerminated(false)
{
}

void Runnable::Run()
{
PreTask();
while (IsContinue())
{
// In the task, it may call Sleep() to sleep for a while.
Task();
}
PostTask();

_IsTerminated = true;

// Notify the callbacks
NotifyCallbacks();
}

void Runnable::TryTerminate()
{
// Change flag to terminate
_IsRequestedToTerminate = true;

// Interrupt the sleep if the thread is sleeping
_SleepCondition.notify_all();
}

void Runnable::Join()
{
if (_Thread.joinable())
{
_Thread.join();
}
}

void Runnable::Sleep(std::chrono::milliseconds duration)
{
// Sleep for the given duration, but should be interrupted if requested to terminate
std::unique_lock<std::mutex> lock(_Mutex);
_SleepCondition.wait_for(lock, duration, [this] { return ShouldBeTerminated(); });
}

void Runnable::PreTask()
{
// Do nothing by default
}

void Runnable::PostTask()
{
// Do nothing by default
}

bool Runnable::IsContinue()
{
return !ShouldBeTerminated();
}

bool Runnable::ShouldBeTerminated()
{
return _IsRequestedToTerminate.load();
}

void Runnable::RegisterCallback(std::function<void()> callback)
{
_Callbacks.push_back(callback);
}

bool Runnable::IsTerminated() const
{
return _IsTerminated;
}

void Runnable::NotifyCallbacks()
{
std::for_each(_Callbacks.begin(), _Callbacks.end(), [](const std::function<void()> &callback) { callback(); });
}
} // namespace Thread
80 changes: 80 additions & 0 deletions src/Thread/Runnable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#ifndef THREAD__RUNNABLE_HPP
#define THREAD__RUNNABLE_HPP

#include <Queue/IThreadSafeQueue.hpp>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <vector>

namespace Thread
{
/// @brief Runnable interface
class Runnable
{
public:
Runnable();

/// @brief Run the task
virtual void Run() final;

/// @brief Try to terminate the task gracefully.
virtual void TryTerminate() final;

/// @brief Join the thread
virtual void Join();

/// @brief Check if the task should continue
/// @return If the task should continue, return true.
bool IsContinue();

/// @brief Register the callback function to be called when the task is finished.
/// @param callback The callback function to be called.
virtual void RegisterCallback(std::function<void()> callback) final;

bool IsTerminated() const;

protected:
/// @brief Sleep for a while.
/// @param duration The duration to sleep.
/// @note The sleep can be interrupted if requested to terminate. In the implementation of Task(), this function
/// should be called instead of std::this_thread::sleep_for().
virtual void Sleep(std::chrono::milliseconds duration);

/// @brief Pre-task
/// @note This function is called before the task is executed.
virtual void PreTask();

/// @brief The task to be executed
/// @note This function should be implemented by the derived class. If the task needs to wait for a while, use
/// Sleep() to sleep.
virtual void Task() = 0;

/// @brief Post-task
/// @note This function is called after the task is executed.
virtual void PostTask();

private:
bool ShouldBeTerminated();

std::atomic<bool> _IsRequestedToTerminate;
std::thread _Thread;
std::mutex _Mutex;
std::condition_variable _SleepCondition;

/// @brief Notify the callback functions
void NotifyCallbacks();

/// @brief Callback functions to be called when the task is finished.
std::vector<std::function<void()>> _Callbacks;

bool _IsTerminated;
};

using RunnablePtr = std::shared_ptr<Runnable>;
using RunnableQueue = IThreadSafeQueue<RunnablePtr>;
using RunnableQueuePtr = std::shared_ptr<RunnableQueue>;
} // namespace Thread

#endif
Loading

0 comments on commit f443978

Please sign in to comment.