Skip to content

Commit

Permalink
[SPARK-3686][STREAMING] Wait for sink to commit the channel before ch…
Browse files Browse the repository at this point in the history
…ecking for the channel size.
  • Loading branch information
harishreedharan committed Sep 25, 2014
1 parent 74fb2ec commit 6ce9d8b
Showing 1 changed file with 2 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SparkSinkSuite extends FunSuite {
val events = client.getEventBatch(1000)
client.ack(events.getSequenceNumber)
assert(events.getEvents.size() === 1000)
TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
assertChannelIsEmpty(channel)
sink.stop()
channel.stop()
Expand All @@ -70,6 +71,7 @@ class SparkSinkSuite extends FunSuite {
val events = client.getEventBatch(1000)
assert(events.getEvents.size() === 1000)
client.nack(events.getSequenceNumber)
TimeUnit.SECONDS.sleep(1) // Allow the sink to commit the transactions.
assert(availableChannelSlots(channel) === 4000)
sink.stop()
channel.stop()
Expand Down

0 comments on commit 6ce9d8b

Please sign in to comment.