Skip to content

Commit

Permalink
improvements and simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
yosriayed committed Apr 23, 2024
1 parent 8316de8 commit dac0c1d
Show file tree
Hide file tree
Showing 17 changed files with 923 additions and 3,004 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/test-ubuntu-latest-clang-with_std_expected.yml

This file was deleted.

39 changes: 0 additions & 39 deletions .github/workflows/windows-latest-with-std-expected.yml

This file was deleted.

6 changes: 0 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ target_compile_features(${PROJECT_NAME} INTERFACE cxx_std_20)
target_include_directories(${PROJECT_NAME} INTERFACE include)

option(BUILD_TESTS "Build tests" OFF)
option(WITH_STD_EXPECTED "Support std::expected specialisations" OFF)

if(WITH_STD_EXPECTED)
message("Building with std::expected specialisations")
target_compile_definitions(${PROJECT_NAME} INTERFACE WITH_STD_EXPECTED)
endif()

if(BUILD_TESTS)

Expand Down
228 changes: 143 additions & 85 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ auto some_async_task() -> plz::future<int>
// an arbitrary type can be set as an exception
// promise.set_exception(42);
// promise.set_exception(std::runtime_error("error"));

}
catch(...)
{
Expand Down Expand Up @@ -168,113 +169,170 @@ int main()
.get();
assert(result6 == 43);
}
```
```
## multiple futures/promises

### specializations for std::expected
plz::futures provide a convenient way to handle a collection of future/promise objects using a key type and provide an aggregate result handle using a plz::future.

If a c++23 compiler is used with support for std::expected, adding the define "WITH_STD_EXPECTED" will enable the template classes plz::expected::future and plz::expected::promise. Similiar to std::expected, these classes takes first template parameter `T` for the value and a second `E` is the type of the error.
These classes does not support the catching of rethrowing of exceptions. The user should handle the exceptions himself.(i.e catching the exception and converting it to error type.)
```cpp
std::array promises = {
plz::promise<double>(), plz::promise<double>(), plz::promise<double>()
};

The return type of `get` ant `take` methods is `std::expected<T,E>`
auto futures = plz::make_futures<double, int>();

The template function `plz::make_promise<T>()` create a `promise<T>` and template function `plz::make_promise<T, E>()` create a `plz::expected::promise<T, E>`.
for(int i = 0; i < promises.size(); ++i)
{
futures.add_future(i, promises[i].get_future());

std::thread(
[i, promise = promises[i]]() mutable
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
promise.set_result(i);
})
.detach();
}

double res1 = futures.get(0);
double res2 = futures.get(1);
double res3 = futures.get(2);

assert(res1 == 0);
assert(res2 == 1);
assert(res3 == 2);

std::vector<double> res = futures.get();
double acc = std::accumulate(res.begin(), res.end(), 0.0);

assert(acc == promises.size() * (promises.size() - 1) / 2);
```
## thread_pool
`plz::thread_pool` class is a fixed size thread pool. The user submits tasks (any callable object) to be executed into a queue using the `run` method, which return a `plz::future` object.
```cpp
#define WITH_STD_EXPECTED
#include <plz/future.hpp>
plz::thread_pool pool(3);
enum class ErrorCode
auto task = [](int x)
{
error_1,
error_2
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x;
};
// simulate a asynchronous task that return a plz::expected::future object as handler for the result
auto some_async_task() -> plz::expected::future<int, ErrorCode>
{
auto promise = plz::make_promise<int, ErrorCode>;
auto future = promise.get_future();
pool.run(task, 84);
auto future = pool.run(task, 42);
std::thread(
[promise = std::move(promise)]()
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
promise.set_result(42);
int result = future.get();
// eventually we can set an error:
// promise.set_error(ErrorCode::error_1);
// promise.set_result(std::unexpected(ErrorCode::error_1));
})
.detach();
assert(result == 42);
```
plz::thread_pool uses instances of std::jthreads. So the user can enqueue a callable object that take a std::stop_token as its last argument called with workers jthread stop token for gracefull task cancellation when the the jthread tries to stop.

