Skip to content

Commit

Permalink
merge: use unbuffered channels to force task completion
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Mar 15, 2024
1 parent 1003092 commit 4ce69e4
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,8 @@ type MergeDirOptions struct {
ValidateOptsExtension string

// ParseWorkers is the number of concurrent ACH file reader/parser goroutines
// Default: 50
// Default: 10
ParseWorkers int

// DiscoveredPathsQueueDepth is the buffer size of discovered paths to merge
// Default: ParseWorkers * 2
DiscoveredPathsQueueDepth int

// MergableFilesQueueDepth is the buffer size of parsed files to merge
// Default: ParseWorkers * 2
MergableFilesQueueDepth int
}

// DefaultFileAcceptor is the default logic for which file extensions to merge and how to read them.
Expand Down Expand Up @@ -197,6 +189,7 @@ func MergeDir(dir string, conditions Conditions, opts *MergeDirOptions) ([]*File
}
if opts.FS == nil {
opts.FS = os.DirFS(dir)
dir = "."
}

sorted := &outFile{}
Expand All @@ -217,63 +210,62 @@ func MergeDir(dir string, conditions Conditions, opts *MergeDirOptions) ([]*File
parseWorkers = opts.ParseWorkers
}

discoveredPathsDepth := parseWorkers * 2
if opts.DiscoveredPathsQueueDepth > 0 {
discoveredPathsDepth = opts.DiscoveredPathsQueueDepth
}
discoveredPaths := make(chan string, discoveredPathsDepth)

mergableFilesDepth := parseWorkers * 2
if opts.MergableFilesQueueDepth > 0 {
mergableFilesDepth = opts.MergableFilesQueueDepth
}
mergableFiles := make(chan *File, mergableFilesDepth)

ctx, cancelFunc := context.WithCancel(context.Background())
discoveredPaths := make(chan string)
mergableFiles := make(chan *File)

// We are going to scan the directory for files to parse and merge.
pathsCtx, pathsCancelFunc := context.WithCancel(context.Background())

var pathsGroup sync.WaitGroup
pathsGroup.Add(1)
g.Go(func() error {
defer func() {
// After we're done reading paths close the channel
cancelFunc()
close(discoveredPaths)
pathsGroup.Done()
}()

return walkDir(opts.FS, dir, discoveredPaths)
})
g.Go(func() error {
pathsGroup.Wait()
pathsCancelFunc()
return nil
})

// Set up concurrent ACH file parsers; parsing is typically the longest part of merging.
var wg sync.WaitGroup
wg.Add(parseWorkers)
parsingCtx, parsingCancelFunc := context.WithCancel(context.Background())

var parsingGroup sync.WaitGroup
parsingGroup.Add(parseWorkers)
for i := 0; i < parseWorkers; i++ {
g.Go(func() error {
defer wg.Done()
defer parsingGroup.Done()

return queueFileForMerging(ctx, discoveredPaths, &setup, sorted, mergableFiles, opts)
return queueFileForMerging(pathsCtx, discoveredPaths, &setup, sorted, mergableFiles, opts)
})
}
g.Go(func() error {
wg.Wait()

// Sending a nil file is the signal to stop merging
mergableFiles <- nil
close(mergableFiles)

parsingGroup.Wait()
parsingCancelFunc()
return nil
})

// Merge ACH files into the final output
g.Go(func() error {
for {
file := <-mergableFiles
if file == nil {
return nil
}
select {
case file := <-mergableFiles:
if file == nil {
continue
}

// accumulate the file into our merged set
err := sorted.add(file)
if err != nil {
return fmt.Errorf("adding file into merged set failed: %w", err)
// accumulate the file into our merged set
err := sorted.add(file)
if err != nil {
return fmt.Errorf("adding file into merged set failed: %w", err)
}

case <-parsingCtx.Done():
return nil
}
}
})
Expand Down Expand Up @@ -316,7 +308,7 @@ func queueFileForMerging(ctx context.Context, discoveredPaths chan string, setup
select {
case path := <-discoveredPaths:
if path == "" {
return nil
continue
}

var file *File
Expand Down

0 comments on commit 4ce69e4

Please sign in to comment.