Skip to content

Commit

Permalink
Step2: fixes #13781, fixes #13805 (#13897)
Browse files Browse the repository at this point in the history
* Fix sym owner in wrapper proc
* threadpool changes
* revert lowerings
* add newFastMoveStmt
* try fixing test by switching to cpp

Co-authored-by: cooldome <ariabushenko@bk.ru>
  • Loading branch information
cooldome and cooldome authored Apr 16, 2020
1 parent 9295251 commit d3b0132
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 45 deletions.
13 changes: 9 additions & 4 deletions compiler/injectdestructors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type
graph: ModuleGraph
emptyNode: PNode
otherRead: PNode
inLoop, hasUnstructuredCf, inDangerousBranch: int
inLoop, inSpawn, hasUnstructuredCf, inDangerousBranch: int
declaredVars: IntSet # variables we already moved to the top level
uninit: IntSet # set of uninit'ed vars
uninitComputed: bool
Expand Down Expand Up @@ -398,7 +398,7 @@ proc passCopyToSink(n: PNode; c: var Con): PNode =
var m = genCopy(c, tmp, n)
m.add p(n, c, normal)
result.add m
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef:
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef and c.inSpawn == 0:
message(c.graph.config, n.info, hintPerformance,
("passing '$1' to a sink parameter introduces an implicit copy; " &
"use 'move($1)' to prevent it") % $n)
Expand Down Expand Up @@ -830,6 +830,12 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =
if mode == normal and isRefConstr:
result = ensureDestruction(result, c)
of nkCallKinds:
let inSpawn = c.inSpawn
if n[0].kind == nkSym and n[0].sym.magic == mSpawn:
c.inSpawn.inc
elif c.inSpawn > 0:
c.inSpawn.dec

let parameters = n[0].typ
let L = if parameters != nil: parameters.len else: 0

Expand All @@ -840,7 +846,7 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =

result = shallowCopy(n)
for i in 1..<n.len:
if i < L and isSinkTypeForParam(parameters[i]):
if i < L and (isSinkTypeForParam(parameters[i]) or inSpawn > 0):
result[i] = p(n[i], c, sinkArg)
else:
result[i] = p(n[i], c, normal)
Expand Down Expand Up @@ -1085,7 +1091,6 @@ proc injectDestructorCalls*(g: ModuleGraph; owner: PSym; n: PNode): PNode =
let t = params[i].sym.typ
if isSinkTypeForParam(t) and hasDestructor(t.skipTypes({tySink})):
c.destroys.add genDestroy(c, params[i])

#if optNimV2 in c.graph.config.globalOptions:
# injectDefaultCalls(n, c)
let body = p(n, c, normal)
Expand Down
7 changes: 7 additions & 0 deletions compiler/lowerings.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ proc newFastAsgnStmt*(le, ri: PNode): PNode =
result[0] = le
result[1] = ri

proc newFastMoveStmt*(g: ModuleGraph, le, ri: PNode): PNode =
result = newNodeI(nkFastAsgn, le.info, 2)
result[0] = le
result[1] = newNodeIT(nkCall, ri.info, ri.typ)
result[1].add newSymNode(getSysMagic(g, ri.info, "move", mMove))
result[1].add ri

proc lowerTupleUnpacking*(g: ModuleGraph; n: PNode; owner: PSym): PNode =
assert n.kind == nkVarTuple
let value = n.lastSon
Expand Down
24 changes: 11 additions & 13 deletions compiler/spawn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
## This module implements threadpool's ``spawn``.

import ast, types, idents, magicsys, msgs, options, modulegraphs,
lowerings
lowerings, liftdestructors
from trees import getMagic, getRoot

proc callProc(a: PNode): PNode =
Expand All @@ -36,8 +36,9 @@ proc spawnResult*(t: PType; inParallel: bool): TSpawnResult =
elif inParallel and not containsGarbageCollectedRef(t): srByVar
else: srFlowVar

proc flowVarKind(t: PType): TFlowVarKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
proc flowVarKind(c: ConfigRef, t: PType): TFlowVarKind =
if c.selectedGC in {gcArc, gcOrc}: fvBlob
elif t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
elif containsGarbageCollectedRef(t): fvInvalid
else: fvBlob

Expand Down Expand Up @@ -66,17 +67,11 @@ proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; owner: PSym; typ: P
varSection.add vpart
if varInit != nil:
if g.config.selectedGC in {gcArc, gcOrc}:
if typ.attachedOps[attachedAsgn] != nil:
var call = newNode(nkCall)
call.add newSymNode(typ.attachedOps[attachedAsgn])
call.add genAddrOf(newSymNode(result), tyVar)
call.add v
varInit.add call
else:
varInit.add newFastAsgnStmt(newSymNode(result), v)
# inject destructors pass will do its own analysis
varInit.add newFastMoveStmt(g, newSymNode(result), v)
else:
if useShallowCopy and typeNeedsNoDeepCopy(typ) or optTinyRtti in g.config.globalOptions:
varInit.add newFastAsgnStmt(newSymNode(result), v)
varInit.add newFastMoveStmt(g, newSymNode(result), v)
else:
let deepCopyCall = newNodeI(nkCall, varInit.info, 3)
deepCopyCall[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy))
Expand Down Expand Up @@ -145,7 +140,7 @@ proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym;
if spawnKind == srByVar:
body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
elif fv != nil:
let fk = fv.typ[1].flowVarKind
let fk = flowVarKind(g.config, fv.typ[1])
if fk == fvInvalid:
localError(g.config, f.info, "cannot create a flowVar of type: " &
typeToString(fv.typ[1]))
Expand Down Expand Up @@ -333,6 +328,8 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
wrapperProc = newSym(skProc, getIdent(g.cache, name), owner, fn.info, g.config.options)
threadParam = newSym(skParam, getIdent(g.cache, "thread"), wrapperProc, n.info, g.config.options)
argsParam = newSym(skParam, getIdent(g.cache, "args"), wrapperProc, n.info, g.config.options)

