Skip to content

Commit

Permalink
Enable dynamic client conf test in RssShuffleManagerTest
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Mar 13, 2024
1 parent 8773b65 commit cc564bb
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -125,8 +124,6 @@ public void testRssShuffleManagerClientConf(BlockIdLayout layout) throws Excepti

@ParameterizedTest
@MethodSource("testBlockIdLayouts")
@Disabled(
"Dynamic client conf not working for arguments used to create ShuffleWriteClient: issue #1554")
public void testRssShuffleManagerDynamicClientConf(BlockIdLayout layout) throws Exception {
doTestRssShuffleManager(null, layout, layout, true);
}
Expand Down Expand Up @@ -159,7 +156,7 @@ private void doTestRssShuffleManager(
conf.set("spark." + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL, "1000");
conf.set("spark." + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES, "10");

// configure block id layout (if set)
// configure client conf block id layout (if set)
if (clientConfLayout != null) {
conf.set(
"spark." + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
Expand All @@ -185,6 +182,17 @@ private void doTestRssShuffleManager(
RssShuffleManagerBase shuffleManager =
(RssShuffleManagerBase) SparkEnv.get().shuffleManager();

// configure expected block id layout
conf.set(
"spark." + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
String.valueOf(expectedLayout.sequenceNoBits));
conf.set(
"spark." + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
String.valueOf(expectedLayout.partitionIdBits));
conf.set(
"spark." + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
String.valueOf(expectedLayout.taskAttemptIdBits));

// get written block ids (we know there is one shuffle where two task attempts wrote two
// partitions)
RssConf rssConf = RssSparkConfig.toRssConf(conf);
Expand Down

0 comments on commit cc564bb

Please sign in to comment.