Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
xumanbu committed Jul 3, 2024
1 parent 785e4be commit 2906e54
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected synchronized StageAttemptShuffleHandleInfo getRemoteShuffleHandleInfoW
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
new RssPartitionToShuffleServerRequest(shuffleId);
RssReassignOnStageRetryResponse rpcPartitionToShufflerServer =
shuffleManagerClient.getPartitionToShufflerServerWithStageRetry(
getOrCreateShuffleManagerClient().getPartitionToShufflerServerWithStageRetry(
rssPartitionToShuffleServerRequest);
StageAttemptShuffleHandleInfo shuffleHandleInfo =
StageAttemptShuffleHandleInfo.fromProto(
Expand All @@ -616,15 +616,15 @@ protected synchronized MutableShuffleHandleInfo getRemoteShuffleHandleInfoWithBl
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
new RssPartitionToShuffleServerRequest(shuffleId);
RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
shuffleManagerClient.getPartitionToShufflerServerWithBlockRetry(
getOrCreateShuffleManagerClient().getPartitionToShufflerServerWithBlockRetry(
rssPartitionToShuffleServerRequest);
MutableShuffleHandleInfo shuffleHandleInfo =
MutableShuffleHandleInfo.fromProto(rpcPartitionToShufflerServer.getHandle());
return shuffleHandleInfo;
});
}

