Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Restart Stellar-Core when it's context is cancelled #4192

Merged
merged 5 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,14 @@ func (c *CaptiveStellarCore) IsPrepared(ctx context.Context, ledgerRange Range)
}

func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
if c.isClosed() {
if c.closed {
return false
}

if c.stellarCoreRunner == nil {
return false
}
if c.closed {
if c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil {
return false
}

lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down Expand Up @@ -464,7 +462,7 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
return *c.cachedMeta, nil
}

if c.isClosed() {
if c.closed {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core is no longer usable")
}

Expand Down Expand Up @@ -607,7 +605,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

if c.isClosed() {
if c.closed {
return 0, errors.New("stellar-core is no longer usable")
}
if c.prepared == nil {
Expand All @@ -626,10 +624,6 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
return *c.lastLedger, nil
}

func (c *CaptiveStellarCore) isClosed() bool {
return c.closed
}
Comment on lines -629 to -631
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we simplify this to include the above condition, instead? In other words,

func (c *CaptiveStellarCore) isClosed() bool {
	return c.closed || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
}

Basically, is there a situation in which the instance would not be closed and the runner wouldn't exist? Intuitively seems like no, but I'm not strong on this part of the codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding extra conditions to isClosed won't work. In #4088 we wanted to separate two cases:

  • CaptiveCoreBackend closed - means no usable, you need to create a new instance.
  • stellarCoreRunner closed - means it just needs to be restarted by calling PrepareRange.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for clarifying! LGTM 👍


// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files. Note, once a CaptiveStellarCore instance is closed it can no longer be used and
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
Expand Down
44 changes: 36 additions & 8 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)
mockRunner.On("getProcessExitError").Return(true, nil)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -313,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
nextLedger: 300,
Expand Down Expand Up @@ -603,12 +607,12 @@ func TestCaptiveGetLedger(t *testing.T) {

ledgerRange := BoundedRange(65, 66)
tt.False(captiveBackend.isPrepared(ledgerRange), "core is not prepared until explicitly prepared")
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)
err = captiveBackend.PrepareRange(ctx, ledgerRange)
assert.NoError(t, err)

tt.True(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

_, err = captiveBackend.GetLedger(ctx, 64)
tt.Error(err, "requested ledger 64 is behind the captive core stream (expected=66)")
Expand All @@ -634,12 +638,12 @@ func TestCaptiveGetLedger(t *testing.T) {
tt.NoError(err)

tt.False(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)
_, err = captiveBackend.GetLedger(ctx, 66)
tt.NoError(err)

// core is not closed unless it's explicitly closed
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down Expand Up @@ -903,14 +907,14 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
tt.NoError(err)
tt.Equal(xdr.Uint32(65), meta.V0.LedgerHeader.Header.LedgerSeq)

tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

// try reading from an empty buffer
_, err = captiveBackend.GetLedger(ctx, 66)
tt.EqualError(err, "unmarshalling error")

// not closed even if there is an error getting ledger
tt.False(captiveBackend.isClosed())
tt.False(captiveBackend.closed)

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down Expand Up @@ -997,7 +1001,7 @@ func TestCaptiveAfterClose(t *testing.T) {
assert.NoError(t, err)

assert.NoError(t, captiveBackend.Close())
assert.True(t, captiveBackend.isClosed())
assert.True(t, captiveBackend.closed)

_, err = captiveBackend.GetLedger(ctx, boundedRange.to)
assert.EqualError(t, err, "stellar-core is no longer usable")
Expand Down Expand Up @@ -1340,6 +1344,30 @@ func TestCaptiveIsPrepared(t *testing.T) {
}
}

// TestCaptiveIsPreparedCoreContextCancelled checks if IsPrepared returns false
// if the stellarCoreRunner.context() is cancelled. This can happen when
// stellarCoreRunner was closed, ex. when binary file was updated.
func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
nextLedger: 100,
prepared: &rang,
stellarCoreRunner: mockRunner,
}

result := captiveBackend.isPrepared(UnboundedRange(100))
assert.True(t, result)

cancel()

result = captiveBackend.isPrepared(UnboundedRange(100))
assert.False(t, result)
}

// TestCaptivePreviousLedgerCheck checks if previousLedgerHash is set in PrepareRange
// and then checked and updated in GetLedger.
func TestCaptivePreviousLedgerCheck(t *testing.T) {
Expand Down