persistentstorage: Simplify usage of storage client, avoid unnecessary allocations (open-telemetry#8830)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu authored Nov 10, 2023
1 parent 786299f commit 36a89bc
Showing 5 changed files with 159 additions and 369 deletions.
13 changes: 13 additions & 0 deletions .chloggen/simplifypersistentstorage.yaml
@@ -0,0 +1,13 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: "exporterhelper"
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Simplify usage of storage client, avoid unnecessary allocations"
+
+# One or more tracking issues or pull requests related to the change
+issues: [8830]
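
The pattern this commit adopts, sketched below: Batch executes get/set/delete operations in one round trip and fills in the Value field of each get operation in place, so results are read back from the operations themselves rather than through an intermediate batchStruct. This is a minimal illustrative sketch, assuming the experimental storage package imported by the file changed below; the readIndexes helper and its key strings are hypothetical, not part of the commit.

package sketch

import (
    "context"

    "go.opentelemetry.io/collector/extension/experimental/storage"
)

// readIndexes shows the direct-client pattern: build get operations,
// execute them in a single Batch call, then read each operation's
// mutated Value field (nil means the key was never set).
func readIndexes(ctx context.Context, client storage.Client) (ri, wi []byte, err error) {
    riOp := storage.GetOperation("read_index") // hypothetical keys
    wiOp := storage.GetOperation("write_index")
    if err = client.Batch(ctx, riOp, wiOp); err != nil {
        return nil, nil, err
    }
    return riOp.Value, wiOp.Value, nil
}

Because Batch mutates the operations it is handed, a single slice of operations serves as both request and response, which is where the avoided allocations in the diff below come from.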
185 changes: 104 additions & 81 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -4,7 +4,9 @@
 package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 
 import (
+    "bytes"
     "context"
+    "encoding/binary"
     "errors"
     "fmt"
     "strconv"
@@ -40,7 +42,6 @@ import (
 // xxxx deleted
 type persistentContiguousStorage struct {
     logger      *zap.Logger
-    queueName   string
     client      storage.Client
     unmarshaler RequestUnmarshaler
     marshaler   RequestMarshaler
@@ -74,19 +75,17 @@ const (
 )
 
 var (
-    errMaxCapacityReached   = errors.New("max capacity reached")
-    errValueNotSet          = errors.New("value not set")
-    errKeyNotPresentInBatch = errors.New("key was not present in get batchStruct")
+    errMaxCapacityReached = errors.New("max capacity reached")
+    errValueNotSet        = errors.New("value not set")
 )
 
 // newPersistentContiguousStorage creates a new file-storage extension backed queue;
 // queueName parameter must be a unique value that identifies the queue.
 func newPersistentContiguousStorage(ctx context.Context, queueName string, client storage.Client,
     logger *zap.Logger, capacity uint64, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
     pcs := &persistentContiguousStorage{
-        logger:      logger,
+        logger:      logger.With(zap.String(zapQueueNameKey, queueName)),
         client:      client,
-        queueName:   queueName,
         unmarshaler: unmarshaler,
         marshaler:   marshaler,
         capacity:    capacity,
@@ -96,7 +95,7 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, clien
         itemsCount:  &atomic.Uint64{},
     }
 
-    initPersistentContiguousStorage(ctx, pcs)
+    pcs.initPersistentContiguousStorage(ctx)
     notDispatchedReqs := pcs.retrieveNotDispatchedReqs(context.Background())
 
     // Make sure the leftover requests are handled
@@ -114,32 +113,27 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, clien
     return pcs
 }
 
-func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContiguousStorage) {
-    var writeIndex itemIndex
-    var readIndex itemIndex
-    batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey).execute(ctx)
+func (pcs *persistentContiguousStorage) initPersistentContiguousStorage(ctx context.Context) {
+    riOp := storage.GetOperation(readIndexKey)
+    wiOp := storage.GetOperation(writeIndexKey)
+
+    err := pcs.client.Batch(ctx, riOp, wiOp)
     if err == nil {
-        readIndex, err = batch.getItemIndexResult(readIndexKey)
+        pcs.readIndex, err = bytesToItemIndex(riOp.Value)
     }
 
     if err == nil {
-        writeIndex, err = batch.getItemIndexResult(writeIndexKey)
+        pcs.writeIndex, err = bytesToItemIndex(wiOp.Value)
     }
 
     if err != nil {
         if errors.Is(err, errValueNotSet) {
-            pcs.logger.Info("Initializing new persistent queue", zap.String(zapQueueNameKey, pcs.queueName))
+            pcs.logger.Info("Initializing new persistent queue")
         } else {
-            pcs.logger.Error("Failed getting read/write index, starting with new ones",
-                zap.String(zapQueueNameKey, pcs.queueName),
-                zap.Error(err))
+            pcs.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
         }
         pcs.readIndex = 0
         pcs.writeIndex = 0
-    } else {
-        pcs.readIndex = readIndex
-        pcs.writeIndex = writeIndex
     }
 
     pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
@@ -155,12 +149,10 @@ func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Request)
     }
     if errCount > 0 {
         pcs.logger.Error("Errors occurred while moving items for dispatching back to queue",
-            zap.String(zapQueueNameKey, pcs.queueName),
             zap.Int(zapNumberOfItems, len(reqs)), zap.Int(zapErrorCount, errCount))
 
     } else {
         pcs.logger.Info("Moved items for dispatching back to queue",
-            zap.String(zapQueueNameKey, pcs.queueName),
             zap.Int(zapNumberOfItems, len(reqs)))
 
     }
@@ -193,7 +185,7 @@ func (pcs *persistentContiguousStorage) size() uint64 {
 }
 
 func (pcs *persistentContiguousStorage) stop() {
-    pcs.logger.Debug("Stopping persistentContiguousStorage", zap.String(zapQueueNameKey, pcs.queueName))
+    pcs.logger.Debug("Stopping persistentContiguousStorage")
     pcs.stopOnce.Do(func() {
         close(pcs.stopChan)
         if err := pcs.client.Close(context.Background()); err != nil {
@@ -213,16 +205,22 @@ func (pcs *persistentContiguousStorage) put(req Request) error {
     defer pcs.mu.Unlock()
 
     if pcs.size() >= pcs.capacity {
-        pcs.logger.Warn("Maximum queue capacity reached", zap.String(zapQueueNameKey, pcs.queueName))
+        pcs.logger.Warn("Maximum queue capacity reached")
         return errMaxCapacityReached
     }
 
-    itemKey := pcs.itemKey(pcs.writeIndex)
+    itemKey := getItemKey(pcs.writeIndex)
     pcs.writeIndex++
     pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
 
+    reqBuf, err := pcs.marshaler(req)
+    if err != nil {
+        return err
+    }
     ctx := context.Background()
-    _, err := newBatch(pcs).setItemIndex(writeIndexKey, pcs.writeIndex).setRequest(itemKey, req).execute(ctx)
+    err = pcs.client.Batch(ctx,
+        storage.SetOperation(writeIndexKey, itemIndexToBytes(pcs.writeIndex)),
+        storage.SetOperation(itemKey, reqBuf))
 
     // Inform the loop that there's some data to process
     pcs.putChan <- struct{}{}
@@ -245,16 +243,16 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques
     pcs.itemDispatchingStart(ctx, index)
 
     var req Request
-    batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx)
+    itemKey := getItemKey(index)
+    buf, err := pcs.client.Get(ctx, itemKey)
     if err == nil {
-        req, err = batch.getRequestResult(pcs.itemKey(index))
+        req, err = pcs.unmarshaler(buf)
     }
 
     if err != nil || req == nil {
         // We need to make sure that currently dispatched items list is cleaned
         if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
-            pcs.logger.Error("Error deleting item from queue",
-                zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+            pcs.logger.Error("Error deleting item from queue", zap.Error(err))
         }
 
         return nil, false
@@ -265,8 +263,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques
         pcs.mu.Lock()
         defer pcs.mu.Unlock()
         if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
-            pcs.logger.Error("Error deleting item from queue",
-                zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+            pcs.logger.Error("Error deleting item from queue", zap.Error(err))
         }
     })
     return req, true
@@ -285,61 +282,61 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co
     pcs.mu.Lock()
     defer pcs.mu.Unlock()
 
-    pcs.logger.Debug("Checking if there are items left for dispatch by consumers", zap.String(zapQueueNameKey, pcs.queueName))
-    batch, err := newBatch(pcs).get(currentlyDispatchedItemsKey).execute(ctx)
+    pcs.logger.Debug("Checking if there are items left for dispatch by consumers")
+    itemKeysBuf, err := pcs.client.Get(ctx, currentlyDispatchedItemsKey)
     if err == nil {
-        dispatchedItems, err = batch.getItemIndexArrayResult(currentlyDispatchedItemsKey)
+        dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
     }
     if err != nil {
-        pcs.logger.Error("Could not fetch items left for dispatch by consumers", zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+        pcs.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
         return reqs
     }
 
     if len(dispatchedItems) > 0 {
-        pcs.logger.Info("Fetching items left for dispatch by consumers",
-            zap.String(zapQueueNameKey, pcs.queueName), zap.Int(zapNumberOfItems, len(dispatchedItems)))
+        pcs.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems)))
     } else {
         pcs.logger.Debug("No items left for dispatch by consumers")
     }
 
     reqs = make([]Request, len(dispatchedItems))
-    keys := make([]string, len(dispatchedItems))
-    retrieveBatch := newBatch(pcs)
-    cleanupBatch := newBatch(pcs)
+    retrieveBatch := make([]storage.Operation, len(dispatchedItems))
+    cleanupBatch := make([]storage.Operation, len(dispatchedItems))
     for i, it := range dispatchedItems {
-        keys[i] = pcs.itemKey(it)
-        retrieveBatch.get(keys[i])
-        cleanupBatch.delete(keys[i])
+        key := getItemKey(it)
+        retrieveBatch[i] = storage.GetOperation(key)
+        cleanupBatch[i] = storage.DeleteOperation(key)
     }
 
-    _, retrieveErr := retrieveBatch.execute(ctx)
-    _, cleanupErr := cleanupBatch.execute(ctx)
+    retrieveErr := pcs.client.Batch(ctx, retrieveBatch...)
+    cleanupErr := pcs.client.Batch(ctx, cleanupBatch...)
 
     if retrieveErr != nil {
-        pcs.logger.Warn("Failed retrieving items left by consumers", zap.String(zapQueueNameKey, pcs.queueName), zap.Error(retrieveErr))
+        pcs.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
     }
 
     if cleanupErr != nil {
-        pcs.logger.Debug("Failed cleaning items left by consumers", zap.String(zapQueueNameKey, pcs.queueName), zap.Error(cleanupErr))
+        pcs.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
     }
 
     if retrieveErr != nil {
         return reqs
     }
 
-    for i, key := range keys {
-        req, err := retrieveBatch.getRequestResult(key)
+    for i, op := range retrieveBatch {
+        if op.Value == nil {
+            pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
+            continue
+        }
+        req, err := pcs.unmarshaler(op.Value)
         // If error happened or item is nil, it will be efficiently ignored
         if err != nil {
-            pcs.logger.Warn("Failed unmarshalling item",
-                zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err))
+            pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
             continue
         }
         if req == nil {
-            pcs.logger.Debug("Item value could not be retrieved",
-                zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err))
+            pcs.logger.Debug("Item value could not be retrieved", zap.String(zapKey, op.Key), zap.Error(err))
         } else {
             reqs[i] = req
         }
     }

@@ -349,18 +346,14 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co
 // itemDispatchingStart appends the item to the list of currently dispatched items
 func (pcs *persistentContiguousStorage) itemDispatchingStart(ctx context.Context, index itemIndex) {
     pcs.currentlyDispatchedItems = append(pcs.currentlyDispatchedItems, index)
-    _, err := newBatch(pcs).
-        setItemIndexArray(currentlyDispatchedItemsKey, pcs.currentlyDispatchedItems).
-        execute(ctx)
+    err := pcs.client.Set(ctx, currentlyDispatchedItemsKey, itemIndexArrayToBytes(pcs.currentlyDispatchedItems))
     if err != nil {
-        pcs.logger.Debug("Failed updating currently dispatched items",
-            zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+        pcs.logger.Debug("Failed updating currently dispatched items", zap.Error(err))
     }
 }
 
 // itemDispatchingFinish removes the item from the list of currently dispatched items and deletes it from the persistent queue
 func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Context, index itemIndex) error {
-    var batch *batchStruct
     var updatedDispatchedItems []itemIndex
     for _, it := range pcs.currentlyDispatchedItems {
         if it != index {
@@ -369,26 +362,22 @@ func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Contex
         }
     }
     pcs.currentlyDispatchedItems = updatedDispatchedItems
 
-    batch = newBatch(pcs).
-        setItemIndexArray(currentlyDispatchedItemsKey, pcs.currentlyDispatchedItems).
-        delete(pcs.itemKey(index))
-    if _, err := batch.execute(ctx); err != nil {
+    setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pcs.currentlyDispatchedItems))
+    deleteOp := storage.DeleteOperation(getItemKey(index))
+    if err := pcs.client.Batch(ctx, setOp, deleteOp); err != nil {
         // got an error, try to gracefully handle it
-        pcs.logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
-            zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+        pcs.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err))
     } else {
         // Everything ok, exit
         return nil
     }
 
-    if _, err := newBatch(pcs).delete(pcs.itemKey(index)).execute(ctx); err != nil {
+    if err := pcs.client.Batch(ctx, deleteOp); err != nil {
         // Return an error here, as this indicates an issue with the underlying storage medium
         return fmt.Errorf("failed deleting item from queue, got error from storage: %w", err)
     }
 
-    batch = newBatch(pcs).
-        setItemIndexArray(currentlyDispatchedItemsKey, pcs.currentlyDispatchedItems)
-    if _, err := batch.execute(ctx); err != nil {
+    if err := pcs.client.Batch(ctx, setOp); err != nil {
         // even if this fails, we still have the right dispatched items in memory
         // at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup
         return fmt.Errorf("failed updating currently dispatched items, but deleted item successfully: %w", err)
@@ -398,16 +387,50 @@ func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Contex
 }
 
 func (pcs *persistentContiguousStorage) updateReadIndex(ctx context.Context) {
-    _, err := newBatch(pcs).
-        setItemIndex(readIndexKey, pcs.readIndex).
-        execute(ctx)
-
+    err := pcs.client.Set(ctx, readIndexKey, itemIndexToBytes(pcs.readIndex))
     if err != nil {
-        pcs.logger.Debug("Failed updating read index",
-            zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
+        pcs.logger.Debug("Failed updating read index", zap.Error(err))
     }
 }
 
-func (pcs *persistentContiguousStorage) itemKey(index itemIndex) string {
+func getItemKey(index itemIndex) string {
     return strconv.FormatUint(uint64(index), 10)
 }
+
+func itemIndexToBytes(value itemIndex) []byte {
+    return binary.LittleEndian.AppendUint64([]byte{}, uint64(value))
+}
+
+func bytesToItemIndex(b []byte) (itemIndex, error) {
+    val := itemIndex(0)
+    if b == nil {
+        return val, errValueNotSet
+    }
+    err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &val)
+    return val, err
+}
+
+func itemIndexArrayToBytes(arr []itemIndex) []byte {
+    size := len(arr)
+    buf := make([]byte, 0, 4+size*8)
+    buf = binary.LittleEndian.AppendUint32(buf, uint32(size))
+    for _, item := range arr {
+        buf = binary.LittleEndian.AppendUint64(buf, uint64(item))
+    }
+    return buf
+}
+
+func bytesToItemIndexArray(b []byte) ([]itemIndex, error) {
+    if len(b) == 0 {
+        return nil, nil
+    }
+    var size uint32
+    reader := bytes.NewReader(b)
+    if err := binary.Read(reader, binary.LittleEndian, &size); err != nil {
+        return nil, err
+    }
+
+    val := make([]itemIndex, size)
+    err := binary.Read(reader, binary.LittleEndian, &val)
+    return val, err
+}
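
The new helpers fix a simple little-endian layout: a single index is eight bytes, an index array is a four-byte element count followed by eight bytes per element, and a nil buffer maps to errValueNotSet. A hypothetical round-trip test (not part of this commit) that could live alongside these helpers in package internal:

package internal

import (
    "errors"
    "reflect"
    "testing"
)

// TestItemIndexRoundTrip is an illustrative sketch exercising the helpers
// above; it assumes access to the package-internal itemIndex type.
func TestItemIndexRoundTrip(t *testing.T) {
    // Single index: 8 little-endian bytes there and back.
    got, err := bytesToItemIndex(itemIndexToBytes(42))
    if err != nil || got != 42 {
        t.Fatalf("index round trip: got %v, err %v", got, err)
    }

    // Array: 4-byte little-endian count, then 8 bytes per element.
    arr := []itemIndex{1, 2, 3}
    buf := itemIndexArrayToBytes(arr)
    if len(buf) != 4+8*len(arr) {
        t.Fatalf("unexpected encoded length: %d", len(buf))
    }
    back, err := bytesToItemIndexArray(buf)
    if err != nil || !reflect.DeepEqual(back, arr) {
        t.Fatalf("array round trip: got %v, err %v", back, err)
    }

    // A nil buffer means "value not set" for a single index.
    if _, err := bytesToItemIndex(nil); !errors.Is(err, errValueNotSet) {
        t.Fatalf("expected errValueNotSet, got %v", err)
    }
}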