Skip to content

Commit

Permalink
update readme
Browse files Browse the repository at this point in the history
  • Loading branch information
yosriayed committed Apr 25, 2024
1 parent 7c76ebd commit e51ed74
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 103 deletions.
258 changes: 164 additions & 94 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# cplease
CppParallelEase (or cplease) is a C++ header-only library that tries to provide some improved primitives to ease the writing of parallel and asynchronous c++ tasks
- [future/promise](#future_promise)
- [multiple futures/promises](#futures_promises)
- [thread_pool](#thread_pool)
- [circular reader/writer](#circular_buffer)
- [spmc/mpsc channel](#channel)

## future/promise
## <a id="future_promise"></a> future/promise

The pair of `plz::future` and `plz::promise` classes are similiar to `std::promise`
and `std::future`. They provide a mechanism to to access the result of asynchronous operations.
Expand All @@ -28,10 +33,11 @@ attached to the same future.

### Usage example

Write asynchronous task that return a plz::future object as handler for the result:

```cpp
#include <plz/future.hpp>

// simulate a asynchronous task that return a plz::future object as handler for the result
auto some_async_task() -> plz::future<int>
{
plz::promise<int> promise;
Expand Down Expand Up @@ -60,19 +66,23 @@ auto some_async_task() -> plz::future<int>

return future;
}
```
Block the calling the thread until the promise is fulfilled. If an exception was set on the promise, it will be rethrown:

int main()
{
// block the calling the thread until the promise is fulfilled. If an exception was set on the promise, it will be rethrown here
```cpp
int result = some_async_task().get();
assert(result == 42);

// similair to `get`, but the result is moved out and the promise is reset to the "not ready" state
int result2 = some_async_task().take();
```
`take` method is similair to `get`, but the result is moved out and the promise is reset to the "not ready" state
```cpp
int result = some_async_task().take();
assert(result2 == 42);
```

// chain the async task with a lambda that takes the result of the previous task and returns a new result, then wait for the final result
int result3 = some_async_task()
Chain the async task with a callable that takes the result of the previous task and returns a new result, then wait for the final result:

```cpp
int result = some_async_task()
.then(
[](int result)
{
Expand All @@ -85,11 +95,14 @@ int main()
})
.get();
assert(result3 == 42);
```
At any moment in the execution of the chained continuations, a thrown exception will be captured and set on the promise and will be rethrown when calling `wait`:
// Catch the exception thrown in then wait for the final result. If an exception was set on the promise, it will be rethrown here
```cpp
try
{
int result4 = some_async_task()
int result = some_async_task()
.then(
[](int result)
{
Expand All @@ -110,105 +123,117 @@ int main()
assert(e == 42);
}
// Install a handler that will be called when an exception is thrown in the chain of continuations
// The exception will still be rethrown when calling `wait`
try
{
int result5 = some_async_task()
.then(
[](int result)
{
throw std::runtime_error("error");
return result;
})
.then(
[](int result)
{
// this lambda will not be called
assert(false);
return result;
})
.on_exception(
[](std::runtime_error& e)
{
// this will be called when the exception of the same type is thrown
})
.on_exception(
[](std::exception& e)
{
// this will not be called since the exception is handled in the previous `on_exception` handler
})
.get();
}
catch(std::runtime_error& e)
{
assert(std::string(e.what()) == "error");
}

// Chain another async task with a lambda that return a plz::future object
int result6 = some_async_task()
```
Install exception handlers that will be called when an exception is thrown in the chain of continuations.
The exception will still be rethrown when calling `wait`:
```cpp
try
{
int result = some_async_task()
.then(
[](int result)
{
// make_promise is equivalent to just declaring plz::promise normally
auto promise = plz::make_promise<int>();
auto future = promise.get_future();
std::thread(
[promise = std::move(promise)]()
{
promise.set_result(result + 1);
})
.detach();
return future;
throw std::runtime_error("error");
return result;
})
.then(
[](int result)
{
// this lambda will not be called
assert(false);
return result;
})
.on_exception(
[](int& e)
{
// this will not be called since the type of exception does not match the thrown one
})
.on_exception(
[](std::runtime_error& e)
{
// this will be called since the type of the exception match the thrown one
})
.on_exception(
[](std::exception& e)
{
// this will not be called since the exception is handled in the previous `on_exception` handler
})
.get();
assert(result6 == 43);
}
catch(std::runtime_error& e)
{
assert(std::string(e.what()) == "error");
}
```
## multiple futures/promises

Chain another async task with a lambda that return a plz::future object

```cpp
int result = some_async_task()
.then(
[](int result)
{
// make_promise is equivalent to just declaring plz::promise normally
auto promise = plz::make_promise<int>();
auto future = promise.get_future();
std::thread(
[promise = std::move(promise)]()
{
promise.set_result(result + 1);
})
.detach();
return future;
})
.then(
[](int result)
{
return result;
})
.get();
assert(result6 == 43);
```
See the [tests](https://github.com/yosriayed/cplease/blob/main/test/future.test.cpp) for more examples
## <a id="futures_promises"></a> multiple futures/promises
plz::futures provide a convenient way to handle a collection of future/promise objects using a key type and provide an aggregate result handle using a plz::future.
```cpp
std::array promises = {
plz::promise<double>(), plz::promise<double>(), plz::promise<double>()
};
std::array promises = {
plz::promise<double>(), plz::promise<double>(), plz::promise<double>()
};
auto futures = plz::make_futures<double, int>();
auto futures = plz::make_futures<double, int>();
for(int i = 0; i < promises.size(); ++i)
{
futures.add_future(i, promises[i].get_future());
for(int i = 0; i < promises.size(); ++i)
{
futures.add_future(i, promises[i].get_future());
std::thread(
[i, promise = promises[i]]() mutable
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
promise.set_result(i);
})
.detach();
}
std::thread(
[i, promise = promises[i]]() mutable
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
promise.set_result(i);
})
.detach();
}
double res1 = futures.get(0);
double res2 = futures.get(1);
double res3 = futures.get(2);
double res1 = futures.get(0);
double res2 = futures.get(1);
double res3 = futures.get(2);
assert(res1 == 0);
assert(res2 == 1);
assert(res3 == 2);
assert(res1 == 0);
assert(res2 == 1);
assert(res3 == 2);
std::vector<double> res = futures.get();
double acc = std::accumulate(res.begin(), res.end(), 0.0);
std::vector<double> res = futures.get();
double acc = std::accumulate(res.begin(), res.end(), 0.0);

assert(acc == promises.size() * (promises.size() - 1) / 2);
assert(acc == promises.size() * (promises.size() - 1) / 2);
```

## thread_pool
## <a id="thread_pool"></a> thread_pool
`plz::thread_pool` class is a fixed size thread pool. The user submits tasks (any callable object) to be executed into a queue using the `run` method, which return a `plz::future` object.
```cpp
plz::thread_pool pool(3);
Expand All @@ -226,7 +251,7 @@ int result = future.get();

assert(result == 42);
```
plz::thread_pool uses instances of std::jthreads. So the user can enqueue a callable object that take a std::stop_token as its last argument called with workers jthread stop token for gracefull task cancellation when the the jthread tries to stop.
plz::thread_pool uses instances of std::jthreads. So the user can enqueue a callable object that take a std::stop_token as its last argument. the task will be called with the stop token of the worker jthread for gracefull task cancellation when the the jthread tries to stop.
```cpp
plz::thread_pool pool(3);
Expand All @@ -252,7 +277,7 @@ pool.run(
std::this_thread::sleep_for(2s);
pool.quit();
pool.quit(); // request to stop the jthreads
pool.wait();
```

Expand All @@ -278,7 +303,7 @@ auto sum = f.then(

assert(sum == 44);
```
A singleton global instance of plz::thread_pool is available and can be used directly using the free functions under the namespace plz:
A singleton global instance of plz::thread_pool is available `plz::thread_pool::global_instance()` and can be used directly using the free functions under the namespace plz.
```cpp
plz::set_thread_count(4); // will throw if global instance is already configured with number of threads.
Expand All @@ -289,17 +314,19 @@ auto foo = [] (int i, int x) -> int
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return i + x;
}
}
plz::run(foo, 1, 2).get(); // enqueue foo on the global instance plz::thread_pool::global_instance() and wait for the result
std::array vec = { 1, 2, 3, 4, 5, 6, 7, 8 };
std::vector<int> res_vec = plz::map(vec, [] (auto i) { return i + 1; }).get(); // enqueue the lambda for each element in the provided range and wait for the aggregate result
plz::quit();
plz::wait();
```

`plz::future` has a `then` overload that takes an isntance of `plz::thread_pool` as first argument. This overload will enquue the provided callable on the provided thread pool. `async_then` method on the other hand will enqueue the task on the same thread_pool without having to specifiy it as argument
`plz::future` has a `then` overload that takes an instance of `plz::thread_pool` as first argument. This overload will enquue the provided callable on the provided thread pool. `async_then` method on the other hand will enqueue the task on the same thread_pool without having to specifiy it as argument

```cpp
plz::thread_pool pool(2);
Expand Down Expand Up @@ -334,5 +361,48 @@ pool.run(foo, 1, 2).
pool.wait();
pool2.wait();
```
## circular buffer reader and writer
// TODO
See the [tests](https://github.com/yosriayed/cplease/blob/main/test/async_tasks.test.cpp) for more examples

## <a id="circular_buffer"></a> circular buffer reader/writer
A circular buffer is expressed using the c++20 concept `plz::circular_buffer_ptr` which check if a given type is a pointer to a type that behaves like an array with a compile-time known capacity that is a power of 2.
Any type of pointer to std::array with capacity power of 2 satisfy this concept.
```cpp
using plz::circbuff;
static_assert(circular_buffer_ptr<std::array<int, 16>*>);
static_assert(circular_buffer_ptr<std::unique_ptr<std::array<int, 16>>);
static_assert(circular_buffer_ptr<<std::shared_ptr<std::array<int, 16>>);
static_assert(circular_buffer_ptr<<std::optional<std::array<int, 16>>);
```
`plz::circbuff::reader` and `plz::circbuff::writer` take as argument an instance of any type that satisfy the `plz::circular_buffer_ptr` concept.
reader and writer instances that points the same array are independent.
```cpp
std::array<char, 8> array;
plz::circbuff::reader reader(&array);
plz::circbuff::writer writer(&array);
writer.put('0');
writer.put('1');
writer.put('2');
writer.put('3');
writer.write("5678", 4);
char read_data[4];
size_t size_data_read = reader.read(read_data, 4);
reader.read_using(
[](char* data, size_t size){
std::cout << std::string_view(data, size) << "\n";
return size; // should return how much data was actually read.
},
4);
```

See the [tests](https://github.com/yosriayed/cplease/blob/main/test/circbuff.test.cpp) for more usage examples

## <a id="channel"></a> spmc/mpsc channel

See the [tests](https://github.com/yosriayed/cplease/blob/main/test/channel.test.cpp) for more usage examples
Loading

0 comments on commit e51ed74

Please sign in to comment.