
[chore] remove unnecessary type itemIndex, use uint64 (open-telemetry#8897)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu authored Nov 14, 2023
1 parent 560bd1d commit 3ca5c91
Showing 2 changed files with 33 additions and 35 deletions.
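
The change is mechanical: the itemIndex alias added nothing over its underlying uint64, so every field, parameter, and helper that used it now takes uint64 directly, which also removes the uint64(...) conversions the alias forced at call sites. A sketch of the round-trip encoding these helpers implement follows the first file's diff below.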
34 changes: 16 additions & 18 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -50,14 +50,12 @@ type persistentContiguousStorage struct
     capacity uint64

     mu                       sync.Mutex
-    readIndex                itemIndex
-    writeIndex               itemIndex
-    currentlyDispatchedItems []itemIndex
+    readIndex                uint64
+    writeIndex               uint64
+    currentlyDispatchedItems []uint64
     refClient                int64
 }

-type itemIndex uint64
-
 const (
     zapKey        = "key"
     zapErrorCount = "errorCount"
@@ -141,7 +139,7 @@ func (pcs *persistentContiguousStorage) get() (QueueRequest, bool) {
 }

 func (pcs *persistentContiguousStorage) size() uint64 {
-    return uint64(pcs.writeIndex - pcs.readIndex)
+    return pcs.writeIndex - pcs.readIndex
 }

 // Size returns the number of currently available items, which were not picked by consumers yet
@@ -269,7 +267,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
 // and moves the items at the back of the queue.
 func (pcs *persistentContiguousStorage) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {
-    var dispatchedItems []itemIndex
+    var dispatchedItems []uint64

     pcs.mu.Lock()
     defer pcs.mu.Unlock()
@@ -335,7 +333,7 @@ func (pcs *persistentContiguousStorage) retrieveAndEnqueueNotDispatchedReqs(ctx
 }

 // itemDispatchingFinish removes the item from the list of currently dispatched items and deletes it from the persistent queue
-func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Context, index itemIndex) error {
+func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Context, index uint64) error {
     lenCDI := len(pcs.currentlyDispatchedItems)
     for i := 0; i < lenCDI; i++ {
         if pcs.currentlyDispatchedItems[i] == index {
@@ -369,34 +367,34 @@ func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Contex
     return nil
 }

-func getItemKey(index itemIndex) string {
-    return strconv.FormatUint(uint64(index), 10)
+func getItemKey(index uint64) string {
+    return strconv.FormatUint(index, 10)
 }

-func itemIndexToBytes(value itemIndex) []byte {
-    return binary.LittleEndian.AppendUint64([]byte{}, uint64(value))
+func itemIndexToBytes(value uint64) []byte {
+    return binary.LittleEndian.AppendUint64([]byte{}, value)
 }

-func bytesToItemIndex(b []byte) (itemIndex, error) {
-    val := itemIndex(0)
+func bytesToItemIndex(b []byte) (uint64, error) {
+    val := uint64(0)
     if b == nil {
         return val, errValueNotSet
     }
     err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &val)
     return val, err
 }

-func itemIndexArrayToBytes(arr []itemIndex) []byte {
+func itemIndexArrayToBytes(arr []uint64) []byte {
     size := len(arr)
     buf := make([]byte, 0, 4+size*8)
     buf = binary.LittleEndian.AppendUint32(buf, uint32(size))
     for _, item := range arr {
-        buf = binary.LittleEndian.AppendUint64(buf, uint64(item))
+        buf = binary.LittleEndian.AppendUint64(buf, item)
     }
     return buf
 }

-func bytesToItemIndexArray(b []byte) ([]itemIndex, error) {
+func bytesToItemIndexArray(b []byte) ([]uint64, error) {
     if len(b) == 0 {
         return nil, nil
     }
@@ -406,7 +404,7 @@ func bytesToItemIndexArray(b []byte) ([]itemIndex, error) {
         return nil, err
     }

-    val := make([]itemIndex, size)
+    val := make([]uint64, size)
     err := binary.Read(reader, binary.LittleEndian, &val)
     return val, err
 }
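
For reference, here is a minimal, self-contained sketch of the byte layout the array helpers above read and write: a little-endian uint32 element count followed by one little-endian uint64 per index. The function names and the main driver are illustrative assumptions, not collector API; the encode/decode logic mirrors itemIndexArrayToBytes and bytesToItemIndexArray. The single-index helpers (itemIndexToBytes, bytesToItemIndex) use the same little-endian encoding, just without the count prefix.

// A sketch of the persistent queue's index-array encoding.
// encodeIndexes/decodeIndexes are assumed names for illustration only.
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encodeIndexes mirrors itemIndexArrayToBytes: a 4-byte little-endian
// count prefix, then 8 little-endian bytes per index.
func encodeIndexes(arr []uint64) []byte {
    buf := make([]byte, 0, 4+len(arr)*8)
    buf = binary.LittleEndian.AppendUint32(buf, uint32(len(arr)))
    for _, item := range arr {
        buf = binary.LittleEndian.AppendUint64(buf, item)
    }
    return buf
}

// decodeIndexes mirrors bytesToItemIndexArray: read the count, then
// read that many uint64 values into a slice.
func decodeIndexes(b []byte) ([]uint64, error) {
    if len(b) == 0 {
        return nil, nil // no stored value decodes to a nil slice
    }
    reader := bytes.NewReader(b)
    var size uint32
    if err := binary.Read(reader, binary.LittleEndian, &size); err != nil {
        return nil, err
    }
    val := make([]uint64, size)
    err := binary.Read(reader, binary.LittleEndian, &val)
    return val, err
}

func main() {
    in := []uint64{0, 1, 18446744073709551615} // includes max uint64
    out, err := decodeIndexes(encodeIndexes(in))
    fmt.Println(out, err) // [0 1 18446744073709551615] <nil>
}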
34 changes: 17 additions & 17 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -187,28 +187,28 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
         require.NoError(t, err)
     }

-    requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{})
+    requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

     // Takes index 0 in process.
     readReq, found := ps.get()
     require.True(t, found)
     assert.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td)
-    requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0})
+    requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

     // This takes item 1 to process.
     secondReadReq, found := ps.get()
     require.True(t, found)
-    requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1})
+    requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

     // Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
     secondReadReq.OnProcessingFinished()
-    requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0})
+    requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

     // Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
     // The queue should be essentially {3,4,0,2}.
     newPs := createTestPersistentStorage(client)
     assert.Equal(t, 4, newPs.Size())
-    requireCurrentlyDispatchedItemsEqual(t, newPs, []itemIndex{})
+    requireCurrentlyDispatchedItemsEqual(t, newPs, []uint64{})

     // We should be able to pull all remaining items now
     for i := 0; i < 4; i++ {
@@ -218,14 +218,14 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
     }

     // The queue should be now empty
-    requireCurrentlyDispatchedItemsEqual(t, newPs, []itemIndex{})
+    requireCurrentlyDispatchedItemsEqual(t, newPs, []uint64{})
     assert.Equal(t, 0, newPs.Size())
     // The writeIndex should be now set accordingly
     require.EqualValues(t, 6, newPs.writeIndex)

     // There should be no items left in the storage
     for i := 0; i < int(newPs.writeIndex); i++ {
-        bb, err := client.Get(context.Background(), getItemKey(itemIndex(i)))
+        bb, err := client.Get(context.Background(), getItemKey(uint64(i)))
         require.NoError(t, err)
         require.Nil(t, bb)
     }
@@ -343,8 +343,8 @@ func BenchmarkPersistentStorage_TraceSpans(b *testing.B) {

 func TestItemIndexMarshaling(t *testing.T) {
     cases := []struct {
-        in  itemIndex
-        out itemIndex
+        in  uint64
+        out uint64
     }{
         {
             in: 0,
@@ -372,20 +372,20 @@ func TestItemIndexMarshaling(t *testing.T) {

 func TestItemIndexArrayMarshaling(t *testing.T) {
     cases := []struct {
-        in  []itemIndex
-        out []itemIndex
+        in  []uint64
+        out []uint64
     }{
         {
-            in:  []itemIndex{0, 1, 2},
-            out: []itemIndex{0, 1, 2},
+            in:  []uint64{0, 1, 2},
+            out: []uint64{0, 1, 2},
         },
         {
-            in:  []itemIndex{},
-            out: []itemIndex{},
+            in:  []uint64{},
+            out: []uint64{},
         },
         {
             in:  nil,
-            out: []itemIndex{},
+            out: []uint64{},
         },
     }

@@ -509,7 +509,7 @@ func TestPersistentStorage_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
     }
 }

-func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []itemIndex) {
+func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []uint64) {
     pcs.mu.Lock()
     defer pcs.mu.Unlock()
     assert.ElementsMatch(t, compare, pcs.currentlyDispatchedItems)
