Skip to content

Commit

Permalink
Force non-zero queue timeout (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shillaker authored Jun 17, 2021
1 parent cd8a532 commit 646dd85
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
24 changes: 14 additions & 10 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

#include <faabric/util/exception.h>
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>

#include <queue>

#define DEFAULT_QUEUE_TIMEOUT_MS 500

namespace faabric::util {
class QueueTimeoutException : public faabric::util::FaabricException
{
Expand Down Expand Up @@ -40,21 +43,22 @@ class Queue
}
}

T dequeue(long timeoutMs = 0)
T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
UniqueLock lock(mx);

if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
throw std::runtime_error("Invalid queue timeout");
}

while (mq.empty()) {
if (timeoutMs > 0) {
std::cv_status returnVal = enqueueNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));
std::cv_status returnVal = enqueueNotifier.wait_for(
lock, std::chrono::milliseconds(timeoutMs));

// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
} else {
enqueueNotifier.wait(lock);
// Work out if this has returned due to timeout expiring
if (returnVal == std::cv_status::timeout) {
throw QueueTimeoutException("Timeout waiting for dequeue");
}
}

Expand Down
13 changes: 13 additions & 0 deletions tests/test/util/test_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,17 @@ TEST_CASE("Test queue on non-copy-constructible object", "[util]")
REQUIRE(fa.get() == 1);
REQUIRE(fb.get() == 2);
}

TEST_CASE("Test queue timeout must be positive", "[util]")
{
int timeoutValueMs;

SECTION("Zero timeout") { timeoutValueMs = 0; }

SECTION("Negative timeout") { timeoutValueMs = -1; }

faabric::util::Queue<int> q;
q.enqueue(10);
REQUIRE_THROWS(q.dequeue(timeoutValueMs));
}
}

0 comments on commit 646dd85

Please sign in to comment.