Skip to content

Commit

Permalink
feat(pubsublite): unload idle partition publishers (#7105)
Browse files Browse the repository at this point in the history
Lazily creates partition publishers and unloads them after 5 minutes of inactivity. This avoids connecting gRPC streams to the server for publisher clients that seldom publish.
  • Loading branch information
tmdiep authored Dec 9, 2022
1 parent 28f3572 commit 176f533
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 131 deletions.
6 changes: 3 additions & 3 deletions pubsublite/internal/test/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ func (sv *streamVerifiers) Push(v *RPCVerifier) {
sv.verifiers.PushBack(v)
}

func (sv *streamVerifiers) Pop() (*RPCVerifier, error) {
func (sv *streamVerifiers) Pop(key string) (*RPCVerifier, error) {
sv.numStreams++
elem := sv.verifiers.Front()
if elem == nil {
sv.t.Errorf("stream(%d): unexpected connection with no verifiers", sv.numStreams)
sv.t.Errorf("unexpected stream index %d for key %s", sv.numStreams, key)
return nil, status.Error(codes.FailedPrecondition, "mockserver: got unexpected stream connection")
}

Expand Down Expand Up @@ -294,7 +294,7 @@ func (kv *keyedStreamVerifiers) Pop(key string) (*RPCVerifier, error) {
if !ok {
return nil, status.Error(codes.FailedPrecondition, "mockserver: unexpected connection with no configured responses")
}
return sv.Pop()
return sv.Pop(key)
}

func (kv *keyedStreamVerifiers) Flush() {
Expand Down
22 changes: 15 additions & 7 deletions pubsublite/internal/wire/publish_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var errPublishQueueEmpty = errors.New("pubsublite: received publish response fro
// PublishResultFunc receives the result of a publish.
type PublishResultFunc func(*MessageMetadata, error)

type publishResult struct {
Metadata *MessageMetadata
OnResult PublishResultFunc
}

// messageHolder stores a message to be published, with associated metadata.
type messageHolder struct {
msg *pb.PubSubMessage
Expand Down Expand Up @@ -133,26 +138,29 @@ func (b *publishMessageBatcher) AddBatch(batch *publishBatch) {
b.publishQueue.PushBack(batch)
}

func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error {
func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) ([]*publishResult, error) {
frontElem := b.publishQueue.Front()
if frontElem == nil {
return errPublishQueueEmpty
return nil, errPublishQueueEmpty
}
if firstOffset < b.minExpectedNextOffset {
return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
return nil, fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
}

batch, _ := frontElem.Value.(*publishBatch)
var results []*publishResult
for i, msgHolder := range batch.msgHolders {
// Messages are ordered, so the offset of each message is firstOffset + i.
mm := &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)}
msgHolder.onResult(mm, nil)
b.availableBufferBytes += msgHolder.size
results = append(results, &publishResult{
Metadata: &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)},
OnResult: msgHolder.onResult,
})
}

b.availableBufferBytes += batch.totalSize
b.minExpectedNextOffset = firstOffset + int64(len(batch.msgHolders))
b.publishQueue.Remove(frontElem)
return nil
return results, nil
}

func (b *publishMessageBatcher) OnPermanentError(err error) {
Expand Down
41 changes: 26 additions & 15 deletions pubsublite/internal/wire/publish_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)

t.Run("empty in-flight batches", func(t *testing.T) {
if gotErr, wantErr := batcher.OnPublishResponse(0), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
_, gotErr := batcher.OnPublishResponse(0)
if wantErr := errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("OnPublishResponse() got err: %v, want err: %v", gotErr, wantErr)
}
})
Expand All @@ -297,30 +298,40 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {

// Batch 2
msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
pubResult1 := newTestPublishResultReceiver(t, msg1)
pubResult2 := newTestPublishResultReceiver(t, msg2)
pubResult3 := newTestPublishResultReceiver(t, msg3)

batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1, pubResult1), makeMsgHolder(msg2, pubResult2)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3, pubResult3)))
if err := batcher.OnPublishResponse(70); err != nil {
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3)))

got, err := batcher.OnPublishResponse(70)
if err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
}
if err := batcher.OnPublishResponse(80); err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
want := []*publishResult{
{Metadata: &MessageMetadata{Partition: partition, Offset: 70}},
{Metadata: &MessageMetadata{Partition: partition, Offset: 71}},
}
if diff := testutil.Diff(got, want); diff != "" {
t.Errorf("Results got: -, want: +\n%s", diff)
}

pubResult1.ValidateResult(partition, 70)
pubResult2.ValidateResult(partition, 71)
pubResult3.ValidateResult(partition, 80)
got, err = batcher.OnPublishResponse(80)
if err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
}
want = []*publishResult{
{Metadata: &MessageMetadata{Partition: partition, Offset: 80}},
}
if diff := testutil.Diff(got, want); diff != "" {
t.Errorf("Results got: -, want: +\n%s", diff)
}
})

t.Run("inconsistent offset", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: []byte{'4'}}
pubResult := newTestPublishResultReceiver(t, msg)
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg, pubResult)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg)))

if gotErr, wantMsg := batcher.OnPublishResponse(80), "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) {
_, gotErr := batcher.OnPublishResponse(80)
if wantMsg := "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("OnPublishResponse() got err: %v, want err msg: %q", gotErr, wantMsg)
}
})
Expand Down
162 changes: 136 additions & 26 deletions pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ type singlePartitionPublisher struct {
// singlePartitionPublisherFactory creates instances of singlePartitionPublisher
// for given partition numbers.
type singlePartitionPublisherFactory struct {
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
unloadDelay time.Duration
}

func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher {
Expand Down Expand Up @@ -113,9 +114,6 @@ func (pp *singlePartitionPublisher) Stop() {

// Publish a pub/sub message.
func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
pp.mu.Lock()
defer pp.mu.Unlock()

processMessage := func() error {
// Messages are accepted while the service is starting up or active. During
// startup, messages are queued in the batcher and will be published once
Expand All @@ -134,10 +132,17 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ
return nil
}

pp.mu.Lock()
err := processMessage()
// If the new message cannot be published, flush pending messages and then
// terminate the stream once results are received.
if err := processMessage(); err != nil {
if err != nil {
pp.unsafeInitiateShutdown(serviceTerminating, err)
}
pp.mu.Unlock()

if err != nil {
// Invoke callback without lock held.
onResult(nil, err)
}
}
Expand Down Expand Up @@ -199,24 +204,27 @@ func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) {
}

func (pp *singlePartitionPublisher) onResponse(response interface{}) {
pp.mu.Lock()
defer pp.mu.Unlock()

processResponse := func() error {
processResponse := func() ([]*publishResult, error) {
pubResponse, _ := response.(*pb.PublishResponse)
if pubResponse.GetMessageResponse() == nil {
return errInvalidMsgPubResponse
return nil, errInvalidMsgPubResponse
}
firstOffset := pubResponse.GetMessageResponse().GetStartCursor().GetOffset()
if err := pp.batcher.OnPublishResponse(firstOffset); err != nil {
return err
}
pp.unsafeCheckDone()
return nil
return pp.batcher.OnPublishResponse(firstOffset)
}
if err := processResponse(); err != nil {

pp.mu.Lock()
results, err := processResponse()
if err != nil {
pp.unsafeInitiateShutdown(serviceTerminated, err)
}
pp.unsafeCheckDone()
pp.mu.Unlock()

// Invoke callbacks without lock held.
for _, r := range results {
r.OnResult(r.Metadata, nil)
}
}

// unsafeInitiateShutdown must be provided a target serviceStatus, which must be
Expand Down Expand Up @@ -274,6 +282,107 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
}
}

// lazyPartitionPublisher lazily creates an underlying singlePartitionPublisher
// and unloads it after a period of inactivity.
type lazyPartitionPublisher struct {
// Immutable after creation.
pubFactory *singlePartitionPublisherFactory
partition int
idleTimer *streamIdleTimer

// Fields below must be guarded with mu.
publisher *singlePartitionPublisher
outstandingMessages int

abstractService
}

func newLazyPartitionPublisher(partition int, pubFactory *singlePartitionPublisherFactory) *lazyPartitionPublisher {
pub := &lazyPartitionPublisher{
pubFactory: pubFactory,
partition: partition,
}
pub.idleTimer = newStreamIdleTimer(pubFactory.unloadDelay, pub.onIdle)
return pub
}

func (lp *lazyPartitionPublisher) Start() {
lp.mu.Lock()
defer lp.mu.Unlock()
lp.unsafeUpdateStatus(serviceActive, nil)
}

func (lp *lazyPartitionPublisher) Stop() {
lp.mu.Lock()
defer lp.mu.Unlock()

lp.idleTimer.Shutdown()
if lp.publisher == nil {
lp.unsafeUpdateStatus(serviceTerminated, nil)
} else if lp.unsafeUpdateStatus(serviceTerminating, nil) {
lp.publisher.Stop()
}
}

func (lp *lazyPartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
publisher, err := func() (*singlePartitionPublisher, error) {
lp.mu.Lock()
defer lp.mu.Unlock()

if lp.status >= serviceTerminating {
return nil, ErrServiceStopped
}
if lp.publisher == nil {
lp.publisher = lp.pubFactory.New(lp.partition)
lp.publisher.AddStatusChangeReceiver(lp.Handle(), lp.onStatusChange)
lp.publisher.Start()
}
lp.idleTimer.Stop() // Prevent the underlying publisher from being unloaded
lp.outstandingMessages++
return lp.publisher, nil
}()
if err != nil {
onResult(nil, err)
return
}
// Publish without lock held, as the callback may be invoked inline.
publisher.Publish(msg, func(metadata *MessageMetadata, err error) {
lp.onResult()
onResult(metadata, err)
})
}

func (lp *lazyPartitionPublisher) onStatusChange(handle serviceHandle, status serviceStatus, err error) {
if status >= serviceTerminating {
lp.mu.Lock()
defer lp.mu.Unlock()
lp.unsafeUpdateStatus(status, err)
}
}

func (lp *lazyPartitionPublisher) onResult() {
lp.mu.Lock()
defer lp.mu.Unlock()

lp.outstandingMessages--
if lp.outstandingMessages == 0 {
// Schedule the underlying publisher for unload if no new messages are
// published before the timer expires.
lp.idleTimer.Restart()
}
}

func (lp *lazyPartitionPublisher) onIdle() {
lp.mu.Lock()
defer lp.mu.Unlock()

if lp.outstandingMessages == 0 && lp.publisher != nil {
lp.publisher.RemoveStatusChangeReceiver(lp.Handle())
lp.publisher.Stop()
lp.publisher = nil
}
}

// routingPublisher publishes messages to multiple topic partitions, each
// managed by a singlePartitionPublisher. It supports increasing topic partition
// count, but not decreasing.
Expand All @@ -285,7 +394,7 @@ type routingPublisher struct {

// Fields below must be guarded with mu.
msgRouter messageRouter
publishers []*singlePartitionPublisher
publishers []*lazyPartitionPublisher

compositeService
}
Expand Down Expand Up @@ -319,7 +428,7 @@ func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) {

prevPartitionCount := len(rp.publishers)
for i := prevPartitionCount; i < partitionCount; i++ {
pub := rp.pubFactory.New(i)
pub := newLazyPartitionPublisher(i, rp.pubFactory)
rp.publishers = append(rp.publishers, pub)
rp.unsafeAddServices(pub)
}
Expand All @@ -335,7 +444,7 @@ func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResul
pub.Publish(msg, onResult)
}

func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) {
func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*lazyPartitionPublisher, error) {
rp.mu.Lock()
defer rp.mu.Unlock()

Expand Down Expand Up @@ -395,10 +504,11 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
unloadDelay: time.Minute * 5,
}
return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil
}
Loading

0 comments on commit 176f533

Please sign in to comment.