Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step2: fixes #13781, fixes #13805 #13897

Merged
merged 15 commits into from
Apr 16, 2020
13 changes: 9 additions & 4 deletions compiler/injectdestructors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type
graph: ModuleGraph
emptyNode: PNode
otherRead: PNode
inLoop, hasUnstructuredCf, inDangerousBranch: int
inLoop, inSpawn, hasUnstructuredCf, inDangerousBranch: int
declaredVars: IntSet # variables we already moved to the top level
uninit: IntSet # set of uninit'ed vars
uninitComputed: bool
Expand Down Expand Up @@ -359,7 +359,7 @@ proc passCopyToSink(n: PNode; c: var Con): PNode =
var m = genCopy(c, tmp, n)
m.add p(n, c, normal)
result.add m
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef:
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef and c.inSpawn == 0:
message(c.graph.config, n.info, hintPerformance,
("passing '$1' to a sink parameter introduces an implicit copy; " &
"use 'move($1)' to prevent it") % $n)
Expand Down Expand Up @@ -791,6 +791,12 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =
if mode == normal and isRefConstr:
result = ensureDestruction(result, c)
of nkCallKinds:
let inSpawn = c.inSpawn
if n[0].kind == nkSym and n[0].sym.magic == mSpawn:
c.inSpawn.inc
elif c.inSpawn > 0:
Copy link
Member

@Araq Araq Apr 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suspicious logic. Why not at the end of the of nkCallkinds handling code like

if n[0].kind == nkSym and n[0].sym.magic == mSpawn:
  c.inSpawn.dec

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that spawn can be written in the following way:

spawn myproc(call1(), (let x = call2(); call3(x, 0))

I want c.inSpawn to be greater than zero only for myproc call but not for call2 and call3 in example above.

c.inSpawn.dec

let parameters = n[0].typ
let L = if parameters != nil: parameters.len else: 0

Expand All @@ -801,7 +807,7 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =

result = shallowCopy(n)
for i in 1..<n.len:
if i < L and isSinkTypeForParam(parameters[i]):
if i < L and (isSinkTypeForParam(parameters[i]) or inSpawn > 0):
result[i] = p(n[i], c, sinkArg)
else:
result[i] = p(n[i], c, normal)
Expand Down Expand Up @@ -1044,7 +1050,6 @@ proc injectDestructorCalls*(g: ModuleGraph; owner: PSym; n: PNode): PNode =
let t = params[i].sym.typ
if isSinkTypeForParam(t) and hasDestructor(t.skipTypes({tySink})):
c.destroys.add genDestroy(c, params[i])

#if optNimV2 in c.graph.config.globalOptions:
# injectDefaultCalls(n, c)
let body = p(n, c, normal)
Expand Down
7 changes: 7 additions & 0 deletions compiler/lowerings.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ proc newFastAsgnStmt*(le, ri: PNode): PNode =
result[0] = le
result[1] = ri

proc newFastMoveStmt*(g: ModuleGraph, le, ri: PNode): PNode =
result = newNodeI(nkFastAsgn, le.info, 2)
result[0] = le
result[1] = newNodeIT(nkCall, ri.info, ri.typ)
result[1].add newSymNode(getSysMagic(g, ri.info, "move", mMove))
result[1].add ri

proc lowerTupleUnpacking*(g: ModuleGraph; n: PNode; owner: PSym): PNode =
assert n.kind == nkVarTuple
let value = n.lastSon
Expand Down
24 changes: 11 additions & 13 deletions compiler/spawn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
## This module implements threadpool's ``spawn``.

import ast, types, idents, magicsys, msgs, options, modulegraphs,
lowerings
lowerings, liftdestructors
from trees import getMagic

proc callProc(a: PNode): PNode =
Expand All @@ -36,8 +36,9 @@ proc spawnResult*(t: PType; inParallel: bool): TSpawnResult =
elif inParallel and not containsGarbageCollectedRef(t): srByVar
else: srFlowVar

proc flowVarKind(t: PType): TFlowVarKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
proc flowVarKind(c: ConfigRef, t: PType): TFlowVarKind =
if c.selectedGC in {gcArc, gcOrc}: fvBlob
elif t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
elif containsGarbageCollectedRef(t): fvInvalid
else: fvBlob

Expand Down Expand Up @@ -66,17 +67,11 @@ proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; owner: PSym; typ: P
varSection.add vpart
if varInit != nil:
if g.config.selectedGC in {gcArc, gcOrc}:
if typ.attachedOps[attachedAsgn] != nil:
var call = newNode(nkCall)
call.add newSymNode(typ.attachedOps[attachedAsgn])
call.add genAddrOf(newSymNode(result), tyVar)
call.add v
varInit.add call
else:
varInit.add newFastAsgnStmt(newSymNode(result), v)
# inject destructors pass will do its own analysis
varInit.add newFastMoveStmt(g, newSymNode(result), v)
else:
if useShallowCopy and typeNeedsNoDeepCopy(typ) or optTinyRtti in g.config.globalOptions:
varInit.add newFastAsgnStmt(newSymNode(result), v)
varInit.add newFastMoveStmt(g, newSymNode(result), v)
else:
let deepCopyCall = newNodeI(nkCall, varInit.info, 3)
deepCopyCall[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy))
Expand Down Expand Up @@ -145,7 +140,7 @@ proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym;
if spawnKind == srByVar:
body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
elif fv != nil:
let fk = fv.typ[1].flowVarKind
let fk = flowVarKind(g.config, fv.typ[1])
if fk == fvInvalid:
localError(g.config, f.info, "cannot create a flowVar of type: " &
typeToString(fv.typ[1]))
Expand Down Expand Up @@ -350,6 +345,8 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
wrapperProc = newSym(skProc, getIdent(g.cache, name), owner, fn.info, g.config.options)
threadParam = newSym(skParam, getIdent(g.cache, "thread"), wrapperProc, n.info, g.config.options)
argsParam = newSym(skParam, getIdent(g.cache, "args"), wrapperProc, n.info, g.config.options)

