From fbbb99e5ce242513af47d8bed8f521a4c2e22054 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 15 Feb 2023 18:09:43 -0800 Subject: [PATCH] [chore] fix nits in persistent queue implementation (#7203) Signed-off-by: Bogdan Drutu --- .../internal/persistent_queue.go | 18 +++----- .../internal/persistent_queue_test.go | 36 +++++++-------- .../internal/persistent_storage_batch_test.go | 10 ++--- .../internal/persistent_storage_test.go | 45 ++++++------------- 4 files changed, 41 insertions(+), 68 deletions(-) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index e2e0b3c6b1d..ae47044079b 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -34,12 +34,10 @@ var ( // persistentQueue holds the queue backed by file storage type persistentQueue struct { - logger *zap.Logger - stopWG sync.WaitGroup - stopOnce sync.Once - stopChan chan struct{} - numWorkers int - storage *persistentContiguousStorage + stopWG sync.WaitGroup + stopOnce sync.Once + stopChan chan struct{} + storage *persistentContiguousStorage } // buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done @@ -51,21 +49,17 @@ func buildPersistentStorageName(name string, signal component.DataType) string { // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { return &persistentQueue{ - logger: logger, stopChan: make(chan struct{}), storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), } } // StartConsumers starts the given number of consumers which will be consuming items -func (pq *persistentQueue) StartConsumers(num int, callback func(item Request)) { - pq.numWorkers = num - - for i := 0; i < pq.numWorkers; i++ { +func (pq *persistentQueue) StartConsumers(numWorkers int, callback func(item Request)) { + for i := 0; i < numWorkers; i++ { pq.stopWG.Add(1) go func() { defer pq.stopWG.Done() - for { select { case req := <-pq.storage.get(): diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index bec50a373bd..f6ab8b892ce 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -50,7 +51,7 @@ func TestPersistentQueue_Capacity(t *testing.T) { t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) wq := createTestQueue(ext, 5) - require.Equal(t, 0, wq.Size()) + assert.Equal(t, 0, wq.Size()) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) @@ -58,20 +59,20 @@ func TestPersistentQueue_Capacity(t *testing.T) { for i := 0; i < 10; i++ { result := wq.Produce(req) if i < 6 { - require.True(t, result) + assert.True(t, result) } else { - require.False(t, result) + assert.False(t, result) } // Let's make sure the loop picks the first element into the channel, // so the capacity could be used in full if i == 0 { - require.Eventually(t, func() bool { + assert.Eventually(t, func() bool { return wq.Size() == 0 }, 5*time.Second, 10*time.Millisecond) } } - require.Equal(t, 5, wq.Size()) + assert.Equal(t, 5, wq.Size()) } } @@ -79,7 +80,7 @@ func TestPersistentQueue_Close(t *testing.T) { path := t.TempDir() ext := createStorageExtension(path) - t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) wq := createTestQueue(ext, 1001) traces := newTraces(1, 10) @@ -91,15 +92,13 @@ func TestPersistentQueue_Close(t *testing.T) { wq.Produce(req) } // This will close the queue very quickly, consumers might not be able to consume anything and should finish gracefully - require.Eventually(t, func() bool { + assert.NotPanics(t, func() { wq.Stop() - return true - }, 5*time.Second, 10*time.Millisecond) + }) // The additional stop should not panic - require.Eventually(t, func() bool { + assert.NotPanics(t, func() { wq.Stop() - return true - }, 5*time.Second, 10*time.Millisecond) + }) } // Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage @@ -107,7 +106,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { path := t.TempDir() ext := createStorageExtension(path) - t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) wq := createTestQueue(ext, 1001) traces := newTraces(1, 10) @@ -130,11 +129,10 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { for i := 0; i < 1000; i++ { wq.Produce(req) } - require.Eventually(t, func() bool { + assert.NotPanics(t, func() { wq.Stop() - return true - }, 5*time.Second, 10*time.Millisecond) - require.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time") + }) + assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time") stopStorage = fnBefore } @@ -176,7 +174,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { tq := createTestQueue(ext, 5000) defer tq.Stop() - t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) numMessagesConsumed := atomic.NewInt32(0) tq.StartConsumers(c.numConsumers, func(item Request) { @@ -189,7 +187,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { tq.Produce(req) } - require.Eventually(t, func() bool { + assert.Eventually(t, func() bool { return c.numMessagesProduced == int(numMessagesConsumed.Load()) }, 5*time.Second, 10*time.Millisecond) }) diff --git a/exporter/exporterhelper/internal/persistent_storage_batch_test.go b/exporter/exporterhelper/internal/persistent_storage_batch_test.go index 496a767b6b2..481e4bafdff 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -35,7 +36,6 @@ func TestPersistentStorageBatch_Operations(t *testing.T) { setItemIndex("index", itemIndexValue). setItemIndexArray("arr", itemIndexArrayValue). execute(context.Background()) - require.NoError(t, err) batch, err := newBatch(ps). @@ -45,11 +45,11 @@ func TestPersistentStorageBatch_Operations(t *testing.T) { retrievedItemIndexValue, err := batch.getItemIndexResult("index") require.NoError(t, err) - require.Equal(t, itemIndexValue, retrievedItemIndexValue) + assert.Equal(t, itemIndexValue, retrievedItemIndexValue) retrievedItemIndexArrayValue, err := batch.getItemIndexArrayResult("arr") require.NoError(t, err) - require.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue) + assert.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue) _, err = newBatch(ps).delete("index", "arr").execute(context.Background()) require.NoError(t, err) @@ -60,9 +60,9 @@ func TestPersistentStorageBatch_Operations(t *testing.T) { require.NoError(t, err) _, err = batch.getItemIndexResult("index") - require.Error(t, err, errValueNotSet) + assert.Error(t, err, errValueNotSet) retrievedItemIndexArrayValue, err = batch.getItemIndexArrayResult("arr") require.NoError(t, err) - require.Nil(t, retrievedItemIndexArrayValue) + assert.Nil(t, retrievedItemIndexArrayValue) } diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index dadd082b730..d4ff5134404 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -226,12 +227,12 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0}) // Now, this will take item 0 and pull item 1 into the unbuffered channel - readReq := getItemFromChannel(t, ps) - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + readReq := <-ps.get() + assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td) requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1}) // This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel - secondReadReq := getItemFromChannel(t, ps) + secondReadReq := <-ps.get() requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1, 2}) // Lets mark item 1 as finished, it will remove it from the currently dispatched items list @@ -242,7 +243,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { // The queue should be essentially {3,4,0,2} out of which item "3" should be pulled right away into // the unbuffered channel. Check how many items are there, which, after the current one is fetched should go to 3. newPs := createTestPersistentStorage(client) - require.Eventually(t, func() bool { + assert.Eventually(t, func() bool { return newPs.size() == 3 }, 5*time.Second, 10*time.Millisecond) @@ -250,13 +251,13 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { // We should be able to pull all remaining items now for i := 0; i < 4; i++ { - req := getItemFromChannel(t, newPs) + req := <-newPs.get() req.OnProcessingFinished() } // The queue should be now empty requireCurrentlyDispatchedItemsEqual(t, newPs, nil) - require.Eventually(t, func() bool { + assert.Eventually(t, func() bool { return newPs.size() == 0 }, 5*time.Second, 10*time.Millisecond) @@ -302,10 +303,10 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) // Lets read both of the elements we put - readReq := getItemFromChannel(t, ps) + readReq := <-ps.get() require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) - readReq = getItemFromChannel(t, ps) + readReq = <-ps.get() require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) require.Equal(t, uint64(0), ps.size()) @@ -431,15 +432,6 @@ func TestPersistentStorage_StopShouldCloseClient(t *testing.T) { require.Equal(t, uint64(1), castedClient.getCloseCount()) } -func getItemFromChannel(t *testing.T, pcs *persistentContiguousStorage) Request { - var readReq Request - require.Eventually(t, func() bool { - readReq = <-pcs.get() - return true - }, 5*time.Second, 10*time.Millisecond) - return readReq -} - func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []itemIndex) { require.Eventually(t, func() bool { pcs.mu.Lock() @@ -448,30 +440,19 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguou }, 5*time.Second, 10*time.Millisecond) } -type mockStorageExtension struct{} - -func (m mockStorageExtension) Start(_ context.Context, _ component.Host) error { - return nil -} - -func (m mockStorageExtension) Shutdown(_ context.Context) error { - return nil +type mockStorageExtension struct { + component.StartFunc + component.ShutdownFunc } func (m mockStorageExtension) GetClient(ctx context.Context, kind component.Kind, id component.ID, s string) (storage.Client, error) { - return newMockStorageClient(), nil + return &mockStorageClient{st: map[string][]byte{}}, nil } func newMockStorageExtension() storage.Extension { return &mockStorageExtension{} } -func newMockStorageClient() storage.Client { - return &mockStorageClient{ - st: map[string][]byte{}, - } -} - type mockStorageClient struct { st map[string][]byte mux sync.Mutex