diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index faa1f247cc..337869d228 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -60,6 +60,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; import org.apache.uniffle.client.response.ClientResponse; import org.apache.uniffle.client.response.RssAppHeartBeatResponse; @@ -73,6 +74,7 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendCommitResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; import org.apache.uniffle.client.response.SendShuffleDataResult; import org.apache.uniffle.client.util.ClientUtils; @@ -984,6 +986,8 @@ public void unregisterShuffle(String appId, int shuffleId) { @Override public void unregisterShuffle(String appId) { + RssUnregisterShuffleByAppIdRequest request = new RssUnregisterShuffleByAppIdRequest(appId); + if (appId == null) { return; } @@ -991,7 +995,41 @@ public void unregisterShuffle(String appId) { if (appServerMap == null) { return; } - appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, shuffleId)); + Set shuffleServerInfos = getAllShuffleServers(appId); + + ExecutorService executorService = null; + try { + executorService = + ThreadUtils.getDaemonFixedThreadPool( + Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()), "unregister-shuffle"); + + ThreadUtils.executeTasks( + executorService, + shuffleServerInfos, + shuffleServerInfo -> { + try { + ShuffleServerClient client = + ShuffleServerClientFactory.getInstance() + .getShuffleServerClient(clientType, shuffleServerInfo, rssConf); + RssUnregisterShuffleByAppIdResponse response = + client.unregisterShuffleByAppId(request); + if (response.getStatusCode() != StatusCode.SUCCESS) { + LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo); + } + } catch (Exception e) { + LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e); + } + return null; + }, + unregisterRequestTimeSec, + "unregister shuffle server"); + + } finally { + if (executorService != null) { + executorService.shutdownNow(); + } + shuffleServerInfoMap.remove(appId); + } } private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) { diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java index eb4cd398f9..8be3d67cb7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java @@ -28,6 +28,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; import org.apache.uniffle.client.response.RssAppHeartBeatResponse; import org.apache.uniffle.client.response.RssFinishShuffleResponse; @@ -39,6 +40,7 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendCommitResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; public interface ShuffleServerClient { @@ -47,6 +49,9 @@ public interface ShuffleServerClient { RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request); + RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId( + RssUnregisterShuffleByAppIdRequest request); + RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request); RssSendCommitResponse sendCommit(RssSendCommitRequest request); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 6bae6f62c3..7297aec9d7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -45,6 +45,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest; import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest; import org.apache.uniffle.client.request.RssUnregisterShuffleRequest; import org.apache.uniffle.client.response.RssAppHeartBeatResponse; import org.apache.uniffle.client.response.RssFinishShuffleResponse; @@ -56,6 +57,7 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendCommitResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse; import org.apache.uniffle.client.response.RssUnregisterShuffleResponse; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; @@ -328,6 +330,37 @@ public long requirePreAllocation( return result; } + private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String appId) { + RssProtos.ShuffleUnregisterByAppIdRequest request = + RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(appId).build(); + return blockingStub.unregisterShuffleByAppId(request); + } + + @Override + public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId( + RssUnregisterShuffleByAppIdRequest request) { + RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse = + doUnregisterShuffleByAppId(request.getAppId()); + + RssUnregisterShuffleByAppIdResponse response; + RssProtos.StatusCode statusCode = rpcResponse.getStatus(); + + switch (statusCode) { + case SUCCESS: + response = new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS); + break; + default: + String msg = + String.format( + "Errors on unregister app to %s:%s for appId[%s], error: %s", + host, port, request.getAppId(), rpcResponse.getRetMsg()); + LOG.error(msg); + throw new RssException(msg); + } + + return response; + } + private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String appId, int shuffleId) { RssProtos.ShuffleUnregisterRequest request = RssProtos.ShuffleUnregisterRequest.newBuilder() diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java new file mode 100644 index 0000000000..0992355a5e --- /dev/null +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.request; + +public class RssUnregisterShuffleByAppIdRequest { + private String appId; + + public RssUnregisterShuffleByAppIdRequest(String appId) { + this.appId = appId; + } + + public String getAppId() { + return appId; + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java new file mode 100644 index 0000000000..5c01e84e94 --- /dev/null +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.response; + +import org.apache.uniffle.common.rpc.StatusCode; + +public class RssUnregisterShuffleByAppIdResponse extends ClientResponse { + + public RssUnregisterShuffleByAppIdResponse(StatusCode statusCode) { + super(statusCode); + } +} diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 9f601f2428..aab38efd4b 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -26,6 +26,7 @@ package rss.common; service ShuffleServer { rpc registerShuffle (ShuffleRegisterRequest) returns (ShuffleRegisterResponse); rpc unregisterShuffle(ShuffleUnregisterRequest) returns (ShuffleUnregisterResponse); + rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns (ShuffleUnregisterByAppIdResponse); rpc sendShuffleData (SendShuffleDataRequest) returns (SendShuffleDataResponse); rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns (GetLocalShuffleIndexResponse); rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns (GetLocalShuffleDataResponse); @@ -197,6 +198,15 @@ message ShuffleRegisterResponse { string retMsg = 2; } +message ShuffleUnregisterByAppIdRequest { + string appId = 1; +} + +message ShuffleUnregisterByAppIdResponse { + StatusCode status = 1; + string retMsg = 2; +} + message SendShuffleDataRequest { string appId = 1; int32 shuffleId = 2; diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 65c2b998bc..ac9b95c344 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -99,6 +99,30 @@ public ShuffleServerGrpcService(ShuffleServer shuffleServer) { this.shuffleServer = shuffleServer; } + @Override + public void unregisterShuffleByAppId( + RssProtos.ShuffleUnregisterByAppIdRequest request, + StreamObserver responseStreamObserver) { + String appId = request.getAppId(); + + StatusCode result = StatusCode.SUCCESS; + String responseMessage = "OK"; + try { + shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId); + + } catch (Exception e) { + result = StatusCode.INTERNAL_ERROR; + } + + RssProtos.ShuffleUnregisterByAppIdResponse reply = + RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder() + .setStatus(result.toProto()) + .setRetMsg(responseMessage) + .build(); + responseStreamObserver.onNext(reply); + responseStreamObserver.onCompleted(); + } + @Override public void unregisterShuffle( RssProtos.ShuffleUnregisterRequest request, diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 04d62dabee..7977b80164 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -70,6 +70,7 @@ import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; import org.apache.uniffle.server.event.AppPurgeEvent; +import org.apache.uniffle.server.event.AppUnregisterPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; import org.apache.uniffle.server.storage.StorageManager; @@ -183,6 +184,12 @@ public ShuffleTaskManager( (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); } + if (event instanceof AppUnregisterPurgeEvent) { + removeResources(event.getAppId(), false); + double usedTime = + (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND; + ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime); + } if (event instanceof ShufflePurgeEvent) { removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds()); double usedTime = @@ -842,6 +849,10 @@ public void removeShuffleDataAsync(String appId, int shuffleId) { new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId))); } + public void removeShuffleDataAsync(String appId) { + expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId, getUserByAppId(appId))); + } + @VisibleForTesting void removeShuffleDataSync(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); diff --git a/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java new file mode 100644 index 0000000000..04d6318ceb --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.event; + +public class AppUnregisterPurgeEvent extends PurgeEvent { + public AppUnregisterPurgeEvent(String appId, String user) { + super(appId, user, null); + } +}