wrapperProc.flags.incl sfInjectDestructors
block:
let ptrType = getSysType(g, n.info, tyPointer)
threadParam.typ = ptrType
Expand Down Expand Up @@ -411,6 +408,7 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
fvAsExpr = indirectAccess(castExpr, field, n.info)
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))

createTypeBoundOps(g, nil, objType, n.info)
createWrapperProc(g, fn, threadParam, argsParam,
varSection, varInit, call,
barrierAsExpr, fvAsExpr, spawnKind, wrapperProc)
Expand Down
49 changes: 21 additions & 28 deletions lib/pure/concurrency/threadpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type

const threadpoolWaitMs {.intdefine.}: int = 100

proc blockUntil*(fv: FlowVarBase) =
proc blockUntil*(fv: var FlowVarBaseObj) =
## Waits until the value for the ``fv`` arrives.
##
## Usually it is not necessary to call this explicitly.
Expand Down Expand Up @@ -185,7 +185,7 @@ proc attach(fv: FlowVarBase; i: int): bool =
result = false
release(fv.cv.L)

proc finished(fv: FlowVarBase) =
proc finished(fv: var FlowVarBaseObj) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
# we have to protect against the rare cases where the owner of the flowVar
# simply disregards the flowVar and yet the "flowVar" has not yet written
Expand All @@ -208,10 +208,12 @@ proc finished(fv: FlowVarBase) =
# the worker thread waits for "data" to be set to nil before shutting down
owner.data = nil

proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
proc `=destroy`[T](fv: var FlowVarObj[T]) =
finished(fv)
`=destroy`(fv.blob)

proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
new(result, fvFinalizer)
new(result)

proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
fv.cv.initSemaphore()
Expand All @@ -234,43 +236,34 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## Note that due to Nim's parameter passing semantics this
## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can
## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_.
blockUntil(fv)
when T is string or T is seq:
blockUntil(fv[])
when defined(nimV2):
action(fv.blob)
elif T is string or T is seq:
action(cast[T](fv.data))
elif T is ref:
{.error: "'awaitAndThen' not available for FlowVar[ref]".}
else:
action(fv.blob)
finished(fv)
finished(fv[])

proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
result = cast[ptr T](fv.data)
finished(fv)

proc `^`*[T](fv: FlowVar[ref T]): ref T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
let src = cast[ref T](fv.data)
blockUntil(fv[])
when defined(nimV2):
result = src
result = cast[ptr T](fv.blob)
else:
deepCopy result, src
finished(fv)
result = cast[ptr T](fv.data)
finished(fv[])

proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
when T is string or T is seq:
let src = cast[T](fv.data)
when defined(nimV2):
result = src
else:
deepCopy result, src
blockUntil(fv[])
when not defined(nimV2) and (T is string or T is seq or T is ref):
deepCopy result, cast[T](fv.data)
else:
result = fv.blob
finished(fv)
finished(fv[])

proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
Expand Down Expand Up @@ -457,14 +450,14 @@ proc preferSpawn*(): bool =
## <#spawnX.t>`_ instead.
result = gSomeReady.counter > 0

proc spawn*(call: typed): void {.magic: "Spawn".}
proc spawn*(call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task, so that the ``call`` is never executed on
## the calling thread.
##
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
## return type that is either ``void`` or compatible with ``FlowVar[T]``.

proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task on the worker thread with ``id``, so that
## the ``call`` is **always** executed on the thread.
##
Expand Down
63 changes: 63 additions & 0 deletions tests/arc/tthread.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
discard """
cmd: "nim cpp --gc:arc --threads:on $file"
output: '''ok1
ok2
destroyed
destroyed
destroyed
'''
"""
import threadpool, os

type
MyObj = object
p: int
MyObjRef = ref MyObj

proc `=destroy`(x: var MyObj) =
if x.p != 0:
echo "destroyed"

proc thread1(): string =
os.sleep(1000)
return "ok1"

proc thread2(): ref string =
os.sleep(1000)
new(result)
result[] = "ok2"

proc thread3(): ref MyObj =
os.sleep(1000)
new(result)
result[].p = 2

var fv1 = spawn thread1()
var fv2 = spawn thread2()
var fv3 = spawn thread3()
sync()
echo ^fv1
echo (^fv2)[]


proc thread4(x: MyObjRef): MyObjRef {.nosinks.} =
os.sleep(1000)
result = x

proc thread5(x: sink MyObjRef): MyObjRef =
os.sleep(1000)
result = x

proc ref_forwarding_test =
var x = new(MyObj)
x[].p = 2
var y = spawn thread4(x)

proc ref_sink_forwarding_test =
var x = new(MyObj)
x[].p = 2
var y = spawn thread5(x)

ref_forwarding_test()
ref_sink_forwarding_test()
sync()

0 comments on commit d3b0132

Please sign in to comment.