Commit
support generating larger block size during shuffle map task by spilling partial partitions data
leslizhang committed May 11, 2024
2 parents 1a8e023 + 8e26a34 commit a650c97
Showing 88 changed files with 3,188 additions and 907 deletions.
14 changes: 14 additions & 0 deletions LICENSE-binary
@@ -248,6 +248,8 @@ com.google.api.grpc:proto-google-common-protos
 org.javassist:javassist
 com.google.code.findbugs:jsr305
 jakarta.validation:jakarta.validation-api
+org.yaml:snakeyaml
+org.eclipse.jetty:jetty-proxy
 
 -----------------------------------------------------------------------------------
 This product bundles various third-party components under other open source licenses.
@@ -273,6 +275,14 @@ org.slf4j:slf4j-reload4j
 org.codehaus.mojo:animal-sniffer-annotations
 org.checkerframework:checker-qual
 
+axios
+core-js
+element-plus
+moment
+vue
+vue-resource
+vue-router
+
 Common Development and Distribution License (CDDL) 1.0
 ------------------------------------------------------
 
@@ -290,3 +300,7 @@ Eclipse Public License (EPL) 2.0
 jakarta.annotation:jakarta.annotation-api
 org.glassfish.hk2.external:jakarta.inject
 org.apache.hbase.thirdparty:hbase-shaded-jersey
+
+The ISC License
+---------------
+rimraf
SortWriteBuffer.java

@@ -341,22 +341,51 @@ public SortBufferIterator(SortWriteBuffer<K, V> sortWriteBuffer) {
       this.iterator = sortWriteBuffer.records.iterator();
     }
 
+    private byte[] fetchDataFromBuffers(int index, int offset, int length) {
+      // Adjust start index and offset for the start of the value
+      while (offset >= sortWriteBuffer.buffers.get(index).getSize()) {
+        offset -= sortWriteBuffer.buffers.get(index).getSize();
+        index++;
+      }
+
+      byte[] data = new byte[length]; // Create a new array to store the complete data
+      int copyDestPos = 0;
+
+      while (length > 0) {
+        WrappedBuffer currentBuffer = sortWriteBuffer.buffers.get(index);
+        byte[] currentBufferData = currentBuffer.getBuffer();
+        int currentBufferCapacity = currentBuffer.getSize();
+        int copyLength = Math.min(currentBufferCapacity - offset, length);
+
+        // Copy data from the current buffer to the data array
+        System.arraycopy(currentBufferData, offset, data, copyDestPos, copyLength);
+        length -= copyLength;
+        copyDestPos += copyLength;
+
+        // Move to the next buffer
+        index++;
+        offset = 0; // Start position in the new buffer is 0
+      }
+      return data;
+    }
+
     @Override
     public DataInputBuffer getKey() {
-      SortWriteBuffer.WrappedBuffer keyWrappedBuffer =
-          sortWriteBuffer.buffers.get(currentRecord.getKeyIndex());
-      byte[] rawData = keyWrappedBuffer.getBuffer();
-      keyBuffer.reset(rawData, currentRecord.getKeyOffSet(), currentRecord.getKeyLength());
+      int keyIndex = currentRecord.getKeyIndex();
+      int keyOffset = currentRecord.getKeyOffSet();
+      int keyLength = currentRecord.getKeyLength();
+      byte[] keyData = fetchDataFromBuffers(keyIndex, keyOffset, keyLength);
+      keyBuffer.reset(keyData, 0, keyLength);
       return keyBuffer;
     }
 
     @Override
     public DataInputBuffer getValue() {
-      SortWriteBuffer.WrappedBuffer valueWrappedBuffer =
-          sortWriteBuffer.buffers.get(currentRecord.getKeyIndex());
-      byte[] rawData = valueWrappedBuffer.getBuffer();
+      int keyIndex = currentRecord.getKeyIndex();
       int valueOffset = currentRecord.getKeyOffSet() + currentRecord.getKeyLength();
-      valueBuffer.reset(rawData, valueOffset, currentRecord.getValueLength());
+      int valueLength = currentRecord.getValueLength();
+      byte[] valueData = fetchDataFromBuffers(keyIndex, valueOffset, valueLength);
+      valueBuffer.reset(valueData, 0, valueLength);
       return valueBuffer;
     }
 
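Why the iterator changed: SortWriteBuffer chains fixed-size internal buffers, so a serialized key or value can start in one WrappedBuffer and continue into the next; the old getKey()/getValue() read bytes from only a single buffer and therefore truncated records that cross a boundary. Below is a minimal standalone sketch of the stitching idea (plain Java with illustrative names, not the Uniffle classes):

import java.util.Arrays;
import java.util.List;

// Sketch: reassemble `length` bytes that may span several fixed-size buffers,
// mirroring the fetchDataFromBuffers loop added above.
public class MultiBufferReadSketch {
  static byte[] fetch(List<byte[]> buffers, int index, int offset, int length) {
    // Skip whole buffers until `offset` falls inside buffers.get(index).
    while (offset >= buffers.get(index).length) {
      offset -= buffers.get(index).length;
      index++;
    }
    byte[] out = new byte[length];
    int destPos = 0;
    while (length > 0) {
      byte[] current = buffers.get(index);
      int copyLength = Math.min(current.length - offset, length);
      System.arraycopy(current, offset, out, destPos, copyLength);
      length -= copyLength;
      destPos += copyLength;
      index++;
      offset = 0; // later buffers are read from their start
    }
    return out;
  }

  public static void main(String[] args) {
    // "HELLO!" split across two 3-byte buffers: a single-buffer read could return at most "HEL".
    List<byte[]> buffers = Arrays.asList(new byte[] {'H', 'E', 'L'}, new byte[] {'L', 'O', '!'});
    System.out.println(new String(fetch(buffers, 0, 0, 6))); // prints HELLO!
  }
}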
@@ -373,8 +373,7 @@ public void testCombineBuffer() throws Exception {
             jobConf, new TaskAttemptID(), combineInputCounter, reporter, null);
 
     SortWriteBuffer<Text, IntWritable> buffer =
-        new SortWriteBuffer<Text, IntWritable>(
-            1, comparator, 10000, keySerializer, valueSerializer);
+        new SortWriteBuffer<Text, IntWritable>(1, comparator, 3072, keySerializer, valueSerializer);
 
     List<String> wordTable =
         Lists.newArrayList(
@@ -383,7 +382,7 @@ public void testCombineBuffer() throws Exception {
     for (int i = 0; i < 8; i++) {
       buffer.addRecord(new Text(wordTable.get(i)), new IntWritable(1));
     }
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 10000; i++) {
       int index = random.nextInt(wordTable.size());
       buffer.addRecord(new Text(wordTable.get(index)), new IntWritable(1));
     }
@@ -429,7 +428,7 @@ public void testCombineBuffer() throws Exception {
     while (kvIterator2.next()) {
       count2++;
     }
-    assertEquals(108, count1);
+    assertEquals(10008, count1);
     assertEquals(8, count2);
   }
 
@@ -20,11 +20,15 @@
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -153,6 +157,55 @@ public void testReadWrite() throws IOException {
     assertEquals(bigWritableValue, valueRead);
   }
 
+  @Test
+  public void testSortBufferIterator() throws IOException {
+    SerializationFactory serializationFactory =
+        new SerializationFactory(new JobConf(new Configuration()));
+    Serializer<Text> keySerializer = serializationFactory.getSerializer(Text.class);
+    Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class);
+    Serializer<IntWritable> valueSerializer = serializationFactory.getSerializer(IntWritable.class);
+    Deserializer<IntWritable> valueDeserializer =
+        serializationFactory.getDeserializer(IntWritable.class);
+
+    SortWriteBuffer<Text, IntWritable> buffer =
+        new SortWriteBuffer<Text, IntWritable>(1, null, 3072, keySerializer, valueSerializer);
+
+    List<String> wordTable =
+        Lists.newArrayList(
+            "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", "tomato");
+
+    List<String> keys = Lists.newArrayList();
+
+    Random random = new Random();
+    for (int i = 0; i < 8; i++) {
+      buffer.addRecord(new Text(wordTable.get(i)), new IntWritable(1));
+      keys.add(wordTable.get(i));
+    }
+    for (int i = 0; i < 10000; i++) {
+      int index = random.nextInt(wordTable.size());
+      buffer.addRecord(new Text(wordTable.get(index)), new IntWritable(1));
+      keys.add(wordTable.get(index));
+    }
+
+    SortWriteBuffer.SortBufferIterator<Text, IntWritable> iterator =
+        new SortWriteBuffer.SortBufferIterator<>(buffer);
+
+    int ind = 0;
+
+    Text key = new Text();
+    IntWritable value = new IntWritable();
+    while (iterator.next()) {
+      iterator.getKey().getData();
+      iterator.getValue().getData();
+      keyDeserializer.open(iterator.getKey());
+      valueDeserializer.open(iterator.getValue());
+      keyDeserializer.deserialize(key);
+      valueDeserializer.deserialize(value);
+      assertEquals(keys.get(ind), key.toString());
+      ind++;
+    }
+  }
+
   int readInt(DataInputStream dStream) throws IOException {
     return WritableUtils.readVInt(dStream);
   }
RssShuffleHandle.java

@@ -22,6 +22,7 @@
 
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -31,14 +32,14 @@ public class RssShuffleHandle<K, V, C> extends ShuffleHandle {
   private String appId;
   private int numMaps;
   private ShuffleDependency<K, V, C> dependency;
-  private Broadcast<ShuffleHandleInfo> handlerInfoBd;
+  private Broadcast<SimpleShuffleHandleInfo> handlerInfoBd;
 
   public RssShuffleHandle(
       int shuffleId,
       String appId,
       int numMaps,
       ShuffleDependency<K, V, C> dependency,
-      Broadcast<ShuffleHandleInfo> handlerInfoBd) {
+      Broadcast<SimpleShuffleHandleInfo> handlerInfoBd) {
     super(shuffleId);
     this.appId = appId;
     this.numMaps = numMaps;
@@ -67,6 +68,6 @@ public RemoteStorageInfo getRemoteStorage() {
   }
 
   public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
-    return handlerInfoBd.value().getPartitionToServers();
+    return handlerInfoBd.value().getAvailablePartitionServersForWriter();
   }
 }
RssSparkConfig.java

@@ -37,6 +37,13 @@
 
 public class RssSparkConfig {
 
+  public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
+      ConfigOptions.key("rss.blockId.selfManagementEnabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "Whether to enable the blockId self management in spark driver side. Default value is false.");
+
   public static final ConfigOption<Long> RSS_CLIENT_SEND_SIZE_LIMITATION =
       ConfigOptions.key("rss.client.send.size.limit")
           .longType()
@@ -70,6 +77,18 @@ public class RssSparkConfig {
           .defaultValue(1.0d)
           .withDescription(
               "The buffer size to spill when spill triggered by config spark.rss.writer.buffer.spill.size");
+  public static final ConfigOption<Integer> RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM =
+      ConfigOptions.key("rss.client.reassign.maxReassignServerNum")
+          .intType()
+          .defaultValue(10)
+          .withDescription(
+              "The max reassign server num for one partition when using partition reassign mechanism.");
+
+  public static final ConfigOption<Integer> RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES =
+      ConfigOptions.key("rss.client.reassign.blockRetryMaxTimes")
+          .intType()
+          .defaultValue(1)
+          .withDescription("The block retry max times when partition reassign is enabled.");
 
   public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
 
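To make the new options concrete, a hedged usage sketch (assumption: these keys are set through the Spark conf with the "spark." prefix from SPARK_RSS_CONFIG_PREFIX; this snippet is not part of the commit):

import org.apache.spark.SparkConf;

public class RssReassignConfSketch {
  public static void main(String[] args) {
    // Assumed full key = "spark." prefix + the ConfigOption key defined above;
    // values mirror the defaults except selfManagementEnabled.
    SparkConf conf = new SparkConf()
        .set("spark.rss.blockId.selfManagementEnabled", "true")
        .set("spark.rss.client.reassign.maxReassignServerNum", "10")
        .set("spark.rss.client.reassign.blockRetryMaxTimes", "1");
    System.out.println(conf.get("spark.rss.client.reassign.maxReassignServerNum"));
  }
}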
RssSparkShuffleUtils.java

@@ -35,6 +35,7 @@
 import org.apache.spark.SparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 import org.apache.spark.storage.BlockManagerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,8 +63,8 @@ public class RssSparkShuffleUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssSparkShuffleUtils.class);
 
-  public static final ClassTag<ShuffleHandleInfo> SHUFFLE_HANDLER_INFO_CLASS_TAG =
-      scala.reflect.ClassTag$.MODULE$.apply(ShuffleHandleInfo.class);
+  public static final ClassTag<SimpleShuffleHandleInfo> DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG =
+      scala.reflect.ClassTag$.MODULE$.apply(SimpleShuffleHandleInfo.class);
   public static final ClassTag<byte[]> BYTE_ARRAY_CLASS_TAG =
       scala.reflect.ClassTag$.MODULE$.apply(byte[].class);
 
@@ -256,22 +257,22 @@ public static SparkContext getActiveSparkContext() {
   }
 
   /**
-   * create broadcast variable of {@link ShuffleHandleInfo}
+   * create broadcast variable of {@link SimpleShuffleHandleInfo}
    *
    * @param sc expose for easy unit-test
    * @param shuffleId
    * @param partitionToServers
    * @param storageInfo
    * @return Broadcast variable registered for auto cleanup
    */
-  public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(
+  public static Broadcast<SimpleShuffleHandleInfo> broadcastShuffleHdlInfo(
      SparkContext sc,
       int shuffleId,
       Map<Integer, List<ShuffleServerInfo>> partitionToServers,
       RemoteStorageInfo storageInfo) {
-    ShuffleHandleInfo handleInfo =
-        new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
-    return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
+    SimpleShuffleHandleInfo handleInfo =
+        new SimpleShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
+    return sc.broadcast(handleInfo, DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG);
   }
 
   private static <T> T instantiateFetchFailedException(
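For orientation, a hedged sketch of the round trip this enables (wiring assumed, not code from this commit): the driver broadcasts a SimpleShuffleHandleInfo once per shuffle, and executors resolve the writer-side server assignment from the broadcast value.

// Assumes a live SparkContext `sc` plus `shuffleId`, `partitionToServers`, and
// `storageInfo` as in the signature above; illustrative only.
Broadcast<SimpleShuffleHandleInfo> handleBd =
    RssSparkShuffleUtils.broadcastShuffleHdlInfo(sc, shuffleId, partitionToServers, storageInfo);
// Executor side: value() fetches and caches the broadcast per executor.
Map<Integer, List<ShuffleServerInfo>> writers =
    handleBd.value().getAvailablePartitionServersForWriter();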