From ffd4f9c38b7ab1268bf92760c2cd42c63db3e745 Mon Sep 17 00:00:00 2001 From: Daniel Sabsay Date: Mon, 6 Jan 2025 16:25:42 -0800 Subject: [PATCH 1/2] Add test for AsyncOperationProcessor stop() behavior The existing implementation sometimes drops existing operations that are still on the queue when .stop() is called. If multiple communications in a select statement can proceed, one is chosen pseudo-randomly: https://go.dev/ref/spec#Select_statements This means that sometimes a processor worker will process a remaining operation, and sometimes it won't. Signed-off-by: Daniel Sabsay --- pkg/cacheutil/async_op_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 pkg/cacheutil/async_op_test.go diff --git a/pkg/cacheutil/async_op_test.go b/pkg/cacheutil/async_op_test.go new file mode 100644 index 0000000000..610c6d863f --- /dev/null +++ b/pkg/cacheutil/async_op_test.go @@ -0,0 +1,33 @@ +package cacheutil + +import ( + "sync" + "testing" + + "github.com/efficientgo/core/testutil" +) + +// Ensure that the processor does not stop if there are still operations waiting in the queue. +func TestAsyncOp(t *testing.T) { + for i := 0; i < 1000; i++ { + runTest(t) + } +} + +func runTest(t *testing.T) { + p := NewAsyncOperationProcessor(100, 10) + mtx := sync.Mutex{} + var acc int = 0 + + for i := 0; i < 100; i++ { + err := p.EnqueueAsync(func() { + mtx.Lock() + defer mtx.Unlock() + acc += 1 + }) + testutil.Ok(t, err) + } + + p.Stop() + testutil.Equals(t, 100, acc) +} From 6d87485f96de5d450a1e279f9e2bfd328fbbe614 Mon Sep 17 00:00:00 2001 From: Daniel Sabsay Date: Wed, 8 Jan 2025 10:15:59 -0800 Subject: [PATCH 2/2] add header to test file Signed-off-by: Daniel Sabsay --- pkg/cacheutil/async_op_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/cacheutil/async_op_test.go b/pkg/cacheutil/async_op_test.go index 610c6d863f..ee57d583ce 100644 --- a/pkg/cacheutil/async_op_test.go +++ b/pkg/cacheutil/async_op_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package cacheutil import (