From ead18d3c10af17c7ec51bfc755f2c5c9880a94e6 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 24 Dec 2024 15:00:42 -0700 Subject: [PATCH] kgo: broadcast batch finishes in one big blast Previously, we would broadcast after every individual record finished in a batch. If we produced 5000 records concurrently but only allowed 1000 in flight, we would first send a few large batches, and then once they started finishing, we would: * Finish a record * Wake up a waiting record * That record would race into producing * The producing wouldn't linger even if we wanted to, because other records are blocked * That record would be produced alone in one batch immediately * Goto start This forced unnecessary synchronization and caused things to produce slowly. By doing one broadcast at the end of a batch, we actually give more "space" for the client to buffer before waking up everything waiting. The first goroutine awoken will still produce a small batch, _but_ we will reach a point where the client is buffering larger batches. Overall, this speeds things up for a niche case and is not detrimental in any way to the non-niche case. --- pkg/kgo/producer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 1ea2a29f..f1f06b7a 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -549,6 +549,12 @@ func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) { func (p *producer) finishPromises(b batchPromise) { cl := p.cl var more bool + var broadcast bool + defer func() { + if broadcast { + p.c.Broadcast() + } + }() start: p.promisesMu.Lock() for i, pr := range b.recs { @@ -558,7 +564,8 @@ start: pr.ProducerID = b.pid pr.ProducerEpoch = b.epoch pr.Attrs = b.attrs - cl.finishRecordPromise(pr, b.err, b.beforeBuf) + recBroadcast := cl.finishRecordPromise(pr, b.err, b.beforeBuf) + broadcast = broadcast || recBroadcast b.recs[i] = promisedRec{} } p.promisesMu.Unlock() @@ -572,7 +579,7 @@ start: } } -func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) { +func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) (broadcast bool) { p := &cl.producer if p.hooks != nil && len(p.hooks.unbuffered) > 0 { @@ -599,12 +606,10 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering p.mu.Lock() p.bufferedBytes -= userSize p.bufferedRecords-- - broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0 + broadcast = p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0 p.mu.Unlock() - if broadcast { - p.c.Broadcast() - } + return broadcast } // partitionRecord loads the partitions for a topic and produce to them. If