return future;
}
```cpp
plz::thread_pool pool(3);

int main()
{
// block the calling the thread until the promise is fulfilled.
std::expected<int, ErrorCode> result = some_async_task().get();
assert(result.value() == 42);
pool.run(
[](int i, std::stop_token token)
{
while(!token.stop_requested())
{
std::this_thread::sleep_for(100ms);
}
},
42);

// similair to `get`, but the result is moved out and the promise is reset to the "not ready" state
std::expected<int, ErrorCode> result2 = some_async_task().take();
assert(result2.value() == 42);
pool.run(
[](std::stop_token token)
{
while(!token.stop_requested())
{
std::this_thread::sleep_for(100ms);
}
});

// chain the async task with a lambda that takes the result of the previous task and returns a new result, then wait for the final result
std::expected<int, ErrorCode> result3 =
some_async_task()
.then(
[](int result)
{
return std::to_string(result);
})
.then(
[](std::string result)
{
return std::stoi(result);
})
.get()
.value();
assert(result3 == 42);
std::this_thread::sleep_for(2s);

// Install a handler that will be called when an expected is returned in the chain of continuations
// The error will still be retrievable on the expected result object

std::expected<int, ErrorCode> result5 =
some_async_task()
.then(
[](int result) -> std::expected<int, ErrorCode>
{
return std::unexpected(ErrorCode::error_1);
})
.then(
[](int result)
{
// this lambda will not be called
assert(false);
return result;
})
.on_error(
[](ErrorCode e)
{
// this will be called when error is set on the promise
assert(e == ErrorCode::error_1);
})
.get();

assert(result5.error() == ErrorCode::error_1);
}
pool.quit();
pool.wait();
```
## packaged_tasks
The user can also enqueue a task to be called on range of values using the `map` method which in turn will return a `plz::futures` object.
Like `std::packaged_task` the class template `plz::packaged_task` wraps any Callable target (function, lambda expression, bind expression, or another function object) so that it can be invoked asynchronously. They are designed to work with `plz::future`, `plz::promise` and `plz::thread_pool` classes.
```cpp
plz::thread_pool pool(4);
std::array vec = { 1, 2, 3, 4, 5, 6, 7, 8 };
auto f = pool.map(vec, [] (auto i) { return i + 1; });
auto sum = f.then(
[](const auto& res)
{
int acc = 0;
for(auto r : res)
{
acc += r;
}
return acc;
})
.get();
assert(sum == 44);
```
A singleton global instance of plz::thread_pool is available and can be used directly using the free functions under the namespace plz:

```cpp
plz::set_thread_count(4); // will throw if global instance is already configured with number of threads.
// So this function should be called before any usage of the global isntance of plz::thread_pool.
// Otherwise the default number of threads is gonna be std::thread::hardware_concurrency()

auto foo = [] (int i, int x) -> int
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return i + x;
}

plz::run(foo, 1, 2).get(); // enqueue foo on the global instance plz::thread_pool::global_instance() and wait for the result


std::array vec = { 1, 2, 3, 4, 5, 6, 7, 8 };
std::vector<int> res_vec = plz::map(vec, [] (auto i) { return i + 1; }).get(); // enqueue the lambda for each element in the provided range and wait for the aggregate result

```
## thread_pool
TODO
## circular buffer
TODO
`plz::future` has a `then` overload that takes an isntance of `plz::thread_pool` as first argument. This overload will enquue the provided callable on the provided thread pool. `async_then` method on the other hand will enqueue the task on the same thread_pool without having to specifiy it as argument
```cpp
plz::thread_pool pool(2);
plz::thread_pool pool2(2);
auto foo = [] (int i, int x) -> int
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return i + x;
}
// enqueue foo on the pool then enqueue a lambda on the second pool
pool.run(foo, 1, 2)
.then(
&pool2,
[](int result)
{
return result - 1;
}
);
// enqueue foo on the pool then enqueue a lambda on the second pool
pool.run(foo, 1, 2).
async_then(
[](int result)
{
return result - 1;
}
);
// wait for the pools to be idle
pool.wait();
pool2.wait();
```
## circular buffer reader and writer
// TODO
2 changes: 1 addition & 1 deletion include/plz/circbuff/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class source
source_notify_function_id register_notify_function(Func&& func)
{
auto id = m_notify_function_id_counter++;
m_notif_funcs.emplace_back({ id, std::move(func) });
m_notif_funcs.push_back({ id, std::move(func) });
return id;
}

Expand Down
Loading

0 comments on commit dac0c1d

Please sign in to comment.