pw_async: Clarify tests and document current behavior
Change-Id: I212141b7b6269eaaf0ac2f7fd53011abe5453cb7
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/143597
Commit-Queue: Taylor Cramer <cramertj@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Ali Saeed <saeedali@google.com>
cramertj authored and CQ Bot Account committed May 5, 2023
1 parent 0376847 commit 75830d7
Showing 6 changed files with 233 additions and 140 deletions.
pw_async/fake_dispatcher_test.cc (293 changes: 174 additions & 119 deletions)
@@ -14,152 +14,219 @@
#include "pw_async/fake_dispatcher.h"

#include "gtest/gtest.h"
#include "pw_containers/vector.h"
#include "pw_string/to_string.h"

#define ASSERT_OK(status) ASSERT_EQ(OkStatus(), status)
#define ASSERT_CANCELLED(status) ASSERT_EQ(Status::Cancelled(), status)

using namespace std::chrono_literals;

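// Tally of task callback completions, grouped by completion status.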
struct CallCounts {
  int ok = 0;
  int cancelled = 0;
  bool operator==(const CallCounts& other) const {
    return ok == other.ok && cancelled == other.cancelled;
  }
};

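// Specializing pw::ToString presumably lets the test framework print
// CallCounts values readably in EXPECT_EQ failure messages.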
namespace pw {
template <>
StatusWithSize ToString<CallCounts>(const CallCounts& value,
                                    span<char> buffer) {
  return string::Format(buffer,
                        "CallCounts {.ok = %d, .cancelled = %d}",
                        value.ok,
                        value.cancelled);
}
} // namespace pw

namespace pw::async::test {
namespace {

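// Test helper whose fn() returns a task callback that counts each OK or
// cancelled completion in `counts`.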
struct CallCounter {
  CallCounts counts;
  auto fn() {
    return [this](Context&, Status status) {
      if (status.ok()) {
        this->counts.ok++;
      } else if (status.IsCancelled()) {
        this->counts.cancelled++;
      }
    };
  }
};

TEST(FakeDispatcher, UnpostedTasksDontRun) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{});
}

TEST(FakeDispatcher, PostedTaskRunsOnce) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.Post(task);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
}

TEST(FakeDispatcher, TaskPostedTwiceBeforeRunningRunsOnce) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.Post(task);
  dispatcher.Post(task);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
}

TEST(FakeDispatcher, TaskRepostedAfterRunningRunsTwice) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.Post(task);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
  dispatcher.Post(task);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 2});
}

TEST(FakeDispatcher, TwoPostedTasksEachRunOnce) {
  FakeDispatcher dispatcher;
  CallCounter counter_1;
  Task task_1(counter_1.fn());
  CallCounter counter_2;
  Task task_2(counter_2.fn());
  dispatcher.Post(task_1);
  dispatcher.Post(task_2);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter_1.counts, CallCounts{.ok = 1});
  EXPECT_EQ(counter_2.counts, CallCounts{.ok = 1});
}

TEST(FakeDispatcher, PostedTasksRunInOrderForFairness) {
  FakeDispatcher dispatcher;
  pw::Vector<uint8_t, 3> task_run_order;
  Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); });
  Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); });
  Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); });
  dispatcher.Post(task_1);
  dispatcher.Post(task_2);
  dispatcher.Post(task_3);
  dispatcher.RunUntilIdle();
  pw::Vector<uint8_t, 3> expected_run_order({1, 2, 3});
  EXPECT_EQ(task_run_order, expected_run_order);
}

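// RequestStop() drains the queue: tasks already posted, and tasks posted
// afterwards, are each delivered once with a cancelled status.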
TEST(FakeDispatcher, RequestStopQueuesPreviouslyPostedTaskWithCancel) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.Post(task);
  dispatcher.RequestStop();
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1});
}

TEST(FakeDispatcher, RequestStopQueuesNewlyPostedTaskWithCancel) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  Task task(counter.fn());
  dispatcher.RequestStop();
  dispatcher.Post(task);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.cancelled = 1});
}

TEST(FakeDispatcher, RunUntilIdleDoesNotRunFutureTask) {
  FakeDispatcher dispatcher;
  CallCounter counter;
  // Should not run; RunUntilIdle() does not advance time.
  Task task(counter.fn());
  dispatcher.PostAfter(task, 1ms);
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{});
}

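// PostAfter() orders execution by due time, not by posting order.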
TEST(FakeDispatcher, PostAfterRunsTasksInSequence) {
  FakeDispatcher dispatcher;
  pw::Vector<uint8_t, 3> task_run_order;
  Task task_1([&task_run_order](auto...) { task_run_order.push_back(1); });
  Task task_2([&task_run_order](auto...) { task_run_order.push_back(2); });
  Task task_3([&task_run_order](auto...) { task_run_order.push_back(3); });
  dispatcher.PostAfter(task_1, 50ms);
  dispatcher.PostAfter(task_2, 25ms);
  dispatcher.PostAfter(task_3, 100ms);
  dispatcher.RunFor(125ms);
  pw::Vector<uint8_t, 3> expected_run_order({2, 1, 3});
  EXPECT_EQ(task_run_order, expected_run_order);
}

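// Cancel() unschedules a pending task without invoking its callback; see the
// NOTE below about this contract.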
TEST(FakeDispatcher, CancelInsideOtherTaskCancelsTaskWithoutRunningIt) {
  FakeDispatcher dispatcher;

  CallCounter cancelled_task_counter;
  Task cancelled_task(cancelled_task_counter.fn());

  Task canceling_task([&cancelled_task](Context& c, Status status) {
    ASSERT_OK(status);
    ASSERT_TRUE(c.dispatcher->Cancel(cancelled_task));
  });

  dispatcher.Post(canceling_task);
  dispatcher.Post(cancelled_task);
  dispatcher.RunUntilIdle();

  // NOTE: the cancelled task is *not* run with `Cancel`.
  // This is likely to produce strange behavior, and this contract should
  // be revisited and carefully documented.
  EXPECT_EQ(cancelled_task_counter.counts, CallCounts{});
}

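// A task cannot cancel itself from within its own callback: it is no longer
// pending by then, so Cancel() returns false.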
TEST(FakeDispatcher, CancelInsideCurrentTaskFails) {
  FakeDispatcher dispatcher;

  Task self_cancel_task;
  self_cancel_task.set_function([&self_cancel_task](Context& c, Status status) {
    ASSERT_OK(status);
    ASSERT_FALSE(c.dispatcher->Cancel(self_cancel_task));
  });

  dispatcher.Post(self_cancel_task);
  dispatcher.RunUntilIdle();
}

TEST(FakeDispatcher, RequestStopInsideOtherTaskCancelsOtherTask) {
  FakeDispatcher dispatcher;

  // This task is never executed and is cleaned up in RequestStop().
  CallCounter task_counter;
  Task task(task_counter.fn());

  int stop_count = 0;
  Task stop_task([&stop_count]([[maybe_unused]] Context& c, Status status) {
    ASSERT_OK(status);
    stop_count++;
    static_cast<FakeDispatcher*>(c.dispatcher)->RequestStop();
    static_cast<FakeDispatcher*>(c.dispatcher)->RunUntilIdle();
  });

  dispatcher.Post(stop_task);
  dispatcher.Post(task);

  dispatcher.RunUntilIdle();
  EXPECT_EQ(stop_count, 1);
  EXPECT_EQ(task_counter.counts, CallCounts{.cancelled = 1});
}

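// Periodic tasks re-run at a fixed interval until cancelled or until the
// dispatcher shuts down.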
TEST(FakeDispatcher, PeriodicTasks) {
  FakeDispatcher dispatcher;

  CallCounter periodic_counter;
  Task periodic_task(periodic_counter.fn());
  dispatcher.PostPeriodicAt(periodic_task, 20ms, dispatcher.now() + 50ms);

  // Cancel periodic task after it has run thrice, at +50ms, +70ms, and +90ms.
@@ -172,36 +239,28 @@ TEST(FakeDispatcher, PeriodicTasks) {
  dispatcher.RunFor(300ms);
  dispatcher.RequestStop();
  dispatcher.RunUntilIdle();
  EXPECT_EQ(periodic_counter.counts, CallCounts{.ok = 3});
}

TEST(FakeDispatcher, PostPeriodicAfter) {
  FakeDispatcher dispatcher;

  CallCounter counter;
  Task periodic_task(counter.fn());
  dispatcher.PostPeriodicAfter(periodic_task, /*interval=*/5ms, /*delay=*/20ms);

  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{});
  dispatcher.RunFor(20ms);
  EXPECT_EQ(counter.counts, CallCounts{.ok = 1});
  dispatcher.RunFor(10ms);
  EXPECT_EQ(counter.counts, CallCounts{.ok = 3});
  dispatcher.RunUntilIdle();
  EXPECT_EQ(counter.counts, CallCounts{.ok = 3});
}

TEST(FakeDispatcher, TasksCancelledByDispatcherDestructor) {
  CallCounter counter;
  Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn());

  {
    FakeDispatcher dispatcher;
    dispatcher.PostAfter(task0, 10s);
    dispatcher.PostAfter(task1, 10s);
    dispatcher.PostAfter(task2, 10s);
  }

  ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3});
}

TEST(DispatcherBasic, TasksCancelledByRunFor) {
  FakeDispatcher dispatcher;

  CallCounter counter;
  Task task0(counter.fn()), task1(counter.fn()), task2(counter.fn());
  dispatcher.PostAfter(task0, 10s);
  dispatcher.PostAfter(task1, 10s);
  dispatcher.PostAfter(task2, 10s);

  dispatcher.RequestStop();
  dispatcher.RunFor(5s);
  ASSERT_EQ(counter.counts, CallCounts{.cancelled = 3});
}

} // namespace
} // namespace pw::async::test
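
For context, a minimal sketch of the usage pattern these tests exercise. The
FakeDispatcher, Task, Context, and Status names come from the code above;
Example() and its body are illustrative assumptions, not part of this commit:

#include "pw_async/fake_dispatcher.h"

using namespace std::chrono_literals;

void Example() {
  pw::async::test::FakeDispatcher dispatcher;

  int ok_runs = 0;
  // A Task wraps a (Context&, Status) callback. The status is OkStatus() when
  // the task runs normally and Cancelled() if the dispatcher stops or is
  // destroyed before the task runs.
  pw::async::Task task([&ok_runs](pw::async::Context&, pw::Status status) {
    if (status.ok()) {
      ++ok_runs;
    }
  });

  dispatcher.PostAfter(task, 10ms);
  dispatcher.RunUntilIdle();  // Simulated time does not advance; no run yet.
  dispatcher.RunFor(10ms);    // Advances simulated time, running the task.
}
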
pw_async/fake_dispatcher_test.gni (2 changes: 2 additions & 0 deletions)
@@ -33,6 +33,8 @@ template("fake_dispatcher_test") {
          pw_sync_TIMED_THREAD_NOTIFICATION_BACKEND != "" &&
          pw_thread_THREAD_BACKEND != ""
    deps = [
      "$dir_pw_containers:vector",
      "$dir_pw_string:to_string",
      "$dir_pw_sync:timed_thread_notification",
      "$dir_pw_thread:thread",
      dir_pw_log,
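
(The two new deps mirror the test's new includes: "$dir_pw_containers:vector"
supplies pw::Vector for recording task run order, and "$dir_pw_string:to_string"
supplies pw::ToString for formatting CallCounts.)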
