[chore] Simplify initialization logic of the not-dispatched requests (open-telemetry#8857)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
bogdandrutu and dmitryax authored Nov 14, 2023
1 parent 9e65a56 commit 6667bd0
Showing 1 changed file with 26 additions and 45 deletions.
71 changes: 26 additions & 45 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -90,10 +90,8 @@ func newPersistentContiguousStorage(ctx context.Context, client storage.Client,
}

pcs.initPersistentContiguousStorage(ctx)
notDispatchedReqs := pcs.retrieveNotDispatchedReqs(ctx)

// Make sure the leftover requests are handled
pcs.enqueueNotDispatchedReqs(notDispatchedReqs)
pcs.retrieveAndEnqueueNotDispatchedReqs(ctx)

// Ensure the communication channel has the same size as the queue
// We might already have items here from requeueing non-dispatched requests
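The net effect of this first hunk: leftover-request recovery collapses from a retrieve-then-enqueue pair into a single call. A condensed paraphrase of the new constructor order (annotation, not a verbatim excerpt of the file):

	pcs.initPersistentContiguousStorage(ctx)     // restore read/write indexes and the item count
	pcs.retrieveAndEnqueueNotDispatchedReqs(ctx) // recover items that were mid-dispatch and re-queue them
	// the request channel is then sized to match the queue, because the
	// requeued items may already be sitting in it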
@@ -130,25 +128,6 @@ func (pcs *persistentContiguousStorage) initPersistentContiguousStorage(ctx context.Context) {
pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
}

func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []any) {
if len(reqs) > 0 {
errCount := 0
for _, req := range reqs {
if req == nil || pcs.put(req) != nil {
errCount++
}
}
if errCount > 0 {
pcs.logger.Error("Errors occurred while moving items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(reqs)), zap.Int(zapErrorCount, errCount))

} else {
pcs.logger.Info("Moved items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(reqs)))
}
}
}

// get returns the request channel that all the requests will be sent on
func (pcs *persistentContiguousStorage) get() (QueueRequest, bool) {
for {
@@ -183,7 +162,11 @@ func (pcs *persistentContiguousStorage) put(req any) error {

pcs.mu.Lock()
defer pcs.mu.Unlock()
return pcs.putInternal(req)
}

// putInternal is the internal version that requires the caller to hold the mutex lock.
func (pcs *persistentContiguousStorage) putInternal(req any) error {
if pcs.size() >= pcs.capacity {
pcs.logger.Warn("Maximum queue capacity reached")
return ErrQueueIsFull
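This hunk introduces a common Go concurrency idiom: the public method acquires the mutex and delegates to an unexported variant that assumes the lock is already held, so other lock-holding paths (here, retrieveAndEnqueueNotDispatchedReqs below) can reuse the insert logic without double-locking. A minimal self-contained sketch of the idiom; boundedQueue and its fields are hypothetical stand-ins, not collector types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errQueueIsFull = errors.New("queue is full")

// boundedQueue is a toy stand-in for persistentContiguousStorage.
type boundedQueue struct {
	mu       sync.Mutex
	items    []string
	capacity int
}

// Put is the public entry point: take the lock, then delegate.
func (q *boundedQueue) Put(v string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.putInternal(v)
}

// putInternal requires q.mu to be held by the caller. Calling Put from a
// method that already holds q.mu would deadlock, since sync.Mutex is not
// reentrant; this split avoids that.
func (q *boundedQueue) putInternal(v string) error {
	if len(q.items) >= q.capacity {
		return errQueueIsFull
	}
	q.items = append(q.items, v)
	return nil
}

// requeueAll mirrors the shape of retrieveAndEnqueueNotDispatchedReqs:
// one critical section, putInternal per item, errors counted rather than
// aborting the loop.
func (q *boundedQueue) requeueAll(leftovers []string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	errCount := 0
	for _, v := range leftovers {
		if q.putInternal(v) != nil {
			errCount++
		}
	}
	return errCount
}

func main() {
	q := &boundedQueue{capacity: 2}
	fmt.Println(q.requeueAll([]string{"a", "b", "c"})) // 1: third item exceeds capacity
}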
@@ -251,58 +234,52 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRequest {
return req
}

// retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items back to the queue. The function returns an array which might contain nils
// if unmarshalling of the value at a given index was not possible.
func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []any {
// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items to the back of the queue.
func (pcs *persistentContiguousStorage) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {
var dispatchedItems []itemIndex

pcs.mu.Lock()
defer pcs.mu.Unlock()

pcs.logger.Debug("Checking if there are items left for dispatch by consumers")
itemKeysBuf, err := pcs.client.Get(ctx, currentlyDispatchedItemsKey)
if err == nil {
dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
}
if err != nil {
pcs.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
return nil
return
}

if len(dispatchedItems) > 0 {
pcs.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems)))
} else {
if len(dispatchedItems) == 0 {
pcs.logger.Debug("No items left for dispatch by consumers")
return
}

reqs := make([]any, len(dispatchedItems))
pcs.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems)))
retrieveBatch := make([]storage.Operation, len(dispatchedItems))
cleanupBatch := make([]storage.Operation, len(dispatchedItems))
for i, it := range dispatchedItems {
key := getItemKey(it)
retrieveBatch[i] = storage.GetOperation(key)
cleanupBatch[i] = storage.DeleteOperation(key)
}

retrieveErr := pcs.client.Batch(ctx, retrieveBatch...)
cleanupErr := pcs.client.Batch(ctx, cleanupBatch...)

if retrieveErr != nil {
pcs.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
}

if cleanupErr != nil {
pcs.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
}

if retrieveErr != nil {
return reqs
pcs.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
return
}

for i, op := range retrieveBatch {
errCount := 0
for _, op := range retrieveBatch {
if op.Value == nil {
pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
pcs.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
continue
}
req, err := pcs.unmarshaler(op.Value)
@@ -311,14 +288,18 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []any {
pcs.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
continue
}
if req == nil {
pcs.logger.Debug("Item value could not be retrieved", zap.String(zapKey, op.Key), zap.Error(err))
} else {
reqs[i] = req
if req == nil || pcs.putInternal(req) != nil {
errCount++
}
}

return reqs
if errCount > 0 {
pcs.logger.Error("Errors occurred while moving items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(retrieveBatch)), zap.Int(zapErrorCount, errCount))
} else {
pcs.logger.Info("Moved items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(retrieveBatch)))
}
}

// itemDispatchingStart appends the item to the list of currently dispatched items
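Taken together, the rewritten recovery path is: read the currently-dispatched index list, batch-get the payloads, batch-delete them from storage, then unmarshal each value and re-enqueue it via putInternal, counting failures. A sketch of the storage round-trip in isolation, using the same storage.Client batch API the diff does; recoverDispatched, the package name, and the error handling shape are hypothetical, and the import path is assumed to be the one in use at the time of this commit:

package recovery // hypothetical package for the sketch

import (
	"context"

	"go.opentelemetry.io/collector/extension/experimental/storage"
)

// recoverDispatched fetches and deletes the stored payloads for the given
// keys, returning the raw bytes that were successfully retrieved.
func recoverDispatched(ctx context.Context, client storage.Client, keys []string) [][]byte {
	retrieveBatch := make([]storage.Operation, len(keys))
	cleanupBatch := make([]storage.Operation, len(keys))
	for i, k := range keys {
		retrieveBatch[i] = storage.GetOperation(k)
		cleanupBatch[i] = storage.DeleteOperation(k)
	}

	// Batch fills in each GetOperation's Value on success.
	retrieveErr := client.Batch(ctx, retrieveBatch...)
	// Cleanup runs even if retrieval failed; the diff treats its failure
	// as non-fatal (logged at debug level).
	_ = client.Batch(ctx, cleanupBatch...)
	if retrieveErr != nil {
		return nil // nothing retrieved, nothing to re-enqueue
	}

	values := make([][]byte, 0, len(keys))
	for _, op := range retrieveBatch {
		if op.Value == nil {
			continue // key was in the index but had no stored value
		}
		values = append(values, op.Value)
	}
	return values
}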
