From d3b0132061235f316ac554690eb7e471caf95955 Mon Sep 17 00:00:00 2001 From: cooldome Date: Thu, 16 Apr 2020 22:27:08 +0100 Subject: [PATCH] Step2: fixes #13781, fixes #13805 (#13897) * Fix sym owner in wrapper proc * threadpool changes * revert lowerings * add newFastMoveStmt * try fixing test by switching to cpp Co-authored-by: cooldome --- compiler/injectdestructors.nim | 13 ++++-- compiler/lowerings.nim | 7 ++++ compiler/spawn.nim | 24 +++++------ lib/pure/concurrency/threadpool.nim | 49 ++++++++++------------ tests/arc/tthread.nim | 63 +++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 45 deletions(-) create mode 100644 tests/arc/tthread.nim diff --git a/compiler/injectdestructors.nim b/compiler/injectdestructors.nim index 07ab2730d8894..680a3839700e9 100644 --- a/compiler/injectdestructors.nim +++ b/compiler/injectdestructors.nim @@ -44,7 +44,7 @@ type graph: ModuleGraph emptyNode: PNode otherRead: PNode - inLoop, hasUnstructuredCf, inDangerousBranch: int + inLoop, inSpawn, hasUnstructuredCf, inDangerousBranch: int declaredVars: IntSet # variables we already moved to the top level uninit: IntSet # set of uninit'ed vars uninitComputed: bool @@ -398,7 +398,7 @@ proc passCopyToSink(n: PNode; c: var Con): PNode = var m = genCopy(c, tmp, n) m.add p(n, c, normal) result.add m - if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef: + if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef and c.inSpawn == 0: message(c.graph.config, n.info, hintPerformance, ("passing '$1' to a sink parameter introduces an implicit copy; " & "use 'move($1)' to prevent it") % $n) @@ -830,6 +830,12 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode = if mode == normal and isRefConstr: result = ensureDestruction(result, c) of nkCallKinds: + let inSpawn = c.inSpawn + if n[0].kind == nkSym and n[0].sym.magic == mSpawn: + c.inSpawn.inc + elif c.inSpawn > 0: + c.inSpawn.dec + let parameters = n[0].typ let L = if parameters != nil: parameters.len else: 0 @@ -840,7 +846,7 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode = result = shallowCopy(n) for i in 1.. 0): result[i] = p(n[i], c, sinkArg) else: result[i] = p(n[i], c, normal) @@ -1085,7 +1091,6 @@ proc injectDestructorCalls*(g: ModuleGraph; owner: PSym; n: PNode): PNode = let t = params[i].sym.typ if isSinkTypeForParam(t) and hasDestructor(t.skipTypes({tySink})): c.destroys.add genDestroy(c, params[i]) - #if optNimV2 in c.graph.config.globalOptions: # injectDefaultCalls(n, c) let body = p(n, c, normal) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index 091b291c2e403..575bfe8aa72d2 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -59,6 +59,13 @@ proc newFastAsgnStmt*(le, ri: PNode): PNode = result[0] = le result[1] = ri +proc newFastMoveStmt*(g: ModuleGraph, le, ri: PNode): PNode = + result = newNodeI(nkFastAsgn, le.info, 2) + result[0] = le + result[1] = newNodeIT(nkCall, ri.info, ri.typ) + result[1].add newSymNode(getSysMagic(g, ri.info, "move", mMove)) + result[1].add ri + proc lowerTupleUnpacking*(g: ModuleGraph; n: PNode; owner: PSym): PNode = assert n.kind == nkVarTuple let value = n.lastSon diff --git a/compiler/spawn.nim b/compiler/spawn.nim index 2f078309bddfa..c60dd934c64d5 100644 --- a/compiler/spawn.nim +++ b/compiler/spawn.nim @@ -10,7 +10,7 @@ ## This module implements threadpool's ``spawn``. import ast, types, idents, magicsys, msgs, options, modulegraphs, - lowerings + lowerings, liftdestructors from trees import getMagic, getRoot proc callProc(a: PNode): PNode = @@ -36,8 +36,9 @@ proc spawnResult*(t: PType; inParallel: bool): TSpawnResult = elif inParallel and not containsGarbageCollectedRef(t): srByVar else: srFlowVar -proc flowVarKind(t: PType): TFlowVarKind = - if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC +proc flowVarKind(c: ConfigRef, t: PType): TFlowVarKind = + if c.selectedGC in {gcArc, gcOrc}: fvBlob + elif t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC elif containsGarbageCollectedRef(t): fvInvalid else: fvBlob @@ -66,17 +67,11 @@ proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; owner: PSym; typ: P varSection.add vpart if varInit != nil: if g.config.selectedGC in {gcArc, gcOrc}: - if typ.attachedOps[attachedAsgn] != nil: - var call = newNode(nkCall) - call.add newSymNode(typ.attachedOps[attachedAsgn]) - call.add genAddrOf(newSymNode(result), tyVar) - call.add v - varInit.add call - else: - varInit.add newFastAsgnStmt(newSymNode(result), v) + # inject destructors pass will do its own analysis + varInit.add newFastMoveStmt(g, newSymNode(result), v) else: if useShallowCopy and typeNeedsNoDeepCopy(typ) or optTinyRtti in g.config.globalOptions: - varInit.add newFastAsgnStmt(newSymNode(result), v) + varInit.add newFastMoveStmt(g, newSymNode(result), v) else: let deepCopyCall = newNodeI(nkCall, varInit.info, 3) deepCopyCall[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy)) @@ -145,7 +140,7 @@ proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym; if spawnKind == srByVar: body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call) elif fv != nil: - let fk = fv.typ[1].flowVarKind + let fk = flowVarKind(g.config, fv.typ[1]) if fk == fvInvalid: localError(g.config, f.info, "cannot create a flowVar of type: " & typeToString(fv.typ[1])) @@ -333,6 +328,8 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P wrapperProc = newSym(skProc, getIdent(g.cache, name), owner, fn.info, g.config.options) threadParam = newSym(skParam, getIdent(g.cache, "thread"), wrapperProc, n.info, g.config.options) argsParam = newSym(skParam, getIdent(g.cache, "args"), wrapperProc, n.info, g.config.options) + + wrapperProc.flags.incl sfInjectDestructors block: let ptrType = getSysType(g, n.info, tyPointer) threadParam.typ = ptrType @@ -411,6 +408,7 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P fvAsExpr = indirectAccess(castExpr, field, n.info) result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest)) + createTypeBoundOps(g, nil, objType, n.info) createWrapperProc(g, fn, threadParam, argsParam, varSection, varInit, call, barrierAsExpr, fvAsExpr, spawnKind, wrapperProc) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index ec950aa19c9d9..2abcafb805a84 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -137,7 +137,7 @@ type const threadpoolWaitMs {.intdefine.}: int = 100 -proc blockUntil*(fv: FlowVarBase) = +proc blockUntil*(fv: var FlowVarBaseObj) = ## Waits until the value for the ``fv`` arrives. ## ## Usually it is not necessary to call this explicitly. @@ -185,7 +185,7 @@ proc attach(fv: FlowVarBase; i: int): bool = result = false release(fv.cv.L) -proc finished(fv: FlowVarBase) = +proc finished(fv: var FlowVarBaseObj) = doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVar" has not yet written @@ -208,10 +208,12 @@ proc finished(fv: FlowVarBase) = # the worker thread waits for "data" to be set to nil before shutting down owner.data = nil -proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) +proc `=destroy`[T](fv: var FlowVarObj[T]) = + finished(fv) + `=destroy`(fv.blob) proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = - new(result, fvFinalizer) + new(result) proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} = fv.cv.initSemaphore() @@ -234,43 +236,34 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = ## Note that due to Nim's parameter passing semantics this ## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can ## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_. - blockUntil(fv) - when T is string or T is seq: + blockUntil(fv[]) + when defined(nimV2): + action(fv.blob) + elif T is string or T is seq: action(cast[T](fv.data)) elif T is ref: {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: action(fv.blob) - finished(fv) + finished(fv[]) proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T = ## Blocks until the value is available and then returns this value. - blockUntil(fv) - result = cast[ptr T](fv.data) - finished(fv) - -proc `^`*[T](fv: FlowVar[ref T]): ref T = - ## Blocks until the value is available and then returns this value. - blockUntil(fv) - let src = cast[ref T](fv.data) + blockUntil(fv[]) when defined(nimV2): - result = src + result = cast[ptr T](fv.blob) else: - deepCopy result, src - finished(fv) + result = cast[ptr T](fv.data) + finished(fv[]) proc `^`*[T](fv: FlowVar[T]): T = ## Blocks until the value is available and then returns this value. - blockUntil(fv) - when T is string or T is seq: - let src = cast[T](fv.data) - when defined(nimV2): - result = src - else: - deepCopy result, src + blockUntil(fv[]) + when not defined(nimV2) and (T is string or T is seq or T is ref): + deepCopy result, cast[T](fv.data) else: result = fv.blob - finished(fv) + finished(fv[]) proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar`` @@ -457,14 +450,14 @@ proc preferSpawn*(): bool = ## <#spawnX.t>`_ instead. result = gSomeReady.counter > 0 -proc spawn*(call: typed): void {.magic: "Spawn".} +proc spawn*(call: sink typed): void {.magic: "Spawn".} ## Always spawns a new task, so that the ``call`` is never executed on ## the calling thread. ## ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a ## return type that is either ``void`` or compatible with ``FlowVar[T]``. -proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".} +proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".} ## Always spawns a new task on the worker thread with ``id``, so that ## the ``call`` is **always** executed on the thread. ## diff --git a/tests/arc/tthread.nim b/tests/arc/tthread.nim new file mode 100644 index 0000000000000..c653c753f8496 --- /dev/null +++ b/tests/arc/tthread.nim @@ -0,0 +1,63 @@ +discard """ + cmd: "nim cpp --gc:arc --threads:on $file" + output: '''ok1 +ok2 +destroyed +destroyed +destroyed +''' +""" +import threadpool, os + +type + MyObj = object + p: int + MyObjRef = ref MyObj + +proc `=destroy`(x: var MyObj) = + if x.p != 0: + echo "destroyed" + +proc thread1(): string = + os.sleep(1000) + return "ok1" + +proc thread2(): ref string = + os.sleep(1000) + new(result) + result[] = "ok2" + +proc thread3(): ref MyObj = + os.sleep(1000) + new(result) + result[].p = 2 + +var fv1 = spawn thread1() +var fv2 = spawn thread2() +var fv3 = spawn thread3() +sync() +echo ^fv1 +echo (^fv2)[] + + +proc thread4(x: MyObjRef): MyObjRef {.nosinks.} = + os.sleep(1000) + result = x + +proc thread5(x: sink MyObjRef): MyObjRef = + os.sleep(1000) + result = x + +proc ref_forwarding_test = + var x = new(MyObj) + x[].p = 2 + var y = spawn thread4(x) + +proc ref_sink_forwarding_test = + var x = new(MyObj) + x[].p = 2 + var y = spawn thread5(x) + +ref_forwarding_test() +ref_sink_forwarding_test() +sync()