Skip to content

Commit

Permalink
[SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoc…
Browse files Browse the repository at this point in the history
…h in EpochTracker (to support Python UDFs)

This PR proposes to use `InheritableThreadLocal` instead of `ThreadLocal` for current epoch in `EpochTracker`. Python UDF needs threads to write out to and read it from Python processes and when there are new threads, previously set epoch is lost.

After this PR, Python UDFs can be used at Structured Streaming with the continuous mode.

The test cases were written on the top of apache#24945.
Unit tests were added.

Manual tests.

Closes apache#24946 from HyukjinKwon/SPARK-27234.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
HyukjinKwon committed Aug 15, 2019
1 parent dfcebca commit 4c5fdd6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ class ContinuousCoalesceRDD(
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
while (!context.isInterrupted() && !context.isCompleted()) {
writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
// Note that current epoch is a non-inheritable thread local, so each writer thread
// can properly increment its own epoch without affecting the main task thread.
// Note that current epoch is a inheritable thread local but makes another instance,
// so each writer thread can properly increment its own epoch without affecting
// the main task thread.
EpochTracker.incrementCurrentEpoch()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ import java.util.concurrent.atomic.AtomicLong
object EpochTracker {
// The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will
// update the underlying AtomicLong as it finishes epochs. Other code should only read the value.
private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] {
override def initialValue() = new AtomicLong(-1)
private val currentEpoch: InheritableThreadLocal[AtomicLong] = {
new InheritableThreadLocal[AtomicLong] {
override protected def childValue(parent: AtomicLong): AtomicLong = {
// Note: make another instance so that changes in the parent epoch aren't reflected in
// those in the children threads. This is required at `ContinuousCoalesceRDD`.
new AtomicLong(parent.get)
}
override def initialValue() = new AtomicLong(-1)
}
}

/**
Expand Down

0 comments on commit 4c5fdd6

Please sign in to comment.