Skip to content

Commit

Permalink
[native] Implement Graceful Shutdown in Native worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham authored and majetideepak committed Nov 15, 2024
1 parent e1bddb3 commit 6e87912
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 1 deletion.
31 changes: 30 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
#include "presto_cpp/main/http/filters/InternalAuthenticationFilter.h"
Expand Down Expand Up @@ -369,6 +370,14 @@ void PrestoServer::run() {
json infoStateJson = convertNodeState(server->nodeState());
http::sendOkResponse(downstream, infoStateJson);
});
httpServer_->registerPut(
"/v1/info/state",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
server->handleGracefulShutdown(body, downstream);
});
httpServer_->registerGet(
"/v1/status",
[server = this](
Expand Down Expand Up @@ -907,7 +916,6 @@ void PrestoServer::stop() {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Waiting for " << shutdownOnsetSec
<< " second(s) before proceeding with the shutdown...";

// Give coordinator some time to receive our new node state and stop sending
// any tasks.
std::this_thread::sleep_for(std::chrono::seconds(shutdownOnsetSec));
Expand Down Expand Up @@ -1427,6 +1435,27 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

void PrestoServer::handleGracefulShutdown(
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
std::string bodyContent =
folly::trimWhitespace(body[0]->moveToFbString()).toString();
if (body.size() == 1 && bodyContent == http::kShuttingDown) {
LOG(INFO) << "Shutdown requested";
if (nodeState() == NodeState::kActive) {
// Run stop() in a separate thread to avoid blocking the main HTTP handler
// and ensure a timely 200 OK response to the client.
std::thread([this]() { this->stop(); }).detach();
} else {
LOG(INFO) << "Node is inactive or shutdown is already requested";
}
http::sendOkResponse(downstream);
} else {
LOG(ERROR) << "Bad Request. Received body content: " << bodyContent;
http::sendErrorResponse(downstream, "Bad Request", http::kHttpBadRequest);
}
}

void PrestoServer::registerSidecarEndpoints() {
VELOX_CHECK(httpServer_);
httpServer_->registerGet(
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void handleGracefulShutdown(
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/http/HttpConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ namespace facebook::presto::http {
const uint16_t kHttpOk = 200;
const uint16_t kHttpAccepted = 202;
const uint16_t kHttpNoContent = 204;
const uint16_t kHttpBadRequest = 400;
const uint16_t kHttpUnauthorized = 401;
const uint16_t kHttpNotFound = 404;
const uint16_t kHttpInternalServerError = 500;

const char kMimeTypeApplicationJson[] = "application/json";
const char kMimeTypeApplicationThrift[] = "application/x-thrift+binary";
const char kShuttingDown[] = "\"SHUTTING_DOWN\"";
static const char kPrestoInternalBearer[] = "X-Presto-Internal-Bearer";
} // namespace facebook::presto::http
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.spi.NodeState;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
import org.testng.annotations.Test;

import static com.facebook.presto.spi.NodeState.SHUTTING_DOWN;
import static org.testng.Assert.assertEquals;

public class TestPrestoNativeGracefulShutdown
{
@Test
public void testGracefulShutdown() throws Exception
{
QueryRunner queryRunner = PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true);
DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;

int responseCode = distributedQueryRunner.sendWorkerRequest(0, "INVALID_BODY");
assertEquals(responseCode, 400, "Expected a 400 Bad Request response for invalid body");

responseCode = distributedQueryRunner.sendWorkerRequest(0, "");
assertEquals(responseCode, 400, "Expected a 400 Bad Request response for empty body");

responseCode = distributedQueryRunner.sendWorkerRequest(0, "\"SHUTTING_DOWN\"");
assertEquals(responseCode, 200, "Expected a 200 OK response for valid shutdown request");
NodeState state = distributedQueryRunner.getWorkerInfoState(0);
assertEquals(state.getValue(), SHUTTING_DOWN.getValue());

queryRunner.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@
import org.jdbi.v3.core.Jdbi;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -413,6 +417,39 @@ private NodeState getCoordinatorInfoState(int coordinator)
return state;
}

public NodeState getWorkerInfoState(int worker)
{
URI uri = URI.create(getWorker(worker).getBaseUrl().toString() + "/v1/info/state");
Request request = prepareGet()
.setHeader(PRESTO_USER, DEFAULT_USER)
.setUri(uri)
.build();

NodeState state = client.execute(request, createJsonResponseHandler(jsonCodec(NodeState.class)));
return state;
}

public int sendWorkerRequest(int worker, String body)
{
try {
URL url = new URL(getWorker(worker).getBaseUrl().toString() + "/v1/info/state");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");

try (OutputStream os = connection.getOutputStream()) {
byte[] input = body.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
return connection.getResponseCode();
}
catch (Exception e) {
e.printStackTrace();
return 500;
}
}

private static TestingPrestoServer createTestingPrestoServer(
URI discoveryUri,
boolean resourceManager,
Expand Down Expand Up @@ -601,6 +638,12 @@ public TestingPrestoServer getCoordinator(int coordinator)
return coordinators.get(coordinator);
}

private TestingPrestoServer getWorker(int worker)
{
checkState(worker < servers.size(), format("Expected worker index %d < %d", worker, servers.size()));
return servers.get(worker);
}

public List<TestingPrestoServer> getCoordinators()
{
return coordinators;
Expand Down

0 comments on commit 6e87912

Please sign in to comment.