Skip to content

Commit

Permalink
[Improvement] Add RssUtils#cloneBitMap() (#103)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Add `RssUtils#cloneBitMap()`.
2. Replace `deserializeBitMap(serializeBitMap(bitmap))` by `cloneBitMap(bitmap)`.

### Why are the changes needed?

1. No need to handle `IOException`.
2. More efficient.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test `RssUtilsTest#testCloneBitmap()`
  • Loading branch information
kaijchen authored Jul 29, 2022
1 parent 493bf19 commit 4e4b940
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.uniffle.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -110,11 +109,7 @@ public ShuffleReadClientImpl(
}

// copy blockIdBitmap to track all pending blocks
try {
pendingBlockIds = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
} catch (IOException ioe) {
throw new RuntimeException("Can't create pending blockIds.", ioe);
}
pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);

clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
Expand Down Expand Up @@ -213,11 +208,7 @@ private int read() {
@Override
public void checkProcessedBlockIds() {
Roaring64NavigableMap cloneBitmap;
try {
cloneBitmap = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
} catch (IOException ioe) {
throw new RuntimeException("Can't validate processed blockIds.", ioe);
}
cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
cloneBitmap.and(processedBlockIds);
if (!blockIdBitmap.equals(cloneBitmap)) {
throw new RssException("Blocks read inconsistent: expected " + blockIdBitmap.getLongCardinality()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ public static Roaring64NavigableMap deserializeBitMap(byte[] bytes) throws IOExc
return bitmap;
}

public static Roaring64NavigableMap cloneBitMap(Roaring64NavigableMap bitmap) {
Roaring64NavigableMap clone = Roaring64NavigableMap.bitmapOf();
clone.or(bitmap);
return clone;
}

public static List<ShuffleDataSegment> transIndexDataToSegments(
ShuffleIndexResult shuffleIndexResult, int readBufferSize) {
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -88,6 +89,14 @@ public void testSerializeBitmap() throws Exception {
assertEquals(Roaring64NavigableMap.bitmapOf(), RssUtils.deserializeBitMap(new byte[]{}));
}

@Test
public void testCloneBitmap() {
Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100, 10000);
Roaring64NavigableMap bitmap2 = RssUtils.cloneBitMap(bitmap1);
assertNotSame(bitmap1, bitmap2);
assertEquals(bitmap1, bitmap2);
}

@Test
public void testShuffleIndexSegment() {
ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void readTest9() throws Exception {
ShuffleReadClientImpl readClient;

createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
Roaring64NavigableMap beforeAdded = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
// write data by another task, read data again, the cache for index file should be updated
blocks = createShuffleBlockList(
0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,9 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RuntimeException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
byte[] bitmapBytes;
synchronized (cachedBlockIds) {
bitmapBytes = RssUtils.serializeBitMap(cachedBlockIds);
cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
}
cloneBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
long expectedCommitted = cloneBlockIds.getLongCardinality();
shuffleBufferManager.commitShuffleTask(appId, shuffleId);
Roaring64NavigableMap committedBlockIds;
Expand All @@ -183,9 +181,8 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
while (true) {
committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId);
synchronized (committedBlockIds) {
bitmapBytes = RssUtils.serializeBitMap(committedBlockIds);
cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
}
cloneCommittedBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
cloneBlockIds.andNot(cloneCommittedBlockIds);
if (cloneBlockIds.isEmpty()) {
break;
Expand Down

0 comments on commit 4e4b940

Please sign in to comment.