From 8a99e2c7a322dd570c4b303a64df72e349846549 Mon Sep 17 00:00:00 2001 From: Qi Xiao Date: Tue, 10 Sep 2024 17:29:08 +0100 Subject: [PATCH] pkg/eval: Simplify how the lifecycle of ports is tracked. The lifecycle of a port is often tied to the execution of a form - for example, in "echo > out", port 1 is redirected to a file that should be closed when the form finishes execution. Previously, this was tracked with the following scheme: - The Port struct had two fields - closeFile and closeChan - indicating whether the file and chan components should be closed when the form finishes execution. - However, a port could be used in multiple forms: for example, in { echo foo; echo bar } > out The same port 1 is used for both echo commands, but the file in it should only be closed when the overall form finishes execution. - To handle this correctly, the idea of "forking" a port was introduced - the forked ports always had their closeFile and closeChan set to false. Similarly, when a Frame got "forked", all its ports were forked as well. This commit switches to a different approach based on the observation that there are only two places that care about closeFile and closeChan: pipelineOp.exec and redirOp.exec, and they are very close in the call stack (with only formOp.exec between them). So instead of embedding the ownership information in the Port struct, track it in a separate formOwnedPort struct: pipelineOp.exec creates a []formOwnedPort for each form and threads a pointer to it as an additional argument of formOp.exec and redirOp.exec. This allows us to get rid of the concept of "forking" a port. The Frame.Fork method still exists because it needs to make a shallow copy of the ports slice, but the implementation is now simpler. 
--- pkg/eval/builtin_fn_flow.go | 2 - pkg/eval/compile_effect.go | 84 ++++++++++++++++++++++++------------- pkg/eval/eval.go | 2 +- pkg/eval/frame.go | 28 +++---------- pkg/eval/port.go | 29 +++---------- pkg/mods/re/re.go | 1 - 6 files changed, 65 insertions(+), 81 deletions(-) diff --git a/pkg/eval/builtin_fn_flow.go b/pkg/eval/builtin_fn_flow.go index b71bf84d4..8c931c45f 100644 --- a/pkg/eval/builtin_fn_flow.go +++ b/pkg/eval/builtin_fn_flow.go @@ -61,7 +61,6 @@ func each(fm *Frame, f Callable, inputs Inputs) error { } newFm := fm.Fork() ex := f.Call(newFm, []any{v}, NoOpts) - newFm.Close() if ex != nil { switch Reason(ex) { @@ -111,7 +110,6 @@ func peach(fm *Frame, opts peachOpt, f Callable, inputs Inputs) error { newFm := fm.Fork() newFm.ports[0] = DummyInputPort ex := f.Call(newFm, []any{v}, NoOpts) - newFm.Close() if ex != nil { switch Reason(ex) { diff --git a/pkg/eval/compile_effect.go b/pkg/eval/compile_effect.go index d59da0649..7910c038f 100644 --- a/pkg/eval/compile_effect.go +++ b/pkg/eval/compile_effect.go @@ -66,6 +66,22 @@ func (cp *compiler) pipelineOp(n *parse.Pipeline) *pipelineOp { const pipelineChanBufferSize = 32 +// Keeps track of whether the File and Chan parts of a port are owned by a form +// and should be closed when the form finishes execution. 
+type formOwnedPort struct { + File bool + Chan bool +} + +func (fop formOwnedPort) close(p *Port) { + if fop.File { + p.File.Close() + } + if fop.Chan { + close(p.Chan) + } +} + func (op *pipelineOp) exec(fm *Frame) Exception { if fm.Canceled() { return fm.errorp(op, ErrInterrupted) @@ -89,10 +105,12 @@ func (op *pipelineOp) exec(fm *Frame) Exception { // For each form, create a dedicated evalCtx and run asynchronously for i, form := range op.forms { newFm := fm.Fork() + var fops []formOwnedPort inputIsPipe := i > 0 outputIsPipe := i < nforms-1 if inputIsPipe { newFm.ports[0] = nextIn + growAccess(&fops, 0).File = true } if outputIsPipe { // Each internal port pair consists of a (byte) pipe pair and a @@ -108,16 +126,15 @@ func (op *pipelineOp) exec(fm *Frame) Exception { readerGone := new(atomic.Bool) newFm.ports[1] = &Port{ File: writer, Chan: ch, - closeFile: true, closeChan: true, sendStop: sendStop, sendError: sendError, readerGone: readerGone} + *growAccess(&fops, 1) = formOwnedPort{File: true, Chan: true} nextIn = &Port{ File: reader, Chan: ch, - closeFile: true, closeChan: false, // Store in input port for ease of retrieval later sendStop: sendStop, sendError: sendError, readerGone: readerGone} } - f := func(form *formOp, pexc *Exception) { - exc := form.exec(newFm) + f := func(form *formOp, fops []formOwnedPort, pexc *Exception) { + exc := form.exec(newFm, &fops) if exc != nil && !(outputIsPipe && isReaderGone(exc)) { *pexc = exc } @@ -127,13 +144,15 @@ func (op *pipelineOp) exec(fm *Frame) Exception { close(input.sendStop) input.readerGone.Store(true) } - newFm.Close() + for i, fop := range fops { + fop.close(newFm.ports[i]) + } wg.Done() } if i == nforms-1 && !op.bg { - f(form, &excs[i]) + f(form, fops, &excs[i]) } else { - go f(form, &excs[i]) + go f(form, fops, &excs[i]) } } @@ -236,13 +255,13 @@ func (cp *compiler) formBody(n *parse.Form) formBody { return formBody{ordinaryCmd: ordinaryCmd{headOp, argOps, optsOp}} } -func (op *formOp) exec(fm *Frame) 
(errRet Exception) { +func (op *formOp) exec(fm *Frame, fops *[]formOwnedPort) (errRet Exception) { // fm here is always a sub-frame created in compiler.pipeline, so it can // be safely modified. // Redirections. for _, redirOp := range op.redirs { - exc := redirOp.exec(fm) + exc := redirOp.exec(fm, fops) if exc != nil { return exc } @@ -377,7 +396,7 @@ type InvalidFD struct{ FD int } func (err InvalidFD) Error() string { return fmt.Sprintf("invalid fd: %d", err.FD) } -func (op *redirOp) exec(fm *Frame) Exception { +func (op *redirOp) exec(fm *Frame, fops *[]formOwnedPort) Exception { var dst int if op.dstOp == nil { // No explicit FD destination specified; use default destinations @@ -398,8 +417,12 @@ func (op *redirOp) exec(fm *Frame) Exception { } } - growPorts(&fm.ports, dst+1) - fm.ports[dst].close() + dstPort := growAccess(&fm.ports, dst) + dstFop := growAccess(fops, dst) + if *dstPort != nil { + dstFop.close(*dstPort) + *dstFop = formOwnedPort{File: false, Chan: false} + } if op.srcIsFd { src, err := evalForFd(fm, op.srcOp, true, "redirection source") @@ -409,13 +432,13 @@ func (op *redirOp) exec(fm *Frame) Exception { switch { case src == -1: // close - fm.ports[dst] = &Port{ + *dstPort = &Port{ // Ensure that writing to value output throws an exception sendStop: closedSendStop, sendError: &ErrPortDoesNotSupportValueOutput} case src >= len(fm.ports) || fm.ports[src] == nil: return fm.errorp(op, InvalidFD{FD: src}) default: - fm.ports[dst] = fm.ports[src].fork() + *dstPort = fm.ports[src] } return nil } @@ -429,9 +452,10 @@ func (op *redirOp) exec(fm *Frame) Exception { if err != nil { return fm.errorpf(op, "failed to open file %s: %s", vals.ReprPlain(src), err) } - fm.ports[dst] = fileRedirPort(op.mode, f, true) + *dstPort = fileRedirPort(op.mode, f) + dstFop.File = true case vals.File: - fm.ports[dst] = fileRedirPort(op.mode, src, false) + *dstPort = fileRedirPort(op.mode, src) default: if _, isMap := src.(vals.Map); !isMap && !vals.IsFieldMap(src) { 
return fm.errorp(op.srcOp, errs.BadValue{ @@ -463,7 +487,7 @@ func (op *redirOp) exec(fm *Frame) Exception { default: return fm.errorpf(op, "can only use < or > with maps") } - fm.ports[dst] = fileRedirPort(op.mode, srcFile, false) + *dstPort = fileRedirPort(op.mode, srcFile) } return nil } @@ -471,31 +495,21 @@ func (op *redirOp) exec(fm *Frame) Exception { // Creates a port that only have a file component, populating the // channel-related fields with suitable values depending on the redirection // mode. -func fileRedirPort(mode parse.RedirMode, f *os.File, closeFile bool) *Port { +func fileRedirPort(mode parse.RedirMode, f *os.File) *Port { if mode == parse.Read { return &Port{ - File: f, closeFile: closeFile, + File: f, // ClosedChan produces no values when reading. Chan: ClosedChan, } } return &Port{ - File: f, closeFile: closeFile, + File: f, // Throws errValueOutputIsClosed when writing. Chan: nil, sendStop: closedSendStop, sendError: &ErrPortDoesNotSupportValueOutput, } } -// Makes the size of *ports at least n, adding nil's if necessary. -func growPorts(ports *[]*Port, n int) { - if len(*ports) >= n { - return - } - oldPorts := *ports - *ports = make([]*Port, n) - copy(*ports, oldPorts) -} - func evalForFd(fm *Frame, op valuesOp, closeOK bool, what string) (int, error) { value, err := evalForValue(fm, op, what) if err != nil { @@ -538,3 +552,13 @@ func (op seqOp) exec(fm *Frame) Exception { type nopOp struct{} func (nopOp) exec(fm *Frame) Exception { return nil } + +// Accesses s[i], growing the slice with zero values if necessary. 
+func growAccess[T any](s *[]T, i int) *T { + if i >= len(*s) { + old := *s + *s = make([]T, i+1) + copy(*s, old) + } + return &(*s)[i] +} diff --git a/pkg/eval/eval.go b/pkg/eval/eval.go index 2307fcc83..e113fa147 100644 --- a/pkg/eval/eval.go +++ b/pkg/eval/eval.go @@ -395,7 +395,7 @@ func (ev *Evaler) prepareFrame(src parse.Source, cfg EvalCfg) (*Frame, func()) { } func fillDefaultDummyPorts(ports []*Port) []*Port { - growPorts(&ports, 3) + growAccess(&ports, 2) if ports[0] == nil { ports[0] = DummyInputPort } diff --git a/pkg/eval/frame.go b/pkg/eval/frame.go index 62e103d2d..d2660fca3 100644 --- a/pkg/eval/frame.go +++ b/pkg/eval/frame.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "slices" "sync" "src.elv.sh/pkg/diag" @@ -87,15 +88,6 @@ func (fm *Frame) Eval(src parse.Source, r diag.Ranger, ns *Ns) (*Ns, error) { return newLocal, exec() } -// Close releases resources allocated for this frame. It always returns a nil -// error. It may be called only once. -func (fm *Frame) Close() error { - for _, port := range fm.ports { - port.close() - } - return nil -} - // InputChan returns a channel from which input can be read. func (fm *Frame) InputChan() chan any { return fm.ports[0].Chan @@ -190,21 +182,11 @@ func (fm *Frame) Canceled() bool { } } -// Fork returns a modified copy of fm. The ports are forked, and the name is -// changed to the given value. Other fields are copied shallowly. +// Fork returns a copy of fm, with the ports cloned. func (fm *Frame) Fork() *Frame { - newPorts := make([]*Port, len(fm.ports)) - for i, p := range fm.ports { - if p != nil { - newPorts[i] = p.fork() - } - } - return &Frame{ - fm.Evaler, fm.src, - fm.local, fm.up, fm.defers, - fm.ctx, newPorts, - fm.traceback, fm.background, - } + newFm := *fm + newFm.ports = slices.Clone(fm.ports) + return &newFm } // A shorthand for forking a frame and setting the output port. 
diff --git a/pkg/eval/port.go b/pkg/eval/port.go index 28ecee320..5e60119ff 100644 --- a/pkg/eval/port.go +++ b/pkg/eval/port.go @@ -16,10 +16,8 @@ import ( // Port conveys data stream. It always consists of a byte band and a channel band. type Port struct { - File *os.File - Chan chan any - closeFile bool - closeChan bool + File *os.File + Chan chan any // The following two fields are populated as an additional control mechanism // for output ports. When no more value should be send on Chan, sendError is @@ -45,24 +43,6 @@ var closedSendStop = make(chan struct{}) func init() { close(closedSendStop) } -// Returns a copy of the Port with the Close* flags unset. -func (p *Port) fork() *Port { - return &Port{p.File, p.Chan, false, false, p.sendStop, p.sendError, p.readerGone} -} - -// Closes a Port. -func (p *Port) close() { - if p == nil { - return - } - if p.closeFile { - p.File.Close() - } - if p.closeChan { - close(p.Chan) - } -} - var ( // ClosedChan is a closed channel, suitable as a placeholder input channel. ClosedChan = getClosedChan() @@ -131,9 +111,10 @@ func PipePort(vCb func(<-chan any), bCb func(*os.File)) (*Port, func(), error) { bCb(r) }() - port := &Port{Chan: ch, closeChan: true, File: w, closeFile: true} + port := &Port{File: w, Chan: ch} done := func() { - port.close() + w.Close() + close(ch) wg.Wait() } return port, done, nil diff --git a/pkg/mods/re/re.go b/pkg/mods/re/re.go index 2e560a330..4b981d2e9 100644 --- a/pkg/mods/re/re.go +++ b/pkg/mods/re/re.go @@ -188,7 +188,6 @@ func awk(fm *eval.Frame, opts awkOpt, f eval.Callable, inputs eval.Inputs) error newFm := fm.Fork() // TODO: Close port 0 of newFm. ex := f.Call(newFm, args, eval.NoOpts) - newFm.Close() if ex != nil { switch eval.Reason(ex) {