Skip to content

Commit

Permalink
[native] Add Arrow Flight Connector
Browse files Browse the repository at this point in the history
The native Arrow Flight connector can be used to connect to any Arrow Flight
enabled Data Source. The metadata layer is handled by the Presto coordinator
and does not need to be re-implemented in C++. Any Java connector that inherits
from `presto-base-arrow-flight` can use this connector as it's counterpart for
the Prestissimo layer.

Different Arrow-Flight enabled data sources can differ in authentication styles.
A plugin-style interface is provided to handle such cases with custom
authentication code by extending `arrow_flight::auth::Authenticator`.

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0004-arrow-flight-connector.md#prestissimo-implementation

Co-authored-by: Ashwin Kumar <Ashwin.Kumar6@ibm.com>
Co-authored-by: Rijin-N <rijin.n@ibm.com>
Co-authored-by: Nischay Yadav <Nischay.Yadav@ibm.com>
  • Loading branch information
3 people authored and BryanCutler committed Feb 15, 2025
1 parent 478d450 commit 4661ec7
Show file tree
Hide file tree
Showing 74 changed files with 3,458 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
public class ArrowFlightConfig
{
private String server;
private Boolean verifyServer;
private boolean verifyServer = true;
private String flightServerSSLCertificate;
private Boolean arrowFlightServerSslEnabled;
private boolean arrowFlightServerSslEnabled;
private Integer arrowFlightPort;

public String getFlightServerName()
Expand All @@ -35,13 +35,13 @@ public ArrowFlightConfig setFlightServerName(String server)
return this;
}

public Boolean getVerifyServer()
public boolean getVerifyServer()
{
return verifyServer;
}

