Skip to content

Commit

Permalink
fix(join): Guard message processing with mutex (#5035)
Browse files Browse the repository at this point in the history
  • Loading branch information
scbrickley authored Jul 28, 2022
1 parent 555afcd commit c236288
Showing 1 changed file with 2 additions and 3 deletions.
5 changes: 2 additions & 3 deletions stdlib/join/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (t *MergeJoinTransformation) Dataset() *execute.TransportDataset {
}

func (t *MergeJoinTransformation) ProcessMessage(m execute.Message) error {
t.mu.Lock()
defer t.mu.Unlock()
defer m.Ack()

switch m := m.(type) {
Expand Down Expand Up @@ -149,9 +151,6 @@ func (t *MergeJoinTransformation) initState(state interface{}) (*joinState, bool
// After that, it calls mergeJoin() on the passed-in chunk, which will do as much as it can to produce
// the joined output tables and update the state object accordingly.
func (t *MergeJoinTransformation) processChunk(chunk table.Chunk, state interface{}, id execute.DatasetID) (*joinState, bool, error) {
t.mu.Lock()
defer t.mu.Unlock()

s, ok := t.initState(state)
if !ok {
return nil, false, errors.New(codes.Internal, "invalid join state")
Expand Down

0 comments on commit c236288

Please sign in to comment.