Skip to content

Commit

Permalink
ingest/ledgerbackend: Restart Stellar-Core when it's context is cance…
Browse files Browse the repository at this point in the history
…lled (#4192)

After refactoring in #4088 (and as a result of my wrong comment:
#4088#discussion_r764394296) the `CaptiveCoreBackend.isPrepared` method returned
`true` if `stellarCoreRunner` process was shutdown without calling `close()` -
so in case of binary update but also in case of Stellar-Core crash.

This commit fixes this bug by checking if `stellarCoreRunner` context was
cancelled (meaning Stellar-Core is closed or closing but not as a result of
`close` call). I also removed `isClose` method because it was simply checking
`close` variable. All test changes are only adding an extra `context()` call
mocks because `isPrepared` now calls it.
  • Loading branch information
bartekn authored Jan 28, 2022
1 parent d3ce98c commit a947f2f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
16 changes: 5 additions & 11 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,14 @@ func (c *CaptiveStellarCore) IsPrepared(ctx context.Context, ledgerRange Range)
}

func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
if c.isClosed() {
if c.closed {
return false
}

if c.stellarCoreRunner == nil {
return false
}
if c.closed {
if c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil {
return false
}

lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down Expand Up @@ -464,7 +462,7 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
return *c.cachedMeta, nil
}

if c.isClosed() {
if c.closed {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core is no longer usable")
}

Expand Down Expand Up @@ -607,7 +605,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

if c.isClosed() {
if c.closed {
return 0, errors.New("stellar-core is no longer usable")
}
if c.prepared == nil {
Expand All @@ -626,10 +624,6 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
return *c.lastLedger, nil
}

func (c *CaptiveStellarCore) isClosed() bool {
return c.closed
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files. Note, once a CaptiveStellarCore instance is closed it can no longer be used and
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
Expand Down
44 changes: 36 additions & 8 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)
mockRunner.On("getProcessExitError").Return(true, nil)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -313,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
nextLedger: 300,
Expand Down Expand Up @@ -603,12 +607,12 @@ func TestCaptiveGetLedger(t *testing.T) {

ledgerRange := BoundedRange(65, 66)
tt.False(captiveBackend.isPrepared(ledgerRange), "core is not prepared until explicitly prepared")
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)
err = captiveBackend.PrepareRange(ctx, ledgerRange)
assert.NoError(t, err)

tt.True(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

_, err = captiveBackend.GetLedger(ctx, 64)
tt.Error(err, "requested ledger 64 is behind the captive core stream (expected=66)")
Expand All @@ -634,12 +638,12 @@ func TestCaptiveGetLedger(t *testing.T) {
tt.NoError(err)

tt.False(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)
_, err = captiveBackend.GetLedger(ctx, 66)
tt.NoError(err)

// core is not closed unless it's explicitly closed
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down Expand Up @@ -903,14 +907,14 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
tt.NoError(err)
tt.Equal(xdr.Uint32(65), meta.V0.LedgerHeader.Header.LedgerSeq)

tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

// try reading from an empty buffer
_, err = captiveBackend.GetLedger(ctx, 66)
tt.EqualError(err, "unmarshalling error")

// not closed even if there is an error getting ledger
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down Expand Up @@ -997,7 +1001,7 @@ func TestCaptiveAfterClose(t *testing.T) {
assert.NoError(t, err)

assert.NoError(t, captiveBackend.Close())
assert.True(t, captiveBackend.isClosed())
assert.True(t, captiveBackend.closed)

_, err = captiveBackend.GetLedger(ctx, boundedRange.to)
assert.EqualError(t, err, "stellar-core is no longer usable")
Expand Down Expand Up @@ -1340,6 +1344,30 @@ func TestCaptiveIsPrepared(t *testing.T) {
}
}

// TestCaptiveIsPreparedCoreContextCancelled checks if IsPrepared returns false
// if the stellarCoreRunner.context() is cancelled. This can happen when
// stellarCoreRunner was closed, ex. when binary file was updated.
func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
nextLedger: 100,
prepared: &rang,
stellarCoreRunner: mockRunner,
}

result := captiveBackend.isPrepared(UnboundedRange(100))
assert.True(t, result)

cancel()

result = captiveBackend.isPrepared(UnboundedRange(100))
assert.False(t, result)
}

// TestCaptivePreviousLedgerCheck checks if previousLedgerHash is set in PrepareRange
// and then checked and updated in GetLedger.
func TestCaptivePreviousLedgerCheck(t *testing.T) {
Expand Down

0 comments on commit a947f2f

Please sign in to comment.