Skip to content

Commit

Permalink
Using compound operations as well as increments and decrements on pri…
Browse files Browse the repository at this point in the history
…mitive fields are not atomic operations. Here when volatile primitive field is incremented or decremented,  we run into data loss if threads interleave in steps of update. 
  • Loading branch information
ajithme committed Jan 2, 2020
1 parent 83d289e commit 9be777c
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private static class StreamState {
int curChunk = 0;

// Used to keep track of the number of chunks being transferred and not finished yet.
volatile long chunksBeingTransferred = 0L;
volatile AtomicLong chunksBeingTransferred = new AtomicLong(0L);

StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
this.appId = appId;
Expand Down Expand Up @@ -153,7 +153,7 @@ public void checkAuthorization(TransportClient client, long streamId) {
public void chunkBeingSent(long streamId) {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
streamState.chunksBeingTransferred++;
streamState.chunksBeingTransferred.incrementAndGet();
}

}
Expand All @@ -167,7 +167,7 @@ public void streamBeingSent(String streamId) {
public void chunkSent(long streamId) {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
streamState.chunksBeingTransferred--;
streamState.chunksBeingTransferred.decrementAndGet();
}
}

Expand All @@ -180,7 +180,7 @@ public void streamSent(String streamId) {
public long chunksBeingTransferred() {
long sum = 0L;
for (StreamState streamState: streams.values()) {
sum += streamState.chunksBeingTransferred;
sum += streamState.chunksBeingTransferred.get();
}
return sum;
}
Expand Down

0 comments on commit 9be777c

Please sign in to comment.