This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Outputhost: Fix offset computations for ack/read levels (#193)
* fix level offset

* seqnum

* cr comments
Kiran RG authored May 8, 2017
1 parent 00980c0 commit a96faf0
Showing 5 changed files with 76 additions and 105 deletions.
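
The substance of the fix, as reflected in the diff below: the ack manager used to derive a message's store sequence number from its local level plus a single shared `levelOffset`, bumped whenever the read path crossed a discontinuity (e.g. one left by retention). Since the ack level trails the read level, a bump taken at read time could also shift the seqnum computed for levels still behind the gap. The commit drops the shared offset and instead records each message's seqnum in a per-message context (`msgCtx`), keyed by a new contiguous `ackIndex` type. A minimal, self-contained sketch of the failure mode and the fix — the retention-gap scenario and all numbers are invented for illustration:

```go
package main

import "fmt"

func main() {
	// Store seqnums 3..6 were removed (say, by retention), so the reader
	// sees seqnums 1, 2, 7, 8 at contiguous local indices 1, 2, 3, 4.
	seqnums := []int64{1, 2, 7, 8}

	// Old scheme: seqnum(level) = levelOffset + level, with one shared
	// offset that is bumped as soon as the *read* level crosses the gap.
	var levelOffset int64
	for i, seq := range seqnums {
		index := int64(i) + 1
		if expected := levelOffset + index; expected != seq {
			levelOffset += seq - expected // bumped at read time
		}
	}

	ackLevel := int64(2) // the consumer has only acked up to index 2 (seqnum 2)
	fmt.Println("old scheme ack seqnum:", levelOffset+ackLevel) // 6 -- wrong

	// New scheme: remember each message's seqnum and look it up directly.
	seqnumAt := map[int64]int64{1: 1, 2: 2, 3: 7, 4: 8}
	fmt.Println("new scheme ack seqnum:", seqnumAt[ackLevel]) // 2 -- correct
}
```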
154 changes: 65 additions & 89 deletions services/outputhost/ackmanager.go
@@ -39,23 +39,28 @@ const replicatorCallTimeout = 20 * time.Second
type storeHostAddress int64

type (
// internalMsg is the message which is stored locally on the ackMgr
internalMsg struct {
addr storeHostAddress
acked bool
ackIndex uint32

// msgCtx is the message context which is stored locally on the ackMgr
msgCtx struct {
addr storeHostAddress
seqnum common.SequenceNumber
acked bool
}

levels struct {
asOf common.UnixNanoTime // Timestamp when this level was calculated
readLevel common.SequenceNumber // -1 = nothing received, 0 = 1 message (#0) received, etc.
ackLevel common.SequenceNumber // -1 = nothing acked
readLevel ackIndex // index up to which msgs have been read
readLevelSeq common.SequenceNumber // msg seqnum corresponding to readLevel
ackLevel ackIndex // index up to which msgs have been acked
ackLevelSeq common.SequenceNumber // msg seqnum corresponding to the ackLevel
lastAckedSeq common.SequenceNumber // the latest sequence which is acked
}

// ackManager is held per CG extent and it holds the addresses that we get from the store.
ackManager struct {
addrs map[common.SequenceNumber]*internalMsg // ‡
sealed bool // ‡
addrs map[ackIndex]*msgCtx // ‡
sealed bool // ‡
outputHostUUID string
cgUUID string
extUUID string
@@ -87,7 +92,7 @@ func newAckManager(
committer Committer,
logger bark.Logger) *ackManager {
ackMgr := &ackManager{
addrs: make(map[common.SequenceNumber]*internalMsg),
addrs: make(map[ackIndex]*msgCtx),
cgCache: cgCache,
outputHostUUID: outputHostUUID,
cgUUID: cgUUID,
@@ -102,8 +107,10 @@ func newAckManager(
}

ackMgr.levels = &levels{
readLevel: common.SequenceNumber(cge.GetAckLevelSeqNo()),
ackLevel: common.SequenceNumber(cge.GetAckLevelSeqNo()),
readLevel: ackIndex(cge.GetAckLevelSeqNo()),
readLevelSeq: common.SequenceNumber(cge.GetAckLevelSeqNo()),
ackLevel: ackIndex(cge.GetAckLevelSeqNo()),
ackLevelSeq: common.SequenceNumber(cge.GetAckLevelSeqNo()),
}

return ackMgr
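
In the constructor above, all four cursors are seeded from the one persisted value, so the index space and the seqnum space coincide after a restart and only diverge once a discontinuity is observed. A tiny runnable sketch of that seeding — the value 42 and the anonymous struct stand in for `cge.GetAckLevelSeqNo()` and the real `levels` type:

```go
package main

import "fmt"

func main() {
	persisted := int64(42) // stands in for cge.GetAckLevelSeqNo()

	levels := struct {
		readLevel, ackLevel       uint32 // contiguous local indices (ackIndex)
		readLevelSeq, ackLevelSeq int64  // store sequence numbers
	}{
		readLevel:    uint32(persisted),
		readLevelSeq: persisted,
		ackLevel:     uint32(persisted),
		ackLevelSeq:  persisted,
	}
	fmt.Printf("%+v\n", levels)
}
```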
@@ -112,75 +119,44 @@ func newAckManager(
// ackID is a base64-encoded string
// First we get the ackID and store the address locally in our data structure
// for maintaining the ack level
func (ackMgr *ackManager) getNextAckID(address int64, sequence common.SequenceNumber) (ackID string) {
func (ackMgr *ackManager) getNextAckID(address storeHostAddress, sequence common.SequenceNumber) (ackID string) {
ackMgr.lk.Lock()
defer ackMgr.lk.Unlock()

ackMgr.readLevel++ // This means that the first ID is '1'
ackMgr.readLevelSeq++

expectedReadLevel := ackMgr.levelOffset + ackMgr.readLevel
if sequence != ackMgr.readLevelSeq {

// NOTE: sequence should be zero for timer destinations, since they usually have discontinuous sequence numbers
if sequence != 0 && expectedReadLevel != sequence {
skippedMessages := sequence - expectedReadLevel
ackMgr.logger.WithFields(bark.Fields{
`old-readLevelSeq`: ackMgr.readLevelSeq,
`new-readLevelSeq`: sequence,
}).Warn(`adjusting read-level sequence`)

if skippedMessages < 0 {
ackMgr.logger.WithFields(bark.Fields{
`address`: address,
`sequence`: sequence,
`readLevel`: ackMgr.readLevel,
`levelOffset`: ackMgr.levelOffset,
`expectedReadLevel`: expectedReadLevel,
`skippedMessages`: skippedMessages,
}).Error(`negative discontinuity detected (rollback)`)
// Don't update gauge, since negative numbers aren't supported for M3 gauges
} else {
// update gauge here to say we skipped messages (potentially due to retention?)
ackMgr.cgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGSkippedMessages, int64(skippedMessages))
}
// TODO: Support message auditing by creating discontinuity events, rather than just updating the offset here
// TODO: Consumption rates will be spuriously high
// NOTE: Increasing the offset here also affects the AckLevelSeqNo, which will tend to push it into the discontinuity,
// which makes it appear that a retained message was received. This is OK, and it will cause queue depth to react
// to retention events a bit faster.
// NOTE: The same skipped messages can be reported twice if the outputhost is restarted before the ack level
// passes the discontinuity
ackMgr.addLevelOffsetImpl(skippedMessages)
// update gauge here to indicate we skipped messages (potentially due to retention?)
ackMgr.cgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope,
metrics.OutputhostCGSkippedMessages, int64(sequence-ackMgr.readLevelSeq))

ackMgr.readLevelSeq = sequence
}

ackID = common.ConstructAckID(ackMgr.sessionID, ackMgr.ackMgrID, uint32(ackMgr.readLevel), address)
ackID = common.ConstructAckID(ackMgr.sessionID, ackMgr.ackMgrID, uint32(ackMgr.readLevel), int64(address))

// now store the message in the data structure internally
ackMgr.addrs[ackMgr.readLevel] = &internalMsg{
addr: storeHostAddress(address),
ackMgr.addrs[ackMgr.readLevel] = &msgCtx{
addr: address,
seqnum: sequence,
}

// Let the committer know about the new read level
ackMgr.committer.SetReadLevel(CommitterLevel{
seqNo: sequence,
address: storeHostAddress(address),
address: address,
})

ackMgr.lk.Unlock()

return
}

// addLevelOffset adds an offset to the sequence number stored in cassandra. This ensures that our internal records always have
// contiguous sequence numbers, but we can adjust the offset when/if the replica connection gets loaded (asynchronously)
func (ackMgr *ackManager) addLevelOffset(offset common.SequenceNumber) {
ackMgr.lk.Lock()
ackMgr.addLevelOffsetImpl(offset)
ackMgr.lk.Unlock()
}

func (ackMgr *ackManager) addLevelOffsetImpl(offset common.SequenceNumber) {
ackMgr.logger.WithFields(bark.Fields{
`oldOffset`: ackMgr.levelOffset,
`newOffset`: ackMgr.levelOffset + offset,
`change`: offset,
}).Info(`adjusting sequence number offset`)
ackMgr.levelOffset += offset
}

func (ackMgr *ackManager) stop() {
close(ackMgr.closeChannel)
ackMgr.doneWG.Wait()
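
The new `getNextAckID` bookkeeping is compact enough to sketch end to end. A trimmed, runnable approximation — no locking, logging, or metrics, and plain `int64`/`uint32` stand in for `common.SequenceNumber`, `storeHostAddress`, and the encoded ackID:

```go
package main

import "fmt"

type msgCtx struct {
	addr   int64
	seqnum int64
	acked  bool
}

type ackMgr struct {
	readLevel    uint32 // contiguous index: advances by exactly one per message
	readLevelSeq int64  // store seqnum of the last message read
	addrs        map[uint32]*msgCtx
}

func (m *ackMgr) getNextAckID(address, sequence int64) uint32 {
	m.readLevel++
	m.readLevelSeq++
	if sequence != m.readLevelSeq {
		// Discontinuity (e.g. retention): report the skipped count and snap
		// the seqnum cursor forward; the index stays contiguous regardless.
		fmt.Printf("skipped %d messages\n", sequence-m.readLevelSeq)
		m.readLevelSeq = sequence
	}
	m.addrs[m.readLevel] = &msgCtx{addr: address, seqnum: sequence}
	return m.readLevel // stands in for the encoded ackID
}

func main() {
	m := &ackMgr{addrs: make(map[uint32]*msgCtx)}
	for _, seq := range []int64{1, 2, 7, 8} {
		idx := m.getNextAckID(1000+seq, seq)
		fmt.Printf("index=%d seqnum=%d\n", idx, m.addrs[idx].seqnum)
	}
}
```

The real code reports that `sequence - readLevelSeq` delta to M3 as `OutputhostCGSkippedMessages` before snapping the cursor forward.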
@@ -194,7 +170,7 @@ func (ackMgr *ackManager) start() {

// getCurrentReadLevel returns the current read-level address and seqnum. this is called
// by extcache when it connects to a new replica, when one stream is disconnected.
func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqNo common.SequenceNumber) {
func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqnum common.SequenceNumber) {

ackMgr.lk.RLock()
defer ackMgr.lk.RUnlock()
@@ -206,7 +182,7 @@ func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqNo common.Sequen
addr = int64(msg.addr)
}

seqNo = ackMgr.levelOffset + ackMgr.readLevel
seqnum = ackMgr.readLevelSeq

return
}
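
`getCurrentReadLevel` is what `extcache` calls to resume on a fresh replica stream after a disconnect. With the per-message context, the seqnum is read straight off `readLevelSeq` instead of being recomputed as `levelOffset + readLevel` — which is also why the `addLevelOffset` call in `extcache.go` further down is deleted outright. A sketch under the same simplified stand-in types:

```go
package main

import "fmt"

type msgCtx struct {
	addr   int64
	seqnum int64
}

type ackMgr struct {
	addrs        map[uint32]*msgCtx
	readLevel    uint32
	readLevelSeq int64
}

// getCurrentReadLevel returns the resume point for a new replica stream:
// the store address of the last message read, plus its seqnum.
func (m *ackMgr) getCurrentReadLevel() (addr, seqnum int64) {
	if msg, ok := m.addrs[m.readLevel]; ok {
		addr = msg.addr
	}
	return addr, m.readLevelSeq // tracked directly; no offset arithmetic
}

func main() {
	m := &ackMgr{
		addrs:        map[uint32]*msgCtx{3: {addr: 1500, seqnum: 7}},
		readLevel:    3,
		readLevelSeq: 7,
	}
	addr, seq := m.getCurrentReadLevel()
	fmt.Printf("resume at address=%d seqnum=%d\n", addr, seq)
}
```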
@@ -251,30 +227,30 @@ func (ackMgr *ackManager) updateAckLevel() {
ackMgr.lk.Lock()

count := 0
stop := ackMgr.ackLevel + common.SequenceNumber(int64(len(ackMgr.addrs)))
stop := ackMgr.ackLevel + ackIndex(len(ackMgr.addrs))

// We go through the map here and see if the messages are acked,
// moving the acklevel as we go forward.
for curr := ackMgr.ackLevel + 1; curr <= stop; curr++ {
if addrs, ok := ackMgr.addrs[curr]; ok {
if addrs.acked {
update = true
ackMgr.ackLevel = curr
ackLevelAddress = int64(addrs.addr)

// We need to commit every message we see here, since we may have an interleaved stream,
// and only the committer knows how to report the level(s). This is true, e.g. for Kafka.
ackMgr.committer.SetCommitLevel(CommitterLevel{
seqNo: ackMgr.ackLevel + ackMgr.levelOffset,
address: addrs.addr,
})

// getCurrentAckLevelOffset needs addr[ackMgr.ackLevel], so delete the previous one if it exists
count++
delete(ackMgr.addrs, curr-1)
} else {
if !addrs.acked {
break
}

update = true
ackMgr.ackLevel = curr
ackLevelAddress = int64(addrs.addr)

// We need to commit every message we see here, since we may have an interleaved stream,
// and only the committer knows how to report the level(s). This is true, e.g. for Kafka.
ackMgr.committer.SetCommitLevel(CommitterLevel{
seqNo: addrs.seqnum,
address: addrs.addr,
})

// getCurrentAckLevelOffset needs addr[ackMgr.ackLevel], so delete the previous one if it exists
count++
delete(ackMgr.addrs, curr-1)
}
}
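
The rewritten loop above flattens the old nesting: break out at the first unacked message, otherwise advance the ack level and commit the seqnum recorded in the message context (formerly computed as `ackLevel + levelOffset`). A runnable sketch with invented data; unlike the real code, it treats a missing map entry as a stopping point rather than skipping it:

```go
package main

import "fmt"

type msgCtx struct {
	addr   int64
	seqnum int64
	acked  bool
}

func main() {
	addrs := map[uint32]*msgCtx{
		1: {addr: 1001, seqnum: 1, acked: true},
		2: {addr: 1002, seqnum: 2, acked: true},
		3: {addr: 1007, seqnum: 7, acked: false}, // first unacked: stop here
		4: {addr: 1008, seqnum: 8, acked: true},  // unreachable until 3 is acked
	}

	var ackLevel uint32
	stop := ackLevel + uint32(len(addrs))
	for curr := ackLevel + 1; curr <= stop; curr++ {
		msg, ok := addrs[curr]
		if !ok || !msg.acked {
			break
		}
		ackLevel = curr
		// The real code feeds committer.SetCommitLevel with this per-message
		// seqnum; the map entry at the new level is kept for later lookups.
		fmt.Printf("commit: index=%d seqnum=%d\n", curr, msg.seqnum)
		delete(addrs, curr-1)
	}
	fmt.Println("ack level now:", ackLevel)
}
```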

@@ -333,31 +309,31 @@ func (ackMgr *ackManager) updateAckLevel() {
}
}

func (ackMgr *ackManager) acknowledgeMessage(ackID AckID, seqNum uint32, address int64, isNack bool) error {
func (ackMgr *ackManager) acknowledgeMessage(ackID AckID, seqNum ackIndex, address int64, isNack bool) error {
var err error
notifyCg := true
ackMgr.lk.Lock() // Read lock would be OK in this case (except for a benign race with two simultaneous acks for the same ackID), see below
// check if this id is present
if addrs, ok := ackMgr.addrs[common.SequenceNumber(seqNum)]; ok {
if msg, ok := ackMgr.addrs[seqNum]; ok {
// validate the address from the ackID
if addrs.addr != storeHostAddress(address) {
if msg.addr != storeHostAddress(address) {
ackMgr.logger.WithFields(bark.Fields{
`address`: address,
`expected`: addrs.addr,
`expected`: msg.addr,
}).Error(`ack address does not match!`)
err = errors.New("address of the ackID doesn't match with ackMgr")
notifyCg = false
} else {
if ackMgr.cgCache.cachedCGDesc.GetOwnerEmail() == SmartRetryDisableString {
ackMgr.logger.WithFields(bark.Fields{
`Address`: address,
`addr`: addrs.addr,
`addr`: msg.addr,
common.TagSeq: seqNum,
`isNack`: isNack,
}).Info(`msg ack`)
}
if !isNack {
addrs.acked = true // This is the only place that this field of addrs is changed. It was initially set under a write lock elsewhere, hence we can have a read lock
msg.acked = true // This is the only place that this field of msg is changed. It was initially set under a write lock elsewhere, hence we can have a read lock
// update the last acked sequence, if this is the most recent ack
if ackMgr.lastAckedSeq < common.SequenceNumber(seqNum) {
ackMgr.lastAckedSeq = common.SequenceNumber(seqNum)
Expand Down Expand Up @@ -402,14 +378,14 @@ func (ackMgr *ackManager) manageAckLevel() {

// get the number of acked and unacked messages from the last ack level
func (ackMgr *ackManager) getNumAckedAndUnackedMessages() (*int64, *int64) {
stop := ackMgr.ackLevel + common.SequenceNumber(int64(len(ackMgr.addrs)))
stop := ackMgr.ackLevel + ackIndex(len(ackMgr.addrs))

var acked int64
var unacked int64
// We go through the map here and see if the messages are acked,
for curr := ackMgr.ackLevel + 1; curr <= stop; curr++ {
if addrs, ok := ackMgr.addrs[curr]; ok {
if addrs.acked {
if msg, ok := ackMgr.addrs[curr]; ok {
if msg.acked {
acked++
} else {
unacked++
5 changes: 0 additions & 5 deletions services/outputhost/extcache.go
@@ -312,11 +312,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
return
}

// successfully opened read stream on the replica; save this index
if startSequence != 0 {
extCache.ackMgr.addLevelOffset(startSequence) // Let ack manager know that the first message received is not sequence zero
}

logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`)
pickedIndex = startIndex
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID)
2 changes: 1 addition & 1 deletion services/outputhost/outputhost.go
@@ -384,7 +384,7 @@ func (h *OutputHost) processAcks(ackIds []string, isNack bool) (invalidIDs []str
}

// let the ackMgr know; from the perspective of the ackManager, ack == nack
if err = ackMgr.acknowledgeMessage(ackID, seqNum, ackIDObj.Address, isNack); err == nil {
if err = ackMgr.acknowledgeMessage(ackID, ackIndex(seqNum), ackIDObj.Address, isNack); err == nil {
continue
} else {
h.logger.WithFields(bark.Fields{
4 changes: 2 additions & 2 deletions services/outputhost/outputhost_test.go
@@ -968,7 +968,7 @@ func (s *OutputHostSuite) TestOutputAckMgrReset() {
s.mockCons.On("Write", mock.Anything).Return(fmt.Errorf("breaking write pipe"))

// 8. get the ackMgr
var readLevel common.SequenceNumber
var readLevel ackIndex
var ackMgr *ackManager
outputHost.cgMutex.RLock()
if cg, ok := outputHost.cgCache[cgUUID]; ok {
@@ -994,7 +994,7 @@
outputHost.Shutdown()

// 9. Make sure we reset the readLevel
var newReadLevel common.SequenceNumber
var newReadLevel ackIndex
ackMgr.lk.RLock()
newReadLevel = ackMgr.readLevel
ackMgr.lk.RUnlock()
16 changes: 8 additions & 8 deletions services/outputhost/replicaconnection.go
@@ -225,28 +225,28 @@ func (conn *replicaConnection) readMessagesPump() {
case store.ReadMessageContentType_MESSAGE:

msg := rmc.GetMessage()
correctSequenceNumber := common.SequenceNumber(msg.Message.GetSequenceNumber())
msgSeqNum := common.SequenceNumber(msg.Message.GetSequenceNumber())

// XXX: Sequence number check to make sure we get monotonically increasing
// sequence number.
// We just log and move forward
// XXX: Note we skip the first message check here because we can start from
// a bigger sequence number in case of restarts
if conn.extCache.destType != shared.DestinationType_TIMER {
if lastSeqNum+1 != int64(correctSequenceNumber) {
if lastSeqNum+1 != int64(msgSeqNum) {
// FIXME: add metric to help alert this case
expectedSeqNum := 1 + lastSeqNum
skippedMessages := int64(correctSequenceNumber) - lastSeqNum
skippedMessages := int64(msgSeqNum) - lastSeqNum

conn.logger.WithFields(bark.Fields{
"correctSequenceNumber": correctSequenceNumber,
"expectedSeqNum": expectedSeqNum,
"skippedMessages": skippedMessages,
"msgSeqNum": msgSeqNum,
"expectedSeqNum": expectedSeqNum,
"skippedMessages": skippedMessages,
}).Error("sequence number out of order")
}
} else {
// T471157 For timers, do not signal discontinuities to ack manager, since discontinuities are frequent
correctSequenceNumber = 0
msgSeqNum = 0
}

// update the lastSeqNum to this value
@@ -257,7 +257,7 @@
cMsg.EnqueueTimeUtc = msg.Message.EnqueueTimeUtc
cMsg.Payload = msg.Message.Payload

cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(msg.GetAddress(), correctSequenceNumber))
cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(storeHostAddress(msg.GetAddress()), msgSeqNum))
// write the message to the msgsCh so that it can be delivered
// after being stored on the cache.
// 1. either there are no listeners
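
Finally, on the read pump, the rename from `correctSequenceNumber` to `msgSeqNum` matches what the variable actually is: the store's seqnum, checked for monotonicity and then forwarded. For TIMER destinations, whose seqnums are legitimately sparse, a zero is passed instead so the ack manager does not treat every message as a discontinuity. A small runnable sketch of the two paths (destination type and message data are invented):

```go
package main

import "fmt"

func main() {
	const isTimerDestination = false // flip to true for the TIMER path

	lastSeqNum := int64(0)
	for _, seq := range []int64{1, 2, 7, 8} {
		msgSeqNum := seq
		if !isTimerDestination {
			if lastSeqNum+1 != msgSeqNum {
				// The real pump only logs this and keeps going.
				fmt.Printf("out of order: expected %d, got %d\n", lastSeqNum+1, msgSeqNum)
			}
		} else {
			msgSeqNum = 0 // timers: don't signal discontinuities downstream
		}
		lastSeqNum = seq // track the raw store seqnum either way
		fmt.Printf("would call getNextAckID(addr, %d)\n", msgSeqNum)
	}
}
```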