@Config("arrow-flight.server.verify")
public ArrowFlightConfig setVerifyServer(Boolean verifyServer)
public ArrowFlightConfig setVerifyServer(boolean verifyServer)
{
this.verifyServer = verifyServer;
return this;
Expand Down Expand Up @@ -71,13 +71,13 @@ public ArrowFlightConfig setFlightServerSSLCertificate(String flightServerSSLCer
return this;
}

public Boolean getArrowFlightServerSslEnabled()
public boolean getArrowFlightServerSslEnabled()
{
return arrowFlightServerSslEnabled;
}

@Config("arrow-flight.server-ssl-enabled")
public ArrowFlightConfig setArrowFlightServerSslEnabled(Boolean arrowFlightServerSslEnabled)
public ArrowFlightConfig setArrowFlightServerSslEnabled(boolean arrowFlightServerSslEnabled)
{
this.arrowFlightServerSslEnabled = arrowFlightServerSslEnabled;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public BaseArrowFlightClientHandler(BufferAllocator allocator, ArrowFlightConfig
protected FlightClient createFlightClient()
{
Location location;
if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) {
location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort());
if (config.getArrowFlightServerSslEnabled()) {
location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort());
}
else {
location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort());
location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort());
}
return createFlightClient(location);
}
Expand All @@ -67,10 +67,8 @@ protected FlightClient createFlightClient(Location location)
try {
Optional<InputStream> trustedCertificate = Optional.empty();
FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location);
if (config.getVerifyServer() != null && !config.getVerifyServer()) {
flightClientBuilder.verifyServer(false);
}
else if (config.getFlightServerSSLCertificate() != null) {
flightClientBuilder.verifyServer(config.getVerifyServer());
if (config.getFlightServerSSLCertificate() != null) {
trustedCertificate = Optional.of(newInputStream(Paths.get(config.getFlightServerSSLCertificate())));
flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.File;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;

Expand All @@ -53,11 +54,15 @@ private static DistributedQueryRunner createQueryRunner(
throws Exception
{
Session session = testSessionBuilder()
.setCatalog("arrow")
.setCatalog("arrowflight")
.setSchema("tpch")
.build();

DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).setExtraProperties(extraProperties).build();
DistributedQueryRunner.Builder queryRunnerBuilder = DistributedQueryRunner.builder(session);
Optional<Integer> workerCount = getProperty("WORKER_COUNT").map(Integer::parseInt);
workerCount.ifPresent(queryRunnerBuilder::setNodeCount);

DistributedQueryRunner queryRunner = queryRunnerBuilder.setExtraProperties(extraProperties).build();

try {
queryRunner.installPlugin(new TestingArrowFlightPlugin());
Expand All @@ -66,10 +71,9 @@ private static DistributedQueryRunner createQueryRunner(
.putAll(catalogProperties)
.put("arrow-flight.server", "localhost")
.put("arrow-flight.server-ssl-enabled", "true")
.put("arrow-flight.server-ssl-certificate", "src/test/resources/server.crt")
.put("arrow-flight.server.verify", "true");
.put("arrow-flight.server-ssl-certificate", "src/test/resources/server.crt");

queryRunner.createCatalog("arrow", "arrow", properties.build());
queryRunner.createCatalog("arrowflight", "arrow-flight", properties.build());

return queryRunner;
}
Expand All @@ -78,6 +82,19 @@ private static DistributedQueryRunner createQueryRunner(
}
}

private static Optional<String> getProperty(String name)
{
String systemPropertyValue = System.getProperty(name);
if (systemPropertyValue != null) {
return Optional.of(systemPropertyValue);
}
String environmentVariableValue = System.getenv(name);
if (environmentVariableValue != null) {
return Optional.of(environmentVariableValue);
}
return Optional.empty();
}

public static void main(String[] args)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ private static MapType createMapType(Type keyType, Type valueType)
private static FlightClient createFlightClient(BufferAllocator allocator) throws IOException
{
InputStream trustedCertificate = new ByteArrayInputStream(Files.readAllBytes(Paths.get("src/test/resources/server.crt")));
return FlightClient.builder(allocator, getServerLocation()).verifyServer(true).useTls().trustedCertificates(trustedCertificate).build();
return FlightClient.builder(allocator, getServerLocation()).useTls().trustedCertificates(trustedCertificate).build();
}

private void addTableToServer(FlightClient client, VectorSchemaRoot root, String tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ public class TestingArrowFlightPlugin
{
public TestingArrowFlightPlugin()
{
super("arrow", new TestingArrowModule(), new JsonModule());
super("arrow-flight", new TestingArrowModule(), new JsonModule());
}
}
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ endif
ifneq ($(PRESTO_MEMORY_CHECKER_TYPE),)
EXTRA_CMAKE_FLAGS += -DPRESTO_MEMORY_CHECKER_TYPE=$(PRESTO_MEMORY_CHECKER_TYPE)
endif
ifneq ($(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR),)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR=$(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ follow these steps:
* For development, use `make debug` to build a non-optimized debug version.
* Use `make unittest` to build and run tests.

#### Arrow Flight Connector
To enable Arrow Flight connector support, set the environment variable:
`PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR = "ON"`.

The Arrow Flight connector requires the Arrow Flight library. You can install this dependency
by running the following script from the `presto/presto-native-execution` directory:

`./scripts/setup-adapters.sh arrow_flight`

### Makefile Targets
A reminder of the available Makefile targets can be obtained using `make help`
```
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(thrift)
add_subdirectory(connectors)

add_library(
presto_server_lib
Expand All @@ -29,7 +30,6 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
SessionProperties.cpp
TaskManager.cpp
TaskResource.cpp
Expand All @@ -48,6 +48,7 @@ target_link_libraries(
presto_common
presto_exception
presto_function_metadata
presto_connector
presto_http
presto_operators
presto_velox_conversion
Expand Down
54 changes: 5 additions & 49 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/connectors/Registration.h"
#include "presto_cpp/main/connectors/SystemConnector.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand All @@ -48,13 +49,11 @@
#include "velox/common/memory/MmapAllocator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
Expand Down Expand Up @@ -87,7 +86,6 @@ constexpr char const* kHttps = "https";
constexpr char const* kTaskUriFormat =
"{}://{}:{}"; // protocol, address and port
constexpr char const* kConnectorName = "connector.name";
constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";

protocol::NodeState convertNodeState(presto::NodeState nodeState) {
switch (nodeState) {
Expand Down Expand Up @@ -253,33 +251,9 @@ void PrestoServer::run() {
registerMemoryArbitrators();
registerShuffleInterfaceFactories();
registerCustomOperators();
registerConnectorFactories();

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
"iceberg"));

registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive"));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive-hadoop2"));
registerPrestoToVeloxConnector(
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

// Register Presto connector factories and connectors
presto::registerConnectors();

velox::exec::OutputBufferManager::initialize({});
initializeVeloxMemory();
Expand Down Expand Up @@ -1164,24 +1138,6 @@ PrestoServer::getAdditionalHttpServerFilters() {
return filters;
}

void PrestoServer::registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
}

std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";
Expand Down
2 changes: 0 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ class PrestoServer {

virtual void unregisterFileReadersAndWriters();

virtual void registerConnectorFactories();

/// Invoked by presto shutdown procedure to unregister connectors.
virtual void unregisterConnectors();

Expand Down
21 changes: 21 additions & 0 deletions presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
add_subdirectory(arrow_flight)
endif()

add_library(presto_connector Registration.cpp PrestoToVeloxConnector.cpp
SystemConnector.cpp)

if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
target_link_libraries(presto_connector presto_flight_connector)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
* limitations under the License.
*/

#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "PrestoToVeloxConnector.h"
#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h"

#include <velox/type/fbhive/HiveTypeParser.h>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand All @@ -27,6 +26,7 @@
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/connectors/tpch/TpchConnectorSplit.h"
#include "velox/type/Filter.h"
#include "velox/velox/type/fbhive/HiveTypeParser.h"

namespace facebook::presto {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
#pragma once

#include "PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/TypeParser.h"
#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
Expand Down
Loading

0 comments on commit 4661ec7

Please sign in to comment.