Skip to content

Commit

Permalink
Merge pull request lightninglabs#210 from halseth/venue-isolation-3
Browse files Browse the repository at this point in the history
venue: ignore all messages from traders not part of the batch
  • Loading branch information
wpaulino authored Oct 2, 2020
2 parents 5b0ac12 + eb643c7 commit 73cc371
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 65 deletions.
51 changes: 48 additions & 3 deletions venue/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func (b *BatchExecutor) executor() {
defer b.wg.Done()

var (
exeState ExecutionState
exeState = NoActiveBatch
env environment
)

Expand All @@ -1094,6 +1094,18 @@ func (b *BatchExecutor) executor() {
// state machine until either we finish the batch, or end up at
// the same start as before.
case event := <-b.venueEvents:
// If this is a message from a trader that's not part
// of this current batch (or there is no current
// batch), then we'll ignore it.
if m, ok := event.(*msgRecvEvent); ok {
src := m.msg.Src()
if !env.traderPartOfBatch(src) {
log.Warnf("Ignoring message from "+
"trader=%x, not part of batch",
src)
continue
}
}

var err error
out:
Expand All @@ -1112,7 +1124,26 @@ func (b *BatchExecutor) executor() {

env.cancel()
env = environment{}

// Error was encountered during batch
// execution, go back to NoActiveBatch
// state.
exeState = NoActiveBatch
log.Infof("Error during batch "+
"execution: %v. State "+
"transition: %v -> %v", err,
priorState, exeState)

err := b.cfg.Store.UpdateExecutionState(
exeState,
)
if err != nil {
log.Errorf("unable to update "+
"execution state: %v",
err)
break out
}

break out
}

Expand Down Expand Up @@ -1144,8 +1175,24 @@ func (b *BatchExecutor) executor() {

env.cancel()
env = environment{}

// Now that the batch was completed, we
// reset the state machine by
// transitioning back to NoActiveState.
exeState = NoActiveBatch
log.Infof("Batch execution completed. "+
"State transition: %v -> %v",
BatchComplete, exeState)

err := b.cfg.Store.UpdateExecutionState(
exeState,
)
if err != nil {
log.Errorf("unable to update "+
"execution state: %v",
err)
break out
}
break out
}
}
Expand All @@ -1157,8 +1204,6 @@ func (b *BatchExecutor) executor() {
log.Infof("New OrderBatch(id=%x)",
newBatch.exeCtx.BatchID)

exeState = NoActiveBatch

msgTimers := newMsgTimers(b.cfg.TraderMsgTimeout)
env = newEnvironment(newBatch, msgTimers)

Expand Down
Loading

0 comments on commit 73cc371

Please sign in to comment.