Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Add more logs when PutMessage latency is high
Browse files Browse the repository at this point in the history
  • Loading branch information
kobeyang committed Jul 20, 2017
1 parent 24ebf96 commit f666c5f
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions services/inputhost/pubconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ func (conn *pubConnection) failInflightMessages(inflightMessages map[string]resp
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(id),
`duration`: d,
}).Debug(`failInflightMessages: publish message latency`)
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Error(`failInflightMessages: publish message latency`)
}
// Record the number of failed messages
conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)
Expand Down Expand Up @@ -606,10 +609,14 @@ func (conn *pubConnection) updateInflightMap(inflightMessages map[string]respons
conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))

if d > timeLatencyToLog {
conn.logger.
WithField(common.TagDstPth, common.FmtDstPth(conn.destinationPath)).
WithField(common.TagInPutAckID, common.FmtInPutAckID(ackID)).
WithField(`d`, d).Info(`publish message latency at updateInflightMap`)
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(ackID),
`d`: d,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Info(`publish message latency at updateInflightMap`)
}
delete(inflightMessages, ackID)
return ok
Expand All @@ -625,10 +632,15 @@ func (conn *pubConnection) updateEarlyReplyAcks(resCh response, earlyReplyAcks m
ack, _ := earlyReplyAcks[resCh.ackID]
actualDuration := d - time.Since(ack.ackSentTime)
if d > timeLatencyToLog {
conn.logger.
WithField(common.TagDstPth, common.FmtDstPth(conn.destinationPath)).
WithField(common.TagInPutAckID, common.FmtInPutAckID(resCh.ackID)).
WithFields(bark.Fields{`d`: d, `actualDuration`: actualDuration}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID),
`d`: d,
`actualDuration`: actualDuration,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)
}
conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, actualDuration)
conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, actualDuration)
Expand Down

0 comments on commit f666c5f

Please sign in to comment.