From 5f82f4dc0f58fc513eed9fe081b6fef8a64e7a01 Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Tue, 6 Feb 2024 14:16:08 +0800 Subject: [PATCH 1/6] Fix issue #1476: Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../client/impl/ShuffleWriteClientImpl.java | 66 +++++++++++-------- .../client/api/ShuffleServerClient.java | 27 ++------ .../impl/grpc/ShuffleServerGrpcClient.java | 54 ++++++++------- .../request/RssUnregisterAppRequest.java | 30 +++++++++ .../response/RssUnregisterAppResponse.java | 27 ++++++++ proto/src/main/proto/Rss.proto | 10 +++ .../server/ShuffleServerGrpcService.java | 34 ++++++++++ 7 files changed, 174 insertions(+), 74 deletions(-) create mode 100644 internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java create mode 100644 internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index faa1f247cc..d830b3f9b8 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -48,33 +48,8 @@ import org.apache.uniffle.client.factory.CoordinatorClientFactory; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; -import org.apache.uniffle.client.request.RssAppHeartBeatRequest; -import org.apache.uniffle.client.request.RssApplicationInfoRequest; -import org.apache.uniffle.client.request.RssFetchClientConfRequest; -import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest; -import org.apache.uniffle.client.request.RssFinishShuffleRequest; -import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultRequest; -import org.apache.uniffle.client.request.RssRegisterShuffleRequest; -import org.apache.uniffle.client.request.RssReportShuffleResultRequest; -import org.apache.uniffle.client.request.RssSendCommitRequest; -import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; -import org.apache.uniffle.client.response.ClientResponse; -import org.apache.uniffle.client.response.RssAppHeartBeatResponse; -import org.apache.uniffle.client.response.RssApplicationInfoResponse; -import org.apache.uniffle.client.response.RssFetchClientConfResponse; -import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse; -import org.apache.uniffle.client.response.RssFinishShuffleResponse; -import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse; -import org.apache.uniffle.client.response.RssGetShuffleResultResponse; -import org.apache.uniffle.client.response.RssRegisterShuffleResponse; -import org.apache.uniffle.client.response.RssReportShuffleResultResponse; -import org.apache.uniffle.client.response.RssSendCommitResponse; -import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; -import org.apache.uniffle.client.response.SendShuffleDataResult; +import org.apache.uniffle.client.request.*; +import org.apache.uniffle.client.response.*; import org.apache.uniffle.client.util.ClientUtils; import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; @@ -984,6 +959,8 @@ public void unregisterShuffle(String appId, int shuffleId) { @Override public void unregisterShuffle(String appId) { + RssUnregisterAppRequest request = new RssUnregisterAppRequest(appId); + if (appId == null) { return; } @@ -991,7 +968,40 @@ public void unregisterShuffle(String appId) { if (appServerMap == null) { return; } - appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, shuffleId)); + Set shuffleServerInfos = getAllShuffleServers(appId); + + ExecutorService executorService = null; + try { + executorService = + ThreadUtils.getDaemonFixedThreadPool( + Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()), "unregister-shuffle"); + + ThreadUtils.executeTasks( + executorService, + shuffleServerInfos, + shuffleServerInfo -> { + try { + ShuffleServerClient client = + ShuffleServerClientFactory.getInstance() + .getShuffleServerClient(clientType, shuffleServerInfo, rssConf); + RssUnregisterAppResponse response = client.unregisterApp(request); + if (response.getStatusCode() != StatusCode.SUCCESS) { + LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo); + } + } catch (Exception e) { + LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e); + } + return null; + }, + unregisterRequestTimeSec, + "unregister shuffle server"); + + } finally { + if (executorService != null) { + executorService.shutdownNow(); + } + shuffleServerInfoMap.remove(appId); + } } private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) { diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java index eb4cd398f9..2b3a4b52f4 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java @@ -17,29 +17,8 @@ package org.apache.uniffle.client.api; -import org.apache.uniffle.client.request.RssAppHeartBeatRequest; -import org.apache.uniffle.client.request.RssFinishShuffleRequest; -import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultRequest; -import org.apache.uniffle.client.request.RssRegisterShuffleRequest; -import org.apache.uniffle.client.request.RssReportShuffleResultRequest; -import org.apache.uniffle.client.request.RssSendCommitRequest; -import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; -import org.apache.uniffle.client.response.RssAppHeartBeatResponse; -import org.apache.uniffle.client.response.RssFinishShuffleResponse; -import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; -import org.apache.uniffle.client.response.RssGetShuffleResultResponse; -import org.apache.uniffle.client.response.RssRegisterShuffleResponse; -import org.apache.uniffle.client.response.RssReportShuffleResultResponse; -import org.apache.uniffle.client.response.RssSendCommitResponse; -import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; +import org.apache.uniffle.client.request.*; +import org.apache.uniffle.client.response.*; public interface ShuffleServerClient { @@ -47,6 +26,8 @@ public interface ShuffleServerClient { RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request); + RssUnregisterAppResponse unregisterApp(RssUnregisterAppRequest request); + RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request); RssSendCommitResponse sendCommit(RssSendCommitRequest request); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 6bae6f62c3..be60c34490 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -34,29 +34,8 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.client.api.ShuffleServerClient; -import org.apache.uniffle.client.request.RssAppHeartBeatRequest; -import org.apache.uniffle.client.request.RssFinishShuffleRequest; -import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultRequest; -import org.apache.uniffle.client.request.RssRegisterShuffleRequest; -import org.apache.uniffle.client.request.RssReportShuffleResultRequest; -import org.apache.uniffle.client.request.RssSendCommitRequest; -import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; -import org.apache.uniffle.client.response.RssAppHeartBeatResponse; -import org.apache.uniffle.client.response.RssFinishShuffleResponse; -import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; -import org.apache.uniffle.client.response.RssGetShuffleResultResponse; -import org.apache.uniffle.client.response.RssRegisterShuffleResponse; -import org.apache.uniffle.client.response.RssReportShuffleResultResponse; -import org.apache.uniffle.client.response.RssSendCommitResponse; -import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; +import org.apache.uniffle.client.request.*; +import org.apache.uniffle.client.response.*; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; @@ -328,6 +307,35 @@ public long requirePreAllocation( return result; } + private RssProtos.AppUnregisterResponse doUnregisterApp(String appId) { + RssProtos.AppUnregisterRequest request = + RssProtos.AppUnregisterRequest.newBuilder().setAppId(appId).build(); + return blockingStub.unregisterApp(request); + } + + @Override + public RssUnregisterAppResponse unregisterApp(RssUnregisterAppRequest request) { + RssProtos.AppUnregisterResponse rpcResponse = doUnregisterApp(request.getAppId()); + + RssUnregisterAppResponse response; + RssProtos.StatusCode statusCode = rpcResponse.getStatus(); + + switch (statusCode) { + case SUCCESS: + response = new RssUnregisterAppResponse(StatusCode.SUCCESS); + break; + default: + String msg = + String.format( + "Errors on unregister app to %s:%s for appId[%s], error: %s", + host, port, request.getAppId(), rpcResponse.getRetMsg()); + LOG.error(msg); + throw new RssException(msg); + } + + return response; + } + private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String appId, int shuffleId) { RssProtos.ShuffleUnregisterRequest request = RssProtos.ShuffleUnregisterRequest.newBuilder() diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java new file mode 100644 index 0000000000..3d1550d3e4 --- /dev/null +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.request; + +public class RssUnregisterAppRequest { + private String appId; + + public RssUnregisterAppRequest(String appId) { + this.appId = appId; + } + + public String getAppId() { + return appId; + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java new file mode 100644 index 0000000000..2c298fdb72 --- /dev/null +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.response; + +import org.apache.uniffle.common.rpc.StatusCode; + +public class RssUnregisterAppResponse extends ClientResponse { + + public RssUnregisterAppResponse(StatusCode statusCode) { + super(statusCode); + } +} diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 9f601f2428..8bf343a652 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -26,6 +26,7 @@ package rss.common; service ShuffleServer { rpc registerShuffle (ShuffleRegisterRequest) returns (ShuffleRegisterResponse); rpc unregisterShuffle(ShuffleUnregisterRequest) returns (ShuffleUnregisterResponse); + rpc unregisterApp(AppUnregisterRequest) returns (AppUnregisterResponse); rpc sendShuffleData (SendShuffleDataRequest) returns (SendShuffleDataResponse); rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns (GetLocalShuffleIndexResponse); rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns (GetLocalShuffleDataResponse); @@ -197,6 +198,15 @@ message ShuffleRegisterResponse { string retMsg = 2; } +message AppUnregisterRequest { + string appId = 1; +} + +message AppUnregisterResponse { + StatusCode status = 1; + string retMsg = 2; +} + message SendShuffleDataRequest { string appId = 1; int32 shuffleId = 2; diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 65c2b998bc..577e521bf0 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server; import java.nio.ByteBuffer; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -25,6 +26,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; @@ -86,6 +88,7 @@ import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse; import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; +import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageReadMetrics; import org.apache.uniffle.storage.util.ShuffleStorageUtils; @@ -99,6 +102,37 @@ public ShuffleServerGrpcService(ShuffleServer shuffleServer) { this.shuffleServer = shuffleServer; } + @Override + public void unregisterApp( + RssProtos.AppUnregisterRequest request, + StreamObserver responseStreamObserver) { + String appId = request.getAppId(); + + StatusCode result = StatusCode.SUCCESS; + String responseMessage = "OK"; + try { + Map> shuffleIdToBuffers = + shuffleServer.getShuffleBufferManager().getBufferPool().get(appId); + if (shuffleIdToBuffers != null) { + HashSet shuffleIds = Sets.newHashSet(shuffleIdToBuffers.keySet()); + shuffleIds.forEach( + shuffleId -> + shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId, shuffleId)); + } + + } catch (Exception e) { + result = StatusCode.INTERNAL_ERROR; + } + + RssProtos.AppUnregisterResponse reply = + RssProtos.AppUnregisterResponse.newBuilder() + .setStatus(result.toProto()) + .setRetMsg(responseMessage) + .build(); + responseStreamObserver.onNext(reply); + responseStreamObserver.onCompleted(); + } + @Override public void unregisterShuffle( RssProtos.ShuffleUnregisterRequest request, From a3a76f8329b61183c74b9f79a33823fbb23c309a Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Tue, 6 Feb 2024 15:12:14 +0800 Subject: [PATCH 2/6] [FEATURE] Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../client/impl/ShuffleWriteClientImpl.java | 31 +++++++++++++++++-- .../client/api/ShuffleServerClient.java | 27 ++++++++++++++-- .../impl/grpc/ShuffleServerGrpcClient.java | 27 ++++++++++++++-- 3 files changed, 79 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index d830b3f9b8..c156b9a1ba 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -48,8 +48,35 @@ import org.apache.uniffle.client.factory.CoordinatorClientFactory; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; -import org.apache.uniffle.client.request.*; -import org.apache.uniffle.client.response.*; +import org.apache.uniffle.client.request.RssAppHeartBeatRequest; +import org.apache.uniffle.client.request.RssApplicationInfoRequest; +import org.apache.uniffle.client.request.RssFetchClientConfRequest; +import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssReportShuffleResultRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterAppRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; +import org.apache.uniffle.client.response.ClientResponse; +import org.apache.uniffle.client.response.RssAppHeartBeatResponse; +import org.apache.uniffle.client.response.RssApplicationInfoResponse; +import org.apache.uniffle.client.response.RssFetchClientConfResponse; +import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse; +import org.apache.uniffle.client.response.RssFinishShuffleResponse; +import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse; +import org.apache.uniffle.client.response.RssGetShuffleResultResponse; +import org.apache.uniffle.client.response.RssRegisterShuffleResponse; +import org.apache.uniffle.client.response.RssReportShuffleResultResponse; +import org.apache.uniffle.client.response.RssSendCommitResponse; +import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterAppResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; +import org.apache.uniffle.client.response.SendShuffleDataResult; import org.apache.uniffle.client.util.ClientUtils; import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java index 2b3a4b52f4..dfbaf570c3 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java @@ -17,8 +17,31 @@ package org.apache.uniffle.client.api; -import org.apache.uniffle.client.request.*; -import org.apache.uniffle.client.response.*; +import org.apache.uniffle.client.request.RssAppHeartBeatRequest; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssReportShuffleResultRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterAppRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; +import org.apache.uniffle.client.response.RssAppHeartBeatResponse; +import org.apache.uniffle.client.response.RssFinishShuffleResponse; +import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; +import org.apache.uniffle.client.response.RssGetShuffleResultResponse; +import org.apache.uniffle.client.response.RssRegisterShuffleResponse; +import org.apache.uniffle.client.response.RssReportShuffleResultResponse; +import org.apache.uniffle.client.response.RssSendCommitResponse; +import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterAppResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; public interface ShuffleServerClient { diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index be60c34490..60f2da3dcb 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -34,8 +34,31 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.client.api.ShuffleServerClient; -import org.apache.uniffle.client.request.*; -import org.apache.uniffle.client.response.*; +import org.apache.uniffle.client.request.RssAppHeartBeatRequest; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssReportShuffleResultRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterAppRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; +import org.apache.uniffle.client.response.RssAppHeartBeatResponse; +import org.apache.uniffle.client.response.RssFinishShuffleResponse; +import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; +import org.apache.uniffle.client.response.RssGetShuffleResultResponse; +import org.apache.uniffle.client.response.RssRegisterShuffleResponse; +import org.apache.uniffle.client.response.RssReportShuffleResultResponse; +import org.apache.uniffle.client.response.RssSendCommitResponse; +import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterAppResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; From 2db77f2c403acbb872e185525a1bfc974e94023e Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Tue, 6 Feb 2024 18:29:17 +0800 Subject: [PATCH 3/6] feat(spark): Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../client/impl/ShuffleWriteClientImpl.java | 9 +++--- .../client/api/ShuffleServerClient.java | 32 ++++--------------- .../impl/grpc/ShuffleServerGrpcClient.java | 22 +++++++------ ...> RssUnregisterShuffleByAppIdRequest.java} | 4 +-- ... RssUnregisterShuffleByAppIdResponse.java} | 4 +-- proto/src/main/proto/Rss.proto | 6 ++-- .../server/ShuffleServerGrpcService.java | 10 +++--- 7 files changed, 35 insertions(+), 52 deletions(-) rename internal-client/src/main/java/org/apache/uniffle/client/request/{RssUnregisterAppRequest.java => RssUnregisterShuffleByAppIdRequest.java} (89%) rename internal-client/src/main/java/org/apache/uniffle/client/response/{RssUnregisterAppResponse.java => RssUnregisterShuffleByAppIdResponse.java} (86%) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index c156b9a1ba..337869d228 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -60,7 +60,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterAppRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; import org.apache.uniffle.client.response.ClientResponse; import org.apache.uniffle.client.response.RssAppHeartBeatResponse; @@ -74,7 +74,7 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendCommitResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterAppResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; import org.apache.uniffle.client.response.SendShuffleDataResult; import org.apache.uniffle.client.util.ClientUtils; @@ -986,7 +986,7 @@ public void unregisterShuffle(String appId, int shuffleId) { @Override public void unregisterShuffle(String appId) { - RssUnregisterAppRequest request = new RssUnregisterAppRequest(appId); + RssUnregisterShuffleByAppIdRequest request = new RssUnregisterShuffleByAppIdRequest(appId); if (appId == null) { return; @@ -1011,7 +1011,8 @@ public void unregisterShuffle(String appId) { ShuffleServerClient client = ShuffleServerClientFactory.getInstance() .getShuffleServerClient(clientType, shuffleServerInfo, rssConf); - RssUnregisterAppResponse response = client.unregisterApp(request); + RssUnregisterShuffleByAppIdResponse response = + client.unregisterShuffleByAppId(request); if (response.getStatusCode() != StatusCode.SUCCESS) { LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo); } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java index dfbaf570c3..60172eaf87 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java @@ -17,31 +17,10 @@ package org.apache.uniffle.client.api; -import org.apache.uniffle.client.request.RssAppHeartBeatRequest; -import org.apache.uniffle.client.request.RssFinishShuffleRequest; -import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleDataRequest; -import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; -import org.apache.uniffle.client.request.RssGetShuffleResultRequest; -import org.apache.uniffle.client.request.RssRegisterShuffleRequest; -import org.apache.uniffle.client.request.RssReportShuffleResultRequest; -import org.apache.uniffle.client.request.RssSendCommitRequest; -import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterAppRequest; -import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; -import org.apache.uniffle.client.response.RssAppHeartBeatResponse; -import org.apache.uniffle.client.response.RssFinishShuffleResponse; -import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleDataResponse; -import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; -import org.apache.uniffle.client.response.RssGetShuffleResultResponse; -import org.apache.uniffle.client.response.RssRegisterShuffleResponse; -import org.apache.uniffle.client.response.RssReportShuffleResultResponse; -import org.apache.uniffle.client.response.RssSendCommitResponse; -import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterAppResponse; -import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; +import org.apache.uniffle.client.request.*; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; +import org.apache.uniffle.client.response.*; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; public interface ShuffleServerClient { @@ -49,7 +28,8 @@ public interface ShuffleServerClient { RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request); - RssUnregisterAppResponse unregisterApp(RssUnregisterAppRequest request); + RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId( + RssUnregisterShuffleByAppIdRequest request); RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 60f2da3dcb..7297aec9d7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -45,7 +45,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; -import org.apache.uniffle.client.request.RssUnregisterAppRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; import org.apache.uniffle.client.response.RssAppHeartBeatResponse; import org.apache.uniffle.client.response.RssFinishShuffleResponse; @@ -57,7 +57,7 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendCommitResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.client.response.RssUnregisterAppResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; @@ -330,22 +330,24 @@ public long requirePreAllocation( return result; } - private RssProtos.AppUnregisterResponse doUnregisterApp(String appId) { - RssProtos.AppUnregisterRequest request = - RssProtos.AppUnregisterRequest.newBuilder().setAppId(appId).build(); - return blockingStub.unregisterApp(request); + private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String appId) { + RssProtos.ShuffleUnregisterByAppIdRequest request = + RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(appId).build(); + return blockingStub.unregisterShuffleByAppId(request); } @Override - public RssUnregisterAppResponse unregisterApp(RssUnregisterAppRequest request) { - RssProtos.AppUnregisterResponse rpcResponse = doUnregisterApp(request.getAppId()); + public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId( + RssUnregisterShuffleByAppIdRequest request) { + RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse = + doUnregisterShuffleByAppId(request.getAppId()); - RssUnregisterAppResponse response; + RssUnregisterShuffleByAppIdResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); switch (statusCode) { case SUCCESS: - response = new RssUnregisterAppResponse(StatusCode.SUCCESS); + response = new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS); break; default: String msg = diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java similarity index 89% rename from internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java rename to internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java index 3d1550d3e4..0992355a5e 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterAppRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java @@ -17,10 +17,10 @@ package org.apache.uniffle.client.request; -public class RssUnregisterAppRequest { +public class RssUnregisterShuffleByAppIdRequest { private String appId; - public RssUnregisterAppRequest(String appId) { + public RssUnregisterShuffleByAppIdRequest(String appId) { this.appId = appId; } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java similarity index 86% rename from internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java rename to internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java index 2c298fdb72..5c01e84e94 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterAppResponse.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java @@ -19,9 +19,9 @@ import org.apache.uniffle.common.rpc.StatusCode; -public class RssUnregisterAppResponse extends ClientResponse { +public class RssUnregisterShuffleByAppIdResponse extends ClientResponse { - public RssUnregisterAppResponse(StatusCode statusCode) { + public RssUnregisterShuffleByAppIdResponse(StatusCode statusCode) { super(statusCode); } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 8bf343a652..aab38efd4b 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -26,7 +26,7 @@ package rss.common; service ShuffleServer { rpc registerShuffle (ShuffleRegisterRequest) returns (ShuffleRegisterResponse); rpc unregisterShuffle(ShuffleUnregisterRequest) returns (ShuffleUnregisterResponse); - rpc unregisterApp(AppUnregisterRequest) returns (AppUnregisterResponse); + rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns (ShuffleUnregisterByAppIdResponse); rpc sendShuffleData (SendShuffleDataRequest) returns (SendShuffleDataResponse); rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns (GetLocalShuffleIndexResponse); rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns (GetLocalShuffleDataResponse); @@ -198,11 +198,11 @@ message ShuffleRegisterResponse { string retMsg = 2; } -message AppUnregisterRequest { +message ShuffleUnregisterByAppIdRequest { string appId = 1; } -message AppUnregisterResponse { +message ShuffleUnregisterByAppIdResponse { StatusCode status = 1; string retMsg = 2; } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 577e521bf0..130e10b211 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -103,9 +103,9 @@ public ShuffleServerGrpcService(ShuffleServer shuffleServer) { } @Override - public void unregisterApp( - RssProtos.AppUnregisterRequest request, - StreamObserver responseStreamObserver) { + public void unregisterShuffleByAppId( + RssProtos.ShuffleUnregisterByAppIdRequest request, + StreamObserver responseStreamObserver) { String appId = request.getAppId(); StatusCode result = StatusCode.SUCCESS; @@ -124,8 +124,8 @@ public void unregisterApp( result = StatusCode.INTERNAL_ERROR; } - RssProtos.AppUnregisterResponse reply = - RssProtos.AppUnregisterResponse.newBuilder() + RssProtos.ShuffleUnregisterByAppIdResponse reply = + RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder() .setStatus(result.toProto()) .setRetMsg(responseMessage) .build(); From a89fd66db25eca7721b9b99c9e70c288e75db889 Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Tue, 6 Feb 2024 18:31:06 +0800 Subject: [PATCH 4/6] feat(spark): Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../client/api/ShuffleServerClient.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java index 60172eaf87..8be3d67cb7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java @@ -17,10 +17,31 @@ package org.apache.uniffle.client.api; -import org.apache.uniffle.client.request.*; +import org.apache.uniffle.client.request.RssAppHeartBeatRequest; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleDataRequest; +import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest; +import org.apache.uniffle.client.request.RssGetShuffleResultRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssReportShuffleResultRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; -import org.apache.uniffle.client.response.*; +import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; +import org.apache.uniffle.client.response.RssAppHeartBeatResponse; +import org.apache.uniffle.client.response.RssFinishShuffleResponse; +import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleDataResponse; +import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; +import org.apache.uniffle.client.response.RssGetShuffleResultResponse; +import org.apache.uniffle.client.response.RssRegisterShuffleResponse; +import org.apache.uniffle.client.response.RssReportShuffleResultResponse; +import org.apache.uniffle.client.response.RssSendCommitResponse; +import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; public interface ShuffleServerClient { From d5281e5c663916085d8a74b9044a1535eb123cb4 Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Wed, 7 Feb 2024 16:28:13 +0800 Subject: [PATCH 5/6] feat(spark): Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../server/ShuffleServerGrpcService.java | 12 +--------- .../uniffle/server/ShuffleTaskManager.java | 17 +++++++++---- .../server/event/AppUnregisterPurgeEvent.java | 24 +++++++++++++++++++ 3 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 130e10b211..ac9b95c344 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -18,7 +18,6 @@ package org.apache.uniffle.server; import java.nio.ByteBuffer; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -26,7 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; @@ -88,7 +86,6 @@ import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse; import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; -import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageReadMetrics; import org.apache.uniffle.storage.util.ShuffleStorageUtils; @@ -111,14 +108,7 @@ public void unregisterShuffleByAppId( StatusCode result = StatusCode.SUCCESS; String responseMessage = "OK"; try { - Map> shuffleIdToBuffers = - shuffleServer.getShuffleBufferManager().getBufferPool().get(appId); - if (shuffleIdToBuffers != null) { - HashSet shuffleIds = Sets.newHashSet(shuffleIdToBuffers.keySet()); - shuffleIds.forEach( - shuffleId -> - shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId, shuffleId)); - } + shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId); } catch (Exception e) { result = StatusCode.INTERNAL_ERROR; diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 04d62dabee..d097924481 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -37,11 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import org.apache.commons.collections.CollectionUtils; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; @@ -70,6 +66,7 @@ import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; import org.apache.uniffle.server.event.AppPurgeEvent; +import org.apache.uniffle.server.event.AppUnregisterPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; import org.apache.uniffle.server.storage.StorageManager; @@ -183,6 +180,12 @@ public ShuffleTaskManager( (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); } + if (event instanceof AppUnregisterPurgeEvent) { + removeResources(event.getAppId(), false); + double usedTime = + (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; + ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); + } if (event instanceof ShufflePurgeEvent) { removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds()); double usedTime = @@ -842,6 +845,10 @@ public void removeShuffleDataAsync(String appId, int shuffleId) { new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId))); } + public void removeShuffleDataAsync(String appId) { + expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId, getUserByAppId(appId))); + } + @VisibleForTesting void removeShuffleDataSync(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); diff --git a/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java new file mode 100644 index 0000000000..04d6318ceb --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.event; + +public class AppUnregisterPurgeEvent extends PurgeEvent { + public AppUnregisterPurgeEvent(String appId, String user) { + super(appId, user, null); + } +} From 84e043489e5325a87c6b27861796392c8fbe0fc7 Mon Sep 17 00:00:00 2001 From: Yiqi You <2810592075@qq.com> Date: Wed, 7 Feb 2024 16:30:48 +0800 Subject: [PATCH 6/6] feat(spark): Provide dedicated unregister app rpc interface Signed-off-by: Yiqi You <2810592075@qq.com> --- .../java/org/apache/uniffle/server/ShuffleTaskManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index d097924481..7977b80164 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -37,7 +37,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger;