Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/ledger: Fix memory leak in HistoryDBSource #2548

Merged
merged 5 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Add `last_modified_time` to account responses. `last_modified_time` is the
closing time of the most recent ledger in which the account was modified.
* Fix a memory leak in the code responsible for streaming [#2548](https://github.com/stellar/go/pull/2548).

## v1.2.1

Expand Down
24 changes: 19 additions & 5 deletions services/horizon/internal/ledger/ledger_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
)

// Source exposes two helpers methods to help you find out the current
// ledger and yield every time there is a new ledger.
// ledger and yield every time there is a new ledger. Call `Close` when
// source is no longer used.
type Source interface {
CurrentLedger() uint32
NextLedger(currentSequence uint32) chan uint32
Close()
}

type currentStateFunc func() State
Expand All @@ -19,23 +21,24 @@ type currentStateFunc func() State
type HistoryDBSource struct {
updateFrequency time.Duration
currentState currentStateFunc
closed bool
}

// NewHistoryDBSource constructs a new instance of HistoryDBSource
func NewHistoryDBSource(updateFrequency time.Duration) HistoryDBSource {
return HistoryDBSource{
func NewHistoryDBSource(updateFrequency time.Duration) *HistoryDBSource {
return &HistoryDBSource{
updateFrequency: updateFrequency,
currentState: CurrentState,
}
}

// CurrentLedger returns the current ledger.
func (source HistoryDBSource) CurrentLedger() uint32 {
func (source *HistoryDBSource) CurrentLedger() uint32 {
return source.currentState().ExpHistoryLatest
}

// NextLedger returns a channel which yields every time there is a new ledger with a sequence number larger than currentSequence.
func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
func (source *HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
// Make sure this is buffered channel of size 1. Otherwise, the go routine below
// will never return if `newLedgers` channel is not read. From Effective Go:
// > If the channel is unbuffered, the sender blocks until the receiver has received the value.
Expand All @@ -46,6 +49,10 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
time.Sleep(source.updateFrequency)
}

if source.closed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another go routine may write to source.closed so I think technically the behavior is undefined. I think it would be a good idea to use a lock or use a context to signal cancelation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I was under the impression that single read/single write is totally fine but https://golang.org/ref/mem is clear that it's not. Fixed by using explicit synchronization.

return
}

currentLedgerState := source.currentState()
if currentLedgerState.ExpHistoryLatest > currentSequence {
newLedgers <- currentLedgerState.ExpHistoryLatest
Expand All @@ -57,6 +64,11 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
return newLedgers
}

// Close closes the internal go routines.
func (source *HistoryDBSource) Close() {
source.closed = true
}

// TestingSource is helper struct which implements the LedgerSource
// interface.
type TestingSource struct {
Expand Down Expand Up @@ -106,3 +118,5 @@ func (source *TestingSource) NextLedger(currentSequence uint32) chan uint32 {

return response
}

func (source *TestingSource) Close() {}
15 changes: 11 additions & 4 deletions services/horizon/internal/render/sse/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"github.com/stellar/throttled"
)

type LedgerSourceFactory interface {
Get() ledger.Source
}

// StreamHandler represents a stream handling action
type StreamHandler struct {
RateLimiter *throttled.HTTPRateLimiter
LedgerSource ledger.Source
RateLimiter *throttled.HTTPRateLimiter
LedgerSourceFactory LedgerSourceFactory
}

// GenerateEventsFunc generates a slice of sse.Event which are sent via
Expand All @@ -30,7 +34,10 @@ func (handler StreamHandler) ServeStream(
stream := NewStream(ctx, w)
stream.SetLimit(limit)

currentLedgerSequence := handler.LedgerSource.CurrentLedger()
ledgerSource := handler.LedgerSourceFactory.Get()
defer ledgerSource.Close()

currentLedgerSequence := ledgerSource.CurrentLedger()
for {
// Rate limit the request if it's a call to stream since it queries the DB every second. See
// https://github.com/stellar/go/issues/715 for more details.
Expand Down Expand Up @@ -71,7 +78,7 @@ func (handler StreamHandler) ServeStream(
stream.Init()

select {
case currentLedgerSequence = <-handler.LedgerSource.NextLedger(currentLedgerSequence):
case currentLedgerSequence = <-ledgerSource.NextLedger(currentLedgerSequence):
continue
case <-ctx.Done():
stream.Done()
Expand Down
12 changes: 9 additions & 3 deletions services/horizon/internal/render/sse/stream_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@ import (
"github.com/stellar/go/services/horizon/internal/ledger"
)

type testingFactory struct {
ledgerSource ledger.Source
}

func (f *testingFactory) Get() ledger.Source {
return f.ledgerSource
}

func TestSendByeByeOnContextDone(t *testing.T) {
ledgerSource := ledger.NewTestingSource(1)
handler := StreamHandler{
LedgerSource: ledgerSource,
}
handler := StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}

r, err := http.NewRequest("GET", "http://localhost", nil)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions services/horizon/internal/stream_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
"github.com/stellar/go/support/render/hal"
)

type testingFactory struct {
ledgerSource ledger.Source
}

func (f *testingFactory) Get() ledger.Source {
return f.ledgerSource
}

// StreamTest utility struct to wrap SSE related tests.
type StreamTest struct {
ledgerSource *ledger.TestingSource
Expand Down Expand Up @@ -64,7 +72,7 @@ func NewStreamablePageTest(
) *StreamTest {
ledgerSource := ledger.NewTestingSource(currentLedger)
action.ledgerSource = ledgerSource
streamHandler := sse.StreamHandler{LedgerSource: ledgerSource}
streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}
handler := streamablePageHandler(action, streamHandler)

return newStreamTest(
Expand All @@ -85,7 +93,7 @@ func NewStreamableObjectTest(
) *StreamTest {
ledgerSource := ledger.NewTestingSource(currentLedger)
action.ledgerSource = ledgerSource
streamHandler := sse.StreamHandler{LedgerSource: ledgerSource}
streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}
handler := streamableObjectActionHandler{action: action, limit: limit, streamHandler: streamHandler}

return newStreamTest(
Expand Down
12 changes: 10 additions & 2 deletions services/horizon/internal/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (w *web) mustInstallMiddlewares(app *App, connTimeout time.Duration) {
w.internalRouter.Use(loggerMiddleware)
}

type historyLedgerSourceFactory struct {
updateFrequency time.Duration
}

func (f historyLedgerSourceFactory) Get() ledger.Source {
return ledger.NewHistoryDBSource(f.updateFrequency)
}

// mustInstallActions installs the routing configuration of horizon onto the
// provided app. All route registration should be implemented here.
func (w *web) mustInstallActions(
Expand All @@ -151,8 +159,8 @@ func (w *web) mustInstallActions(
r.Get("/", RootAction{}.Handle)

streamHandler := sse.StreamHandler{
RateLimiter: w.rateLimiter,
LedgerSource: ledger.NewHistoryDBSource(w.sseUpdateFrequency),
RateLimiter: w.rateLimiter,
LedgerSourceFactory: historyLedgerSourceFactory{updateFrequency: w.sseUpdateFrequency},
}

// State endpoints behind stateMiddleware
Expand Down