Skip to content

Commit

Permalink
Add NativeExecutionProcess and facilities
Browse files Browse the repository at this point in the history
  • Loading branch information
miaoever authored and tanjialiang committed Nov 28, 2022
1 parent 558677e commit eeb5a7d
Show file tree
Hide file tree
Showing 13 changed files with 768 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class RequestErrorTracker

private final Queue<Throwable> errorsSinceLastSuccess = new ConcurrentLinkedQueue<>();

private RequestErrorTracker(Object id, URI uri, ErrorCodeSupplier errorCode, String nodeErrorMessage, Duration maxErrorDuration, ScheduledExecutorService scheduledExecutor, String jobDescription)
public RequestErrorTracker(Object id, URI uri, ErrorCodeSupplier errorCode, String nodeErrorMessage, Duration maxErrorDuration, ScheduledExecutorService scheduledExecutor, String jobDescription)
{
this.id = requireNonNull(id, "id is null");
this.uri = requireNonNull(uri, "uri is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.block.BlockJsonSerde;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncoding;
import com.facebook.presto.common.block.BlockEncodingManager;
Expand Down Expand Up @@ -107,6 +108,7 @@
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.spark.classloader_interface.SparkProcessType;
import com.facebook.presto.spark.execution.ForNativeExecutionTask;
import com.facebook.presto.spark.execution.NativeExecutionProcessFactory;
import com.facebook.presto.spark.execution.NativeExecutionTaskFactory;
import com.facebook.presto.spark.execution.PrestoSparkBroadcastTableCacheManager;
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
Expand Down Expand Up @@ -267,6 +269,7 @@ protected void setup(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(PrestoSparkQueryData.class);
jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class);
jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
jsonCodecBinder(binder).bindJsonCodec(ServerInfo.class);

// smile codecs
smileCodecBinder(binder).bindSmileCodec(TaskSource.class);
Expand Down Expand Up @@ -478,6 +481,7 @@ protected void setup(Binder binder)

binder.bind(RemoteTaskStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(RemoteTaskStats.class).withGeneratedName();
binder.bind(NativeExecutionProcessFactory.class).in(Scopes.SINGLETON);
binder.bind(NativeExecutionTaskFactory.class).in(Scopes.SINGLETON);
httpClientBinder(binder).bindHttpClient("nativeExecution", ForNativeExecutionTask.class)
.withConfigDefaults(config -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpWorkerClient;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -42,7 +42,7 @@ public class HttpNativeExecutionTaskInfoFetcher
{
private static final Logger log = Logger.get(HttpNativeExecutionTaskInfoFetcher.class);

private final PrestoSparkHttpWorkerClient workerClient;
private final PrestoSparkHttpTaskClient workerClient;
private final ScheduledExecutorService updateScheduledExecutor;
private final AtomicReference<TaskInfo> taskInfo = new AtomicReference<>();
private final Executor executor;
Expand All @@ -53,7 +53,7 @@ public class HttpNativeExecutionTaskInfoFetcher

public HttpNativeExecutionTaskInfoFetcher(
ScheduledExecutorService updateScheduledExecutor,
PrestoSparkHttpWorkerClient workerClient,
PrestoSparkHttpTaskClient workerClient,
Executor executor,
Duration infoFetchInterval)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.operator.PageTransportErrorException;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpWorkerClient;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.SerializedPage;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class HttpNativeExecutionTaskResultFetcher
private static final DataSize MAX_BUFFER_SIZE = new DataSize(128, DataSize.Unit.MEGABYTE);

private final ScheduledExecutorService scheduler;
private final PrestoSparkHttpWorkerClient workerClient;
private final PrestoSparkHttpTaskClient workerClient;
private final LinkedBlockingDeque<SerializedPage> pageBuffer = new LinkedBlockingDeque<>();
private final AtomicLong bufferMemoryBytes;

Expand All @@ -65,7 +65,7 @@ public class HttpNativeExecutionTaskResultFetcher

public HttpNativeExecutionTaskResultFetcher(
ScheduledExecutorService scheduler,
PrestoSparkHttpWorkerClient workerClient)
PrestoSparkHttpTaskClient workerClient)
{
this.scheduler = requireNonNull(scheduler, "scheduler is null");
this.workerClient = requireNonNull(workerClient, "workerClient is null");
Expand Down Expand Up @@ -131,7 +131,7 @@ private static class HttpNativeExecutionTaskResultFetcherRunner
private static final DataSize MAX_RESPONSE_SIZE = new DataSize(32, DataSize.Unit.MEGABYTE);
private static final int MAX_TRANSPORT_ERROR_RETRIES = 5;

private final PrestoSparkHttpWorkerClient client;
private final PrestoSparkHttpTaskClient client;
private final LinkedBlockingDeque<SerializedPage> pageBuffer;
private final AtomicLong bufferMemoryBytes;
private final CompletableFuture<Void> future;
Expand All @@ -140,7 +140,7 @@ private static class HttpNativeExecutionTaskResultFetcherRunner
private long token;

public HttpNativeExecutionTaskResultFetcherRunner(
PrestoSparkHttpWorkerClient client,
PrestoSparkHttpTaskClient client,
CompletableFuture<Void> future,
LinkedBlockingDeque<SerializedPage> pageBuffer,
AtomicLong bufferMemoryBytes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark.execution;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpServerClient;
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig;
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import org.apache.spark.SparkFiles;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.http.client.HttpStatus.OK;
import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_BINARY_NOT_EXIST;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class NativeExecutionProcess
implements AutoCloseable
{
private static final Logger log = Logger.get(NativeExecutionProcess.class);
private static final String NATIVE_EXECUTION_TASK_ERROR_MESSAGE = "Encountered too many errors talking to native process. The process may have crashed or be under too much load.";
private static final String WORKER_CONFIG_FILE = "/config.properties";
private static final String WORKER_NODE_CONFIG_FILE = "/node.properties";
private static final String WORKER_CONNECTOR_CONFIG_FILE = "/catalog/";

private final Session session;
private final PrestoSparkHttpServerClient serverClient;
private final URI location;
private final int port;
private final TaskManagerConfig taskManagerConfig;
private final NativeExecutionSystemConfig systemConfig;
private final NativeExecutionNodeConfig nodeConfig;
private final NativeExecutionConnectorConfig connectorConfig;
private final ScheduledExecutorService errorRetryScheduledExecutor;
private final RequestErrorTracker errorTracker;
private final HttpClient httpClient;
private final NativeExecutionTask nativeExecutionTask;

private Process process;

public NativeExecutionProcess(
Session session,
URI uri,
TaskId taskId,
PlanFragment planFragment,
List<TaskSource> sources,
TableWriteInfo tableWriteInfo,
HttpClient httpClient,
ScheduledExecutorService errorRetryScheduledExecutor,
JsonCodec<ServerInfo> serverInfoCodec,
NativeExecutionTaskFactory taskFactory,
Duration maxErrorDuration,
TaskManagerConfig taskManagerConfig,
NativeExecutionSystemConfig systemConfig,
NativeExecutionNodeConfig nodeConfig,
NativeExecutionConnectorConfig connectorConfig)
throws IOException
{
this.port = getAvailableTcpPort();
this.session = requireNonNull(session, "session is null");
this.location = getBaseUriWithPort(requireNonNull(uri, "uri is null"), port);
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.serverClient = new PrestoSparkHttpServerClient(
this.httpClient,
location,
serverInfoCodec);
this.taskManagerConfig = requireNonNull(taskManagerConfig, "taskManagerConfig is null");
this.systemConfig = requireNonNull(systemConfig, "systemConfig is null");
this.nodeConfig = requireNonNull(nodeConfig, "nodeConfig is null");
this.connectorConfig = requireNonNull(connectorConfig, "connectorConfig is null");
this.nativeExecutionTask = taskFactory.createNativeExecutionTask(session, location, taskId, planFragment, sources, tableWriteInfo);
this.errorRetryScheduledExecutor = requireNonNull(errorRetryScheduledExecutor, "errorRetryScheduledExecutor is null");
this.errorTracker = new RequestErrorTracker(
"NativeExecution",
uri,
NATIVE_EXECUTION_TASK_ERROR,
NATIVE_EXECUTION_TASK_ERROR_MESSAGE,
maxErrorDuration,
errorRetryScheduledExecutor,
"getting native process status");
}

/**
* Starts the external native execution process. The method will be blocked by connecting to the native process's /v1/info endpoint with backoff retries until timeout.
*/
public void start()
throws ExecutionException, InterruptedException, IOException
{
String executablePath = getProcessWorkingPath(SystemSessionProperties.getNativeExecutionExecutablePath(session));
String configPath = Paths.get(getProcessWorkingPath("./"), String.valueOf(port)).toAbsolutePath().toString();

populateConfigurationFiles(configPath);
ProcessBuilder processBuilder = new ProcessBuilder(executablePath, "--v", "1", "--etc_dir", configPath);
processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
try {
log.info("Launching %s \nConfig path: %s\n", executablePath, configPath);
process = processBuilder.start();
}
catch (IOException e) {
log.error(format("Cannot start %s, error message: %s", processBuilder.command(), e.getMessage()));
throw new PrestoException(NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR, format("Cannot start %s", processBuilder.command()), e);
}

// getServerInfoWithRetry will return a Future on the getting the ServerInfo from the native process, we intentionally block on the Future till
// the native process successfully response the ServerInfo to ensure the process has been launched and initialized correctly.
getServerInfoWithRetry().get();
}

public NativeExecutionTask getTask()
{
return nativeExecutionTask;
}

@VisibleForTesting
public SettableFuture<ServerInfo> getServerInfoWithRetry()
{
SettableFuture<ServerInfo> future = SettableFuture.create();
doGetServerInfo(future);
return future;
}

@Override
public void close()
{
if (process != null && process.isAlive()) {
process.destroy();
}
}

private static URI getBaseUriWithPort(URI baseUri, int port)
{
return uriBuilderFrom(baseUri)
.port(port)
.build();
}

private static int getAvailableTcpPort()
throws IOException
{
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
return port;
}

private String getNativeExecutionCatalogName(Session session)
{
checkArgument(session.getCatalog().isPresent(), "Catalog isn't set in the session.");
return session.getCatalog().get();
}

private void populateConfigurationFiles(String configBasePath)
throws IOException
{
// The reason we have to pick and assign the port per worker is in our prod environment,
// there is no port isolation among all the containers running on the same host, so we have
// to pick unique port per worker to avoid port collision. This config will be passed down to
// the native execution process eventually for process initialization.
systemConfig.setHttpServerPort(port);
WorkerProperty.populateProperty(systemConfig.getAllProperties(), Paths.get(configBasePath, WORKER_CONFIG_FILE));
WorkerProperty.populateProperty(nodeConfig.getAllProperties(), Paths.get(configBasePath, WORKER_NODE_CONFIG_FILE));
WorkerProperty.populateProperty(
connectorConfig.getAllProperties(),
Paths.get(configBasePath, format("%s%s.properties", WORKER_CONNECTOR_CONFIG_FILE, getNativeExecutionCatalogName(session))));
}

private void doGetServerInfo(SettableFuture<ServerInfo> future)
{
addCallback(serverClient.getServerInfo(), new FutureCallback<BaseResponse<ServerInfo>>()
{
@Override
public void onSuccess(@Nullable BaseResponse<ServerInfo> response)
{
if (response.getStatusCode() != OK.code()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Request failed with HTTP status " + response.getStatusCode());
}
future.set(response.getValue());
}

@Override
public void onFailure(Throwable failedReason)
{
if (failedReason instanceof RejectedExecutionException && httpClient.isClosed()) {
log.error(format("Unable to start the native process. HTTP client is closed. Reason: %s", failedReason.getMessage()));
future.setException(failedReason);
return;
}
// record failure
try {
errorTracker.requestFailed(failedReason);
}
catch (PrestoException e) {
future.setException(e);
return;
}
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = errorTracker.acquireRequestPermit();
if (errorRateLimit.isDone()) {
doGetServerInfo(future);
}
else {
errorRateLimit.addListener(() -> doGetServerInfo(future), errorRetryScheduledExecutor);
}
}
}, directExecutor());
}

private String getProcessWorkingPath(String path)
{
File absolutePath = new File(path);
File workingDir = new File(SparkFiles.getRootDirectory());
if (!absolutePath.isAbsolute()) {
absolutePath = new File(workingDir, path);
}

if (!absolutePath.exists()) {
log.error(format("File doesn't exist %s", absolutePath.getAbsolutePath()));
throw new PrestoException(NATIVE_EXECUTION_BINARY_NOT_EXIST, format("File doesn't exist %s", absolutePath.getAbsolutePath()));
}

return absolutePath.getAbsolutePath();
}
}
Loading

0 comments on commit eeb5a7d

Please sign in to comment.