[ISSUE-475][Improvement] It's unnecessary to use ConcurrentHashMap for "partitionToBlockIds" in RssShuffleWriter #480

Merged: 5 commits, Jan 16, 2023
Changes from 3 commits
@@ -139,7 +139,7 @@ public RssShuffleWriter(
     this.sendSizeLimit = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
         RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
     this.bitmapSplitNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
-    this.partitionToBlockIds = Maps.newConcurrentMap();
+    this.partitionToBlockIds = Maps.newHashMap();
     this.shuffleWriteClient = shuffleWriteClient;
     this.shuffleServersForData = rssHandle.getShuffleServersForData();
     this.partitionToServers = rssHandle.getPartitionToServers();
@@ -172,8 +172,8 @@ public void write(Iterator<Product2<K, V>> records) {
   }

   private void writeImpl(Iterator<Product2<K,V>> records) {
-    List<ShuffleBlockInfo> shuffleBlockInfos = null;
-    Set<Long> blockIds = Sets.newConcurrentHashSet();
+    List<ShuffleBlockInfo> shuffleBlockInfos;
+    Set<Long> blockIds = Sets.newHashSet();
     while (records.hasNext()) {
       Product2<K, V> record = records.next();
       int partition = getPartition(record._1());
@@ -225,8 +225,7 @@ private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoLis
       blockIds.add(blockId);
       // update [partition, blockIds], it will be sent to shuffle server
       int partitionId = sbi.getPartitionId();
-      partitionToBlockIds.putIfAbsent(partitionId, Sets.newConcurrentHashSet());
-      partitionToBlockIds.get(partitionId).add(blockId);
+      partitionToBlockIds.computeIfAbsent(partitionId, k -> Sets.newHashSet()).add(blockId);
     });
     postBlockEvent(shuffleBlockInfoList);
   }
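As an aside on the pattern changed in this hunk: when the map is only touched by the task's own thread, a plain HashMap plus computeIfAbsent covers the put-if-missing-then-add sequence in a single call. A minimal, self-contained sketch (the class and method names here are illustrative, not taken from the RSS codebase):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionIndexSketch {
    // Stand-in for partitionToBlockIds: one set of block ids per partition id.
    private final Map<Integer, Set<Long>> partitionToBlockIds = new HashMap<>();

    // Old shape: two lookups, and the pre-built set is discarded when the key already exists.
    void addWithPutIfAbsent(int partitionId, long blockId) {
        partitionToBlockIds.putIfAbsent(partitionId, new HashSet<>());
        partitionToBlockIds.get(partitionId).add(blockId);
    }

    // New shape: one lookup; the set is created lazily only when the key is missing.
    void addWithComputeIfAbsent(int partitionId, long blockId) {
        partitionToBlockIds.computeIfAbsent(partitionId, k -> new HashSet<>()).add(blockId);
    }
}
```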
@@ -140,7 +140,7 @@ public RssShuffleWriter(
     this.sendSizeLimit = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
         RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
     this.bitmapSplitNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
-    this.partitionToBlockIds = Maps.newConcurrentMap();
+    this.partitionToBlockIds = Maps.newHashMap();
     this.shuffleWriteClient = shuffleWriteClient;
     this.shuffleServersForData = rssHandle.getShuffleServersForData();
     this.partitionLengths = new long[partitioner.numPartitions()];
@@ -166,8 +166,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
   }

   private void writeImpl(Iterator<Product2<K,V>> records) {
-    List<ShuffleBlockInfo> shuffleBlockInfos = null;
-    Set<Long> blockIds = Sets.newConcurrentHashSet();
+    List<ShuffleBlockInfo> shuffleBlockInfos;
+    Set<Long> blockIds = Sets.newHashSet();
     boolean isCombine = shuffleDependency.mapSideCombine();
     Function1 createCombiner = null;
     if (isCombine) {
@@ -223,8 +223,7 @@ private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoLis
       blockIds.add(blockId);
       // update [partition, blockIds], it will be sent to shuffle server
       int partitionId = sbi.getPartitionId();
-      partitionToBlockIds.putIfAbsent(partitionId, Sets.newConcurrentHashSet());
-      partitionToBlockIds.get(partitionId).add(blockId);
+      partitionToBlockIds.computeIfAbsent(partitionId, k -> Sets.newHashSet()).add(blockId);
       partitionLengths[partitionId] += sbi.getLength();
     });
     postBlockEvent(shuffleBlockInfoList);
@@ -259,7 +259,7 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     }

     // maintain the count of blocks that have been sent to the server
-    Map<Long, AtomicInteger> blockIdsTracker = Maps.newConcurrentMap();
+    Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();

Contributor:
This variable will be accessed by multiple threads in sendShuffleDataAsync.

Contributor Author:
no, it's not shared since a new instance is created each time you call the method.

Contributor Author:
I overlooked the CompletableFuture part inside sendShuffleDataAsync. Let me roll back the change.

Contributor:
> no, it's not shared since a new instance is created each time you call the method.

You can see the following for more details:

serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());

Contributor Author:
rolled back.

Contributor Author:
I went through the logic and didn't find any update to "blockIdsTracker" in the main thread (correct me if I am wrong) after the "sendShuffleDataAsync" call, which runs asynchronously in the thread pool "dataTransferPool". According to the BlockingQueue documentation (a BlockingQueue is used internally by the thread pool), "...actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread."

So I think we don't need a ConcurrentHashMap for "blockIdsTracker". And since "AtomicInteger" is used as the value type of "blockIdsTracker", that is enough to make the updated counts visible to other threads in the later code.
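A minimal sketch of the happens-before argument made above, assuming the same populate-then-dispatch shape (the names and sizes are invented for illustration; this is not the actual sendShuffleDataAsync code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SafePublicationSketch {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 1. Build the map entirely in the submitting thread; no pool thread can see it yet.
        Map<Long, AtomicInteger> tracker = new HashMap<>();
        for (long blockId = 0; blockId < 10; blockId++) {
            tracker.put(blockId, new AtomicInteger(0));
        }

        // 2. Handing the task to the executor establishes a happens-before edge (the task
        //    passes through the pool's internal BlockingQueue), so the worker thread
        //    observes the fully populated HashMap.
        CompletableFuture<Void> send = CompletableFuture.runAsync(
            // 3. The worker only reads the map structure; counts are updated through
            //    AtomicInteger, which is itself thread-safe.
            () -> tracker.values().forEach(AtomicInteger::incrementAndGet),
            pool);

        send.join();  // join() gives happens-before back to the caller as well.
        System.out.println(tracker.get(0L).get());  // prints 1
        pool.shutdown();
    }
}
```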

Contributor:
You're right. But it's safer to use a ConcurrentHashMap: if we modify this logic one day, we could forget to change the type back to ConcurrentHashMap. If you still think it's worthwhile to change the type, I think we could add some comments explaining why we don't use ConcurrentHashMap, to remind us of this point.

Contributor Author:
I just changed it back to HashMap, with comments explaining the reason. And from the code-logic perspective, we are unlikely to insert/delete entries after dispatching it to sendShuffleDataAsync.

     primaryServerToBlockIds.values().forEach(
         blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
     );
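A possible shape of the change the author describes above (a plain HashMap plus an explanatory comment); this is a sketch, and the wording may differ from what was actually committed in this PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class BlockIdsTrackerNote {
    Map<Long, AtomicInteger> newBlockIdsTracker() {
        // A plain HashMap is intentional: the tracker is created per send call and fully
        // populated before any task is handed to the data transfer pool. Afterwards worker
        // threads only read the map and bump the AtomicInteger values, and the executor
        // hand-off provides the required happens-before. Switch to a ConcurrentHashMap if
        // entries are ever added or removed after the async dispatch.
        return new HashMap<>();
    }
}
```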
@@ -473,8 +473,8 @@ public void reportShuffleResult(
       long taskAttemptId,
       Map<Integer, List<Long>> partitionToBlockIds,
       int bitmapNum) {
-    Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newConcurrentMap();
-    Map<Integer, Integer> partitionReportTracker = Maps.newConcurrentMap();
+    Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newHashMap();
+    Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
     for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
       for (ShuffleServerInfo ssi : entry.getValue()) {
         if (!groupedPartitions.containsKey(ssi)) {