From 9be777cea4d9f3102d709ed02c1afec9ba53a19e Mon Sep 17 00:00:00 2001 From: Ajith Date: Thu, 2 Jan 2020 12:40:02 +0530 Subject: [PATCH] =?UTF-8?q?Using=20compound=20operations=20as=20well=20as?= =?UTF-8?q?=20increments=20and=20decrements=20on=20primitive=20fields=20ar?= =?UTF-8?q?e=20not=20atomic=20operations.=20Here=20when=20volatile=20primi?= =?UTF-8?q?tive=20field=20is=20incremented=20or=20decremented,=C2=A0=20we?= =?UTF-8?q?=20run=20into=20data=20loss=20if=20threads=20interleave=20in=20?= =?UTF-8?q?steps=20of=20update.=C2=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spark/network/server/OneForOneStreamManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index 67f64d7962035..ebf2ccff40e46 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -57,7 +57,7 @@ private static class StreamState { int curChunk = 0; // Used to keep track of the number of chunks being transferred and not finished yet. - volatile long chunksBeingTransferred = 0L; + volatile AtomicLong chunksBeingTransferred = new AtomicLong(0L); StreamState(String appId, Iterator buffers, Channel channel) { this.appId = appId; @@ -153,7 +153,7 @@ public void checkAuthorization(TransportClient client, long streamId) { public void chunkBeingSent(long streamId) { StreamState streamState = streams.get(streamId); if (streamState != null) { - streamState.chunksBeingTransferred++; + streamState.chunksBeingTransferred.incrementAndGet(); } } @@ -167,7 +167,7 @@ public void streamBeingSent(String streamId) { public void chunkSent(long streamId) { StreamState streamState = streams.get(streamId); if (streamState != null) { - streamState.chunksBeingTransferred--; + streamState.chunksBeingTransferred.decrementAndGet(); } } @@ -180,7 +180,7 @@ public void streamSent(String streamId) { public long chunksBeingTransferred() { long sum = 0L; for (StreamState streamState: streams.values()) { - sum += streamState.chunksBeingTransferred; + sum += streamState.chunksBeingTransferred.get(); } return sum; }