protected AutoCloseWrapper<ShuffleManagerClient> getOrCreateShuffleManagerClientWrapper() {
protected synchronized AutoCloseWrapper<ShuffleManagerClient> getOrCreateShuffleManagerClientWrapper() {
if (managerClientAutoCloseWrapper == null) {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
Expand All @@ -639,6 +639,21 @@ protected AutoCloseWrapper<ShuffleManagerClient> getOrCreateShuffleManagerClient
return managerClientAutoCloseWrapper;
}

private ShuffleManagerClient shuffleManagerClient;

protected synchronized ShuffleManagerClient getOrCreateShuffleManagerClient() {
if (shuffleManagerClient == null) {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS);
this.shuffleManagerClient =
ShuffleManagerClientFactory.getInstance()
.createShuffleManagerClient(ClientType.GRPC, driver, port);
}
return shuffleManagerClient;
}

@Override
public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
return shuffleHandleInfoManager.get(shuffleId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,76 +17,77 @@

package org.apache.uniffle.common.util;

import com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.common.exception.RssException;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;

import org.apache.uniffle.common.exception.RssException;

public class AutoCloseWrapper<T extends Closeable> implements Closeable {
private static final AtomicInteger REF_COUNTER = new AtomicInteger();
private volatile T t;
private Supplier<T> cf;
private static final AtomicInteger REF_COUNTER = new AtomicInteger(0);
private volatile T t;
private Supplier<T> cf;

public AutoCloseWrapper(Supplier<T> cf) {
this.cf = cf;
}
public AutoCloseWrapper(Supplier<T> cf) {
this.cf = cf;
}

public T get() {
if (t == null) {
synchronized (this) {
if (t == null) {
t = cf.get();
}
}
public synchronized T get() {
// if (REF_COUNTER.get() == 0) {
// synchronized (this) {
// if (REF_COUNTER.get() == 0) {
// t = cf.get();
// System.out.println(this + "==============get============");
// }
// }
// }
// REF_COUNTER.incrementAndGet();
return cf.get();
}
REF_COUNTER.incrementAndGet();
return t;
}

@Override
public synchronized void close() throws IOException {
int count = REF_COUNTER.get();
if (count == 0 || t == null) {
return;
@Override
public synchronized void close() throws IOException {
// int count = REF_COUNTER.get();
// if (count == 0) {
// return;
// }
// if (REF_COUNTER.compareAndSet(count, count - 1)) {
// if (REF_COUNTER.get() == 0) {
// try {
// // t.close();
// System.out.println(this + "==============close============");
// } catch (Exception e) {
// throw new IOException("Failed to close the resource", e);
// } finally {
// t = null;
// }
// }
// }
}
if (REF_COUNTER.compareAndSet(count, count - 1)) {
if (count == 1) {
try {
t.close();
} catch (Exception e) {
throw new IOException("Failed to close the resource", e);
} finally {
t = null;

public synchronized void forceClose() throws IOException {
while (t != null) {
this.close();
}
}
}
}

public void forceClose() throws IOException {
while (t != null) {
this.close();
@VisibleForTesting
public int getRefCount() {
return REF_COUNTER.get();
}
}

@VisibleForTesting
public int getRefCount() {
return REF_COUNTER.get();
}

public static <T, X extends Closeable> T run(
AutoCloseWrapper<X> autoCloseWrapper, AutoCloseCmd<T, X> cmd) {
try (AutoCloseWrapper<X> wrapper = autoCloseWrapper) {
return cmd.execute(wrapper.get());
} catch (IOException e) {
throw new RssException("Error closing client with error:", e);
public static <T, X extends Closeable> T run(
AutoCloseWrapper<X> autoCloseWrapper, AutoCloseCmd<T, X> cmd) {
try (AutoCloseWrapper<X> wrapper = autoCloseWrapper) {
return cmd.execute(wrapper.get());
} catch (IOException e) {
throw new RssException("Error closing client with error:", e);
}
}
}

public interface AutoCloseCmd<T, X> {
T execute(X x);
}
public interface AutoCloseCmd<T, X> {
T execute(X x);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Random;
import java.util.function.Supplier;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -62,6 +63,25 @@ void test2() {
assertEquals(mockClientAutoCloseWrapper.getRefCount(), 0);
}

@Test
void test3() throws InterruptedException {
Supplier<MockClient> cf = () -> new MockClient(false);
AutoCloseWrapper<MockClient> mockClientAutoCloseWrapper = new AutoCloseWrapper<>(cf);
for (int i = 0; i < 1000; i++) {
new Thread(()->{
AutoCloseWrapper.run(
mockClientAutoCloseWrapper,
(MockClient mockClient) -> {
mockClient.doSomething();
return "t1";
});
}).start();
}
Thread.sleep(15000);
assertEquals(mockClientAutoCloseWrapper.getRefCount(), 0);
}


@Test
void forceClose() {
Supplier<MockClient> cf = () -> new MockClient(true);
Expand All @@ -75,21 +95,46 @@ void forceClose() {
// ignore
}
assertEquals(mockClientAutoCloseWrapper.getRefCount(), 0);
MockClient mockClient3 = mockClientAutoCloseWrapper.get();
assertNotNull(mockClient3);
assertTrue(mockClient3 != mockClient);
}

static class MockClient implements Closeable {
boolean withException;

volatile boolean isClosed;

public MockClient(boolean withException) {
this.withException = withException;
this.isClosed = false;
}

@Override
public void close() throws IOException {
new Thread(()->{
try {
Thread.sleep(2000);
isClosed = true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
if (withException) {
throw new IOException("test exception!");
}
}

public void doSomething() {
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(isClosed) {
throw new RuntimeException("client is closed.");
}
}
}

private static void closeWrapper(AutoCloseWrapper<MockClient> mockClientAutoCloseWrapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private ShuffleManagerClientFactory() {}
public ShuffleManagerGrpcClient createShuffleManagerClient(
ClientType clientType, String host, int port, long rpcTimeout) {
if (ClientType.GRPC.equals(clientType)) {
return new ShuffleManagerGrpcClient(host, port, rpcTimeout);
return new ShuffleManagerGrpcClient(host, port);
} else {
throw new UnsupportedOperationException("Unsupported client type " + clientType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
Expand All @@ -47,26 +48,22 @@
public class ShuffleManagerGrpcClient extends GrpcClient implements ShuffleManagerClient {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcClient.class);
private long rpcTimeout;
private static RssBaseConf rssConf = new RssBaseConf();
private long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS);
private ShuffleManagerGrpc.ShuffleManagerBlockingStub blockingStub;

public ShuffleManagerGrpcClient(String host, int port) {
this(host, port, 60, 3);
this(host, port, 0);
}

public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout) {
this(host, port, rpcTimeout, 3);
}

public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout, int maxRetryAttempts) {
this(host, port, rpcTimeout, maxRetryAttempts, true);
public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts) {
this(host, port, maxRetryAttempts, true);
}

public ShuffleManagerGrpcClient(
String host, int port, long rpcTimeout, int maxRetryAttempts, boolean usePlaintext) {
String host, int port, int maxRetryAttempts, boolean usePlaintext) {
super(host, port, maxRetryAttempts, usePlaintext);
blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
this.rpcTimeout = rpcTimeout;
}

public ShuffleManagerGrpc.ShuffleManagerBlockingStub getBlockingStub() {
Expand Down

0 comments on commit 2906e54

Please sign in to comment.