Skip to content

Commit

Permalink
[ISSUE-378][HugePartition][Part-1] Record every partition data size f…
Browse files Browse the repository at this point in the history
…or one app (#458)


### What changes were proposed in this pull request?

Record every partition data size for one app

### Why are the changes needed?

This is a subtask for #378

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

1. UTs
  • Loading branch information
zuston authored Jan 6, 2023
1 parent 2b756c3 commit ebaff6a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void sendShuffleData(SendShuffleDataRequest req,
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
manager.updateCachedBlockIds(appId, shuffleId, spd.getBlockList());
manager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
}
} catch (Exception e) {
String errorMsg = "Error happened when shuffleEngine.write for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -46,13 +47,20 @@ public class ShuffleTaskInfo {

private AtomicReference<ShuffleDataDistributionType> dataDistType;

private AtomicLong totalDataSize = new AtomicLong(0);
/**
* shuffleId -> partitionId -> partition shuffle data size
*/
private Map<Integer, Map<Integer, Long>> partitionDataSizes;

public ShuffleTaskInfo() {
this.currentTimes = System.currentTimeMillis();
this.commitCounts = Maps.newConcurrentMap();
this.commitLocks = Maps.newConcurrentMap();
this.cachedBlockIds = Maps.newConcurrentMap();
this.user = new AtomicReference<>();
this.dataDistType = new AtomicReference<>();
this.partitionDataSizes = Maps.newConcurrentMap();
}

public Long getCurrentTimes() {
Expand Down Expand Up @@ -91,4 +99,29 @@ public void setDataDistType(
public ShuffleDataDistributionType getDataDistType() {
return dataDistType.get();
}

public void addPartitionDataSize(int shuffleId, int partitionId, long delta) {
totalDataSize.addAndGet(delta);
partitionDataSizes.computeIfAbsent(shuffleId, key -> Maps.newConcurrentMap());
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
partitions.putIfAbsent(partitionId, 0L);
partitions.computeIfPresent(partitionId, (k, v) -> v + delta);
}

public long getTotalDataSize() {
return totalDataSize.get();
}

public long getPartitionDataSize(int shuffleId, int partitionId) {
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
if (partitions == null) {
return 0;
}
Long size = partitions.get(partitionId);
if (size == null) {
return 0L;
}
return size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,31 @@ public int updateAndGetCommitCount(String appId, int shuffleId) {
return commitNum.incrementAndGet();
}

// Only for tests
public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) {
updateCachedBlockIds(appId, shuffleId, 0, spbs);
}

public void updateCachedBlockIds(String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) {
if (spbs == null || spbs.length == 0) {
return;
}
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo());
Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
.computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());

long size = 0L;
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
size += spb.getSize();
}
}
shuffleTaskInfo.addPartitionDataSize(
shuffleId,
partitionId,
size
);
}

public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
Expand Down Expand Up @@ -603,6 +616,11 @@ public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}

@VisibleForTesting
public ShuffleTaskInfo getShuffleTaskInfo(String appId) {
return shuffleTaskInfos.get(appId);
}

private void triggerFlush() {
synchronized (this.shuffleBufferManager) {
this.shuffleBufferManager.flushIfNecessary();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ShuffleTaskInfoTest {

@Test
public void partitionSizeSummaryTest() {
ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
// case1
long size = shuffleTaskInfo.getPartitionDataSize(0, 0);
assertEquals(0, size);

// case2
shuffleTaskInfo.addPartitionDataSize(0, 0, 1000);
size = shuffleTaskInfo.getPartitionDataSize(0, 0);
assertEquals(1000, size);

// case3
shuffleTaskInfo.addPartitionDataSize(0, 0, 500);
size = shuffleTaskInfo.getPartitionDataSize(0, 0);
assertEquals(1500, size);

assertEquals(
1500,
shuffleTaskInfo.getTotalDataSize()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,49 @@ public static void tearDown() {
ShuffleServerMetrics.clear();
}

@Test
public void partitionDataSizeSummaryTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
ShuffleServer shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();

String appId = "partitionDataSizeSummaryTest";
int shuffleId = 1;

shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);

// case1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
long size1 = partitionedData0.getTotalBlockSize();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());

assertEquals(
size1,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()
);

// case2
partitionedData0 = createPartitionedData(1, 1, 35);
long size2 = partitionedData0.getTotalBlockSize();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
assertEquals(
size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()
);
assertEquals(
size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1)
);
}

@Test
public void registerShuffleTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
Expand Down

0 comments on commit ebaff6a

Please sign in to comment.