wrapperProc.flags.incl sfInjectDestructors
block:
let ptrType = getSysType(g, n.info, tyPointer)
threadParam.typ = ptrType
Expand Down Expand Up @@ -428,6 +425,7 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
fvAsExpr = indirectAccess(castExpr, field, n.info)
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))

createTypeBoundOps(g, nil, objType, n.info)
createWrapperProc(g, fn, threadParam, argsParam,
varSection, varInit, call,
barrierAsExpr, fvAsExpr, spawnKind, wrapperProc)
Expand Down
63 changes: 32 additions & 31 deletions lib/pure/concurrency/threadpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type

const threadpoolWaitMs {.intdefine.}: int = 100

proc blockUntil*(fv: FlowVarBase) =
proc blockUntil*(fv: var FlowVarBaseObj) =
## Waits until the value for the ``fv`` arrives.
##
## Usually it is not necessary to call this explicitly.
Expand Down Expand Up @@ -185,7 +185,7 @@ proc attach(fv: FlowVarBase; i: int): bool =
result = false
release(fv.cv.L)

proc finished(fv: FlowVarBase) =
proc finished(fv: var FlowVarBaseObj) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
# we have to protect against the rare cases where the owner of the flowVar
# simply disregards the flowVar and yet the "flowVar" has not yet written
Expand All @@ -208,10 +208,12 @@ proc finished(fv: FlowVarBase) =
# the worker thread waits for "data" to be set to nil before shutting down
owner.data = nil

proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
proc `=destroy`[T](fv: var FlowVarObj[T]) =
finished(fv)
`=destroy`(fv.blob)

proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
new(result, fvFinalizer)
new(result)

proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
fv.cv.initSemaphore()
Expand All @@ -234,43 +236,42 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## Note that due to Nim's parameter passing semantics this
## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can
## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_.
blockUntil(fv)
when T is string or T is seq:
blockUntil(fv[])
when defined(nimV2):
action(fv.blob)
elif T is string or T is seq:
action(cast[T](fv.data))
elif T is ref:
{.error: "'awaitAndThen' not available for FlowVar[ref]".}
else:
action(fv.blob)
finished(fv)
finished(fv[])

proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
result = cast[ptr T](fv.data)
finished(fv)

proc `^`*[T](fv: FlowVar[ref T]): ref T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
let src = cast[ref T](fv.data)
blockUntil(fv[])
when defined(nimV2):
result = src
result = cast[ptr T](fv.blob)
else:
deepCopy result, src
finished(fv)
result = cast[ptr T](fv.data)
finished(fv[])

proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
when T is string or T is seq:
let src = cast[T](fv.data)
when defined(nimV2):
result = src
else:
deepCopy result, src
else:
when defined(nimV2):
proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv[])
result = fv.blob
finished(fv)
finished(fv[])

else:
proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv[])
when T is string or T is seq or T is ref:
deepCopy result, cast[T](fv.data)
else:
result = fv.blob
finished(fv[])

proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
Expand Down Expand Up @@ -457,14 +458,14 @@ proc preferSpawn*(): bool =
## <#spawnX.t>`_ instead.
result = gSomeReady.counter > 0

proc spawn*(call: typed): void {.magic: "Spawn".}
proc spawn*(call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task, so that the ``call`` is never executed on
## the calling thread.
##
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
## return type that is either ``void`` or compatible with ``FlowVar[T]``.

proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task on the worker thread with ``id``, so that
## the ``call`` is **always** executed on the thread.
##
Expand Down
63 changes: 63 additions & 0 deletions tests/arc/tthread.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
discard """
cmd: "nim c --gc:arc --threads:on $file"
output: '''ok1
ok2
destroyed
destroyed
destroyed
'''
"""
import threadpool, os

type
MyObj = object
p: int
MyObjRef = ref MyObj

proc `=destroy`(x: var MyObj) =
if x.p != 0:
echo "destroyed"

proc thread1(): string =
os.sleep(1000)
return "ok1"

proc thread2(): ref string =
os.sleep(1000)
new(result)
result[] = "ok2"

proc thread3(): ref MyObj =
os.sleep(1000)
new(result)
result[].p = 2

var fv1 = spawn thread1()
var fv2 = spawn thread2()
var fv3 = spawn thread3()
sync()
echo ^fv1
echo (^fv2)[]


proc thread4(x: MyObjRef): MyObjRef {.nosinks.} =
os.sleep(1000)
result = x

proc thread5(x: sink MyObjRef): MyObjRef =
os.sleep(1000)
result = x

proc ref_forwarding_test =
var x = new(MyObj)
x[].p = 2
var y = spawn thread4(x)

proc ref_sink_forwarding_test =
var x = new(MyObj)
x[].p = 2
var y = spawn thread5(x)

ref_forwarding_test()
ref_sink_forwarding_test()
sync()