Revert "[Improvement] Skip blocks when read from memory (#294)" (#403)
This reverts commit 55191c4.

### What changes were proposed in this pull request?
Revert #294 

### Why are the changes needed?
Block IDs are discontinuous, so `BLOCKID_RANGE` is not a good choice for filtering data read from memory (see the sketch below).
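
To make the problem concrete, here is a minimal, self-contained sketch with made-up block IDs (the bit layout below is illustrative, not Uniffle's exact encoding): because a reader's expected block IDs are sparse, a single `[min, max]` range accepts many blocks that belong to other readers, while the Roaring bitmap the client already holds answers membership exactly.

```java
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class RangeFilterDemo {
  public static void main(String[] args) {
    // Hypothetical expected block IDs for one reader. Since a block ID packs
    // several fields (sequence number, partition id, task attempt id) into a
    // single long, the IDs a reader expects are sparse, not consecutive.
    long[] expectedBlockIds = {1L << 40, (1L << 40) + (5L << 21), 1L << 50};

    // Exact membership filter: the bitmap answers "is this block one of mine?"
    Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf(expectedBlockIds);

    // Range filter: [min, max] over the same IDs.
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (long id : expectedBlockIds) {
      min = Math.min(min, id);
      max = Math.max(max, id);
    }

    // A block written by another task attempt, which this reader must skip.
    long foreignBlockId = (1L << 40) + (7L << 21);

    // The range check keeps it (false positive); the bitmap rejects it.
    System.out.println("range keeps foreign block:  "
        + (foreignBlockId >= min && foreignBlockId <= max)); // true
    System.out.println("bitmap keeps foreign block: "
        + bitmap.contains(foreignBlockId));                  // false
  }
}
```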

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No tests needed, since this is a straight revert.
xianjingfeng authored Dec 12, 2022
1 parent 9b10545 commit 45e600c
Showing 26 changed files with 90 additions and 579 deletions.
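
The revert restores the boolean `expectedTaskIdsBitmapFilterEnable` path shown in the hunks below. For reference, a non-authoritative sketch of the restored reader-side call shape, based on the constructor and factory signatures in this diff; the helper method and its parameters are illustrative placeholders, not code from this PR:

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;

class ReadClientSketch {
  // Illustrative helper: every parameter is a placeholder for a value a real
  // reader would take from its shuffle handle and configuration.
  static ShuffleReadClient createReader(
      String appId, int shuffleId, int partition, String storageType,
      String basePath, int indexReadLimit, int readBufferSize, int partitionNum,
      Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap taskIdBitmap,
      List<ShuffleServerInfo> servers, Configuration hadoopConf,
      int mapStartIndex, int mapEndIndex) {
    // Restored flag: the expected-task-ids bitmap filter is only worthwhile
    // when the reader covers a partial map range, e.g. AQE skew optimization.
    boolean expectedTaskIdsBitmapFilterEnable =
        !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE);
    CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
        appId, shuffleId, partition, storageType, basePath, indexReadLimit,
        readBufferSize, 1, partitionNum, blockIdBitmap, taskIdBitmap, servers,
        hadoopConf, ShuffleDataDistributionType.NORMAL,
        expectedTaskIdsBitmapFilterEnable);
    return ShuffleClientFactory.getInstance().createShuffleReadClient(request);
  }
}
```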
@@ -179,16 +179,6 @@ public class RssMRConfig {
public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;

public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY;
public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE;

public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS;
public static final int RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE;

public static final String RSS_CONF_FILE = "rss_conf.xml";

public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
@@ -42,7 +42,6 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.UnitConverter;
@@ -83,8 +82,6 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
private int readBufferSize;
private RemoteStorageInfo remoteStorageInfo;
private int appAttemptId;
private BlockSkipStrategy blockSkipStrategy;
private int maxBlockIdRangeSegments;

@Override
public void init(ShuffleConsumerPlugin.Context context) {
@@ -112,12 +109,6 @@ public void init(ShuffleConsumerPlugin.Context context) {
RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA,
RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
blockSkipStrategy = BlockSkipStrategy.valueOf(RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
maxBlockIdRangeSegments = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);

this.partitionNum = mrJobConf.getNumReduceTasks();
this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
@@ -203,7 +194,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
readerJobConf, new MRIdHelper(), blockSkipStrategy, maxBlockIdRangeSegments);
readerJobConf, new MRIdHelper());
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality(), RssMRConfig.toRssConf(rssJobConf));
@@ -265,12 +265,6 @@ public class RssSparkConfig {
+ " spark.rss.estimate.server.assignment.enabled"))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);

public static final ConfigEntry<String> RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY = createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY)
.doc("The strategy for skip block when read from memory."))
.createWithDefault(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE);


public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
ImmutableSet.of(RSS_STORAGE_TYPE.key(), RSS_REMOTE_STORAGE_PATH.key());

@@ -45,8 +45,6 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;

@@ -73,8 +71,6 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
private RssConf rssConf;
private final BlockSkipStrategy blockSkipStrategy;
private final int maxBlockIdRangeSegments;

public RssShuffleReader(
int startPartition,
@@ -111,14 +107,6 @@ public RssShuffleReader(
this.shuffleServerInfoList =
(List<ShuffleServerInfo>) (rssShuffleHandle.getPartitionToServers().get(startPartition));
this.rssConf = rssConf;

BlockSkipStrategy blockSkipStrategy = BlockSkipStrategy.valueOf(
rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
this.blockSkipStrategy = shuffleServerInfoList.size() <= 1 ? BlockSkipStrategy.NONE : blockSkipStrategy;

maxBlockIdRangeSegments = rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
}

@Override
@@ -127,8 +115,7 @@ public Iterator<Product2<K, C>> read() {

CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, startPartition, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
blockSkipStrategy, maxBlockIdRangeSegments);
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator rssShuffleDataIterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
@@ -49,15 +49,14 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;

public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class);
private final Map<Integer, List<ShuffleServerInfo>> partitionToShuffleServers;

private String appId;
private int shuffleId;
private int startPartition;
@@ -79,8 +78,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private ShuffleReadMetrics readMetrics;
private RssConf rssConf;
private ShuffleDataDistributionType dataDistributionType;
private final BlockSkipStrategy blockSkipStrategy;
private final int maxBlockIdRangeSegments;
private boolean expectedTaskIdsBitmapFilterEnable;

public RssShuffleReader(
int startPartition,
@@ -122,11 +120,9 @@ public RssShuffleReader(
this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
this.rssConf = rssConf;
this.dataDistributionType = dataDistributionType;
blockSkipStrategy = BlockSkipStrategy.valueOf(
rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
maxBlockIdRangeSegments = rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
// The expectedTaskIdsBitmap filter is meant to filter out most of the data,
// especially for AQE skew optimization.
this.expectedTaskIdsBitmapFilterEnable = !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE);
}

@Override
@@ -211,15 +207,10 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
continue;
}
List<ShuffleServerInfo> shuffleServerInfoList = partitionToShuffleServers.get(partition);
// If AQE is disabled and the number of replicas is 1, we should set BlockSkipStrategy
// to NONE to reduce data transmission
BlockSkipStrategy realBlockSkipStrategy = shuffleServerInfoList.size() <= 1
&& mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE
? BlockSkipStrategy.NONE : blockSkipStrategy;
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
hadoopConf, dataDistributionType, realBlockSkipStrategy, maxBlockIdRangeSegments);
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
@@ -87,8 +87,7 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest
request.getHadoopConf(),
request.getIdHelper(),
request.getShuffleDataDistributionType(),
request.getBlockSkipStrategy(),
request.getMaxBlockIdRangeSegments()
request.isExpectedTaskIdsBitmapFilterEnable()
);
}
}
@@ -33,7 +33,6 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.IdHelper;
import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
@@ -79,8 +78,7 @@ public ShuffleReadClientImpl(
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType,
BlockSkipStrategy blockSkipStrategy,
int maxBlockIdRangeSegments) {
boolean expectedTaskIdsBitmapFilterEnable) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
@@ -104,10 +102,9 @@ public ShuffleReadClientImpl(
request.setProcessBlockIds(processedBlockIds);
request.setDistributionType(dataDistributionType);
request.setExpectTaskIds(taskIdBitmap);
if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy)) {
request.setMaxBlockIdRangeSegments(maxBlockIdRangeSegments);
if (expectedTaskIdsBitmapFilterEnable) {
request.useExpectedTaskIdsBitmapFilter();
}
request.setBlockSkipStrategy(blockSkipStrategy);

List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
@@ -144,7 +141,7 @@ public ShuffleReadClientImpl(
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
idHelper, ShuffleDataDistributionType.NORMAL, BlockSkipStrategy.TASK_BITMAP, 0);
idHelper, ShuffleDataDistributionType.NORMAL, false);
}

@Override
@@ -24,7 +24,6 @@

import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.client.util.IdHelper;
import org.apache.uniffle.common.BlockSkipStrategy;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;

@@ -45,8 +44,7 @@ public class CreateShuffleReadClientRequest {
private Configuration hadoopConf;
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;
private BlockSkipStrategy blockSkipStrategy;
private int maxBlockIdRangeSegments;
private boolean expectedTaskIdsBitmapFilterEnable = false;

public CreateShuffleReadClientRequest(
String appId,
@@ -63,12 +61,12 @@ public CreateShuffleReadClientRequest(
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType,
BlockSkipStrategy blockSkipStrategy,
int maxBlockIdRangeSegments) {
boolean expectedTaskIdsBitmapFilterEnable) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), blockSkipStrategy, maxBlockIdRangeSegments);
hadoopConf, new DefaultIdHelper());
this.shuffleDataDistributionType = dataDistributionType;
this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
}

public CreateShuffleReadClientRequest(
@@ -84,12 +82,10 @@ public CreateShuffleReadClientRequest(
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
BlockSkipStrategy blockSkipStrategy,
int maxBlockIdRangeSegments) {
Configuration hadoopConf) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), blockSkipStrategy, maxBlockIdRangeSegments);
hadoopConf, new DefaultIdHelper());
}

public CreateShuffleReadClientRequest(
@@ -106,9 +102,7 @@ public CreateShuffleReadClientRequest(
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
IdHelper idHelper,
BlockSkipStrategy blockSkipStrategy,
int maxBlockIdRangeSegments) {
IdHelper idHelper) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
Expand All @@ -123,8 +117,6 @@ public CreateShuffleReadClientRequest(
this.shuffleServerInfoList = shuffleServerInfoList;
this.hadoopConf = hadoopConf;
this.idHelper = idHelper;
this.blockSkipStrategy = blockSkipStrategy;
this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
}

public String getAppId() {
@@ -187,19 +179,7 @@ public ShuffleDataDistributionType getShuffleDataDistributionType() {
return shuffleDataDistributionType;
}

public int getMaxBlockIdRangeSegments() {
return maxBlockIdRangeSegments;
}

public void setMaxBlockIdRangeSegments(int maxBlockIdRangeSegments) {
this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
}

public BlockSkipStrategy getBlockSkipStrategy() {
return blockSkipStrategy;
}

public void setBlockSkipStrategy(BlockSkipStrategy blockSkipStrategy) {
this.blockSkipStrategy = blockSkipStrategy;
public boolean isExpectedTaskIdsBitmapFilterEnable() {
return expectedTaskIdsBitmapFilterEnable;
}
}
@@ -61,7 +61,7 @@ public class RssClientConfig {
// The tags specified by rss client to determine server assignment.
public static final String RSS_CLIENT_ASSIGNMENT_TAGS = "rss.client.assignment.tags";
public static final String RSS_TEST_MODE_ENABLE = "rss.test.mode.enable";

public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = "rss.client.assignment.retry.interval";
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE = 65000;
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = "rss.client.assignment.retry.times";
@@ -86,11 +86,4 @@ public class RssClientConfig {
public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = "rss.estimate.task.concurrency.per.server";
public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;

public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY = "rss.client.read.block.skip.strategy";

public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE = "TASK_BITMAP";

public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
"rss.client.read.block.skip.range.segments.max";
public static final int RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE = 10;
}

This file was deleted.
