Skip to content

Commit

Permalink
listener: add SO_REUSEPORT support for TcpListenSocket by option and …
Browse files Browse the repository at this point in the history
…UdpListenSocket forcibly (#8884)

Make TCP listeners supporting SO_REUSEPORT, then each worker thread will create
and listen on socket using same address and port.

Signed-off-by: lhuang8 <lhuang8@ebay.com>
  • Loading branch information
l8huang authored and mattklein123 committed Dec 4, 2019
1 parent ec0f23d commit 8534ac8
Show file tree
Hide file tree
Showing 22 changed files with 391 additions and 196 deletions.
15 changes: 14 additions & 1 deletion api/envoy/api/v2/lds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ service ListenerDiscoveryService {
}
}

// [#next-free-field: 21]
// [#next-free-field: 22]
message Listener {
enum DrainType {
// Drain in response to calling /healthcheck/fail admin endpoint (along with the health check
Expand Down Expand Up @@ -238,4 +238,17 @@ message Listener {
// If no configuration is specified, Envoy will not attempt to balance active connections between
// worker threads.
ConnectionBalanceConfig connection_balance_config = 20;

// When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and
// create one socket for each worker thread. This makes inbound connections
// distribute among worker threads roughly evenly in cases where there are a high number
// of connections. When this flag is set to false, all worker threads share one socket.
// For UDP this flag is set to true forcibly.
//
// Before Linux v4.19-rc1, new TCP connections may be rejected during hot restart
// (see `3rd paragraph in 'soreuseport' commit message
// <https://github.com/torvalds/linux/commit/c617f398edd4db2b8567a28e89>`_).
// This issue was fixed by `tcp: Avoid TCP syncookie rejected by SO_REUSEPORT socket
// <https://github.com/torvalds/linux/commit/40a1227ea845a37ab197dd1caffb60b047fa36b1>`_.
bool reuse_port = 21;
}
15 changes: 14 additions & 1 deletion api/envoy/api/v3alpha/lds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ service ListenerDiscoveryService {
}
}

// [#next-free-field: 21]
// [#next-free-field: 22]
message Listener {
option (udpa.api.annotations.versioning).previous_message_type = "envoy.api.v2.Listener";

Expand Down Expand Up @@ -235,4 +235,17 @@ message Listener {
// If no configuration is specified, Envoy will not attempt to balance active connections between
// worker threads.
ConnectionBalanceConfig connection_balance_config = 20;

// When this flag is set to true, listeners set the *SO_REUSEPORT* socket option and
// create one socket for each worker thread. This makes inbound connections
// distribute among worker threads roughly evenly in cases where there are a high number
// of connections. When this flag is set to false, all worker threads share one socket.
// For UDP this flag is set to true forcibly.
//
// Before Linux v4.19-rc1, new TCP connections may be rejected during hot restart
// (see `3rd paragraph in 'soreuseport' commit message
// <https://github.com/torvalds/linux/commit/c617f398edd4db2b8567a28e89>`_).
// This issue was fixed by `tcp: Avoid TCP syncookie rejected by SO_REUSEPORT socket
// <https://github.com/torvalds/linux/commit/40a1227ea845a37ab197dd1caffb60b047fa36b1>`_.
bool reuse_port = 21;
}
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Version history
* http: support :ref:`auto_host_rewrite_header<envoy_api_field_config.filter.http.dynamic_forward_proxy.v2alpha.PerRouteConfig.auto_host_rewrite_header>` in the dynamic forward proxy.
* jwt_authn: added :ref:`bypass_cors_preflight<envoy_api_field_config.filter.http.jwt_authn.v2alpha.JwtAuthentication.bypass_cors_preflight>` to allow bypassing the CORS preflight request.
* lb_subset_config: new fallback policy for selectors: :ref:`KEYS_SUBSET<envoy_api_enum_value_Cluster.LbSubsetConfig.LbSubsetSelector.LbSubsetSelectorFallbackPolicy.KEYS_SUBSET>`
* listeners: added :ref:`reuse_port<envoy_api_field_Listener.reuse_port>` option.
* logger: added :ref:`--log-format-escaped <operations_cli>` command line option to escape newline characters in application logs.
* redis: performance improvement for larger split commands by avoiding string copies.
* router: added support for REQ(header-name) :ref:`header formatter <config_http_conn_man_headers_custom_request_headers>`.
Expand Down
3 changes: 1 addition & 2 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class ListenSocketFactory {
virtual const Address::InstanceConstSharedPtr& localAddress() const PURE;

/**
* @return the socket if getListenSocket() returns a shared socket among each call,
* nullopt otherwise.
* @return the socket shared by worker threads if any; otherwise return null.
*/
virtual absl::optional<std::reference_wrapper<Socket>> sharedSocket() const PURE;
};
Expand Down
19 changes: 17 additions & 2 deletions include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ class LdsApi {

using LdsApiPtr = std::unique_ptr<LdsApi>;

struct ListenSocketCreationParams {
ListenSocketCreationParams(bool bind_to_port, bool duplicate_parent_socket = true)
: bind_to_port(bind_to_port), duplicate_parent_socket(duplicate_parent_socket) {}

// For testing.
bool operator==(const ListenSocketCreationParams& rhs) const;
bool operator!=(const ListenSocketCreationParams& rhs) const;

// whether to actually bind the socket.
bool bind_to_port;
// whether to duplicate socket from hot restart parent.
bool duplicate_parent_socket;
};

/**
* Factory for creating listener components.
*/
Expand All @@ -49,13 +63,14 @@ class ListenerComponentFactory {
* @param address supplies the socket's address.
* @param socket_type the type of socket (stream or datagram) to create.
* @param options to be set on the created socket just before calling 'bind()'.
* @param bind_to_port supplies whether to actually bind the socket.
* @param params used to control how a socket being created.
* @return Network::SocketSharedPtr an initialized and potentially bound socket.
*/
virtual Network::SocketSharedPtr
createListenSocket(Network::Address::InstanceConstSharedPtr address,
Network::Address::SocketType socket_type,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) PURE;
const Network::Socket::OptionsSharedPtr& options,
const ListenSocketCreationParams& params) PURE;

/**
* Creates a list of filter factories.
Expand Down
7 changes: 7 additions & 0 deletions source/common/network/socket_option_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,12 @@ std::unique_ptr<Socket::Options> SocketOptionFactory::buildRxQueueOverFlowOption
return options;
}

std::unique_ptr<Socket::Options> SocketOptionFactory::buildReusePortOptions() {
std::unique_ptr<Socket::Options> options = std::make_unique<Socket::Options>();
options->push_back(std::make_shared<Network::SocketOptionImpl>(
envoy::api::v2::core::SocketOption::STATE_PREBIND, ENVOY_SOCKET_SO_REUSEPORT, 1));
return options;
}

} // namespace Network
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/network/socket_option_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SocketOptionFactory : Logger::Loggable<Logger::Id::connection> {
const Protobuf::RepeatedPtrField<envoy::api::v2::core::SocketOption>& socket_options);
static std::unique_ptr<Socket::Options> buildIpPacketInfoOptions();
static std::unique_ptr<Socket::Options> buildRxQueueOverFlowOptions();
static std::unique_ptr<Socket::Options> buildReusePortOptions();
};
} // namespace Network
} // namespace Envoy
6 changes: 6 additions & 0 deletions source/common/network/socket_option_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ namespace Network {
#define ENVOY_SOCKET_SO_MARK Network::SocketOptionName()
#endif

#ifdef SO_REUSEPORT
#define ENVOY_SOCKET_SO_REUSEPORT ENVOY_MAKE_SOCKET_OPTION_NAME(SOL_SOCKET, SO_REUSEPORT)
#else
#define ENVOY_SOCKET_SO_REUSEPORT Network::SocketOptionName()
#endif

#ifdef TCP_KEEPCNT
#define ENVOY_SOCKET_TCP_KEEPCNT ENVOY_MAKE_SOCKET_OPTION_NAME(IPPROTO_TCP, TCP_KEEPCNT)
#else
Expand Down
2 changes: 1 addition & 1 deletion source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ValidationInstance final : Logger::Loggable<Logger::Id::main>,
Network::SocketSharedPtr createListenSocket(Network::Address::InstanceConstSharedPtr,
Network::Address::SocketType,
const Network::Socket::OptionsSharedPtr&,
bool) override {
const ListenSocketCreationParams&) override {
// Returned sockets are not currently used so we can return nothing here safely vs. a
// validation mock.
return nullptr;
Expand Down
118 changes: 72 additions & 46 deletions source/server/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,54 @@
namespace Envoy {
namespace Server {

ListenSocketFactoryImplBase::ListenSocketFactoryImplBase(
ListenerComponentFactory& factory, Network::Address::InstanceConstSharedPtr local_address,
Network::Address::SocketType socket_type, const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port, const std::string& listener_name)
: factory_(factory), local_address_(local_address), socket_type_(socket_type),
options_(options), bind_to_port_(bind_to_port), listener_name_(listener_name) {}

Network::SocketSharedPtr ListenSocketFactoryImplBase::createListenSocketAndApplyOptions() {
ListenSocketFactoryImpl::ListenSocketFactoryImpl(ListenerComponentFactory& factory,
Network::Address::InstanceConstSharedPtr address,
Network::Address::SocketType socket_type,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port,
const std::string& listener_name, bool reuse_port)
: factory_(factory), local_address_(address), socket_type_(socket_type), options_(options),
bind_to_port_(bind_to_port), listener_name_(listener_name), reuse_port_(reuse_port) {

bool create_socket = false;
if (local_address_->type() == Network::Address::Type::Ip) {
if (socket_type_ == Network::Address::SocketType::Datagram) {
ASSERT(reuse_port_ == true);
}

if (reuse_port_ == false) {
// create a socket which will be used by all worker threads
create_socket = true;
} else if (local_address_->ip()->port() == 0) {
// port is 0, need to create a socket here for reserving a real port number,
// then all worker threads should use same port.
create_socket = true;
}
} else {
ASSERT(local_address_->type() == Network::Address::Type::Pipe);
// Listeners with Unix domain socket always use shared socket.
create_socket = true;
}

if (create_socket) {
socket_ = createListenSocketAndApplyOptions();
}

if (socket_ && local_address_->ip() && local_address_->ip()->port() == 0) {
local_address_ = socket_->localAddress();
}

ENVOY_LOG(debug, "Set listener {} socket factory local address to {}", listener_name_,
local_address_->asString());
}

Network::SocketSharedPtr ListenSocketFactoryImpl::createListenSocketAndApplyOptions() {
// socket might be nullptr depending on factory_ implementation.
Network::SocketSharedPtr socket =
factory_.createListenSocket(local_address_, socket_type_, options_, bind_to_port_);
Network::SocketSharedPtr socket = factory_.createListenSocket(
local_address_, socket_type_, options_, {bind_to_port_, !reuse_port_});

// Binding is done by now.
ENVOY_LOG(info, "Create listen socket for listener {} on address {}", listener_name_,
ENVOY_LOG(debug, "Create listen socket for listener {} on address {}", listener_name_,
local_address_->asString());
if (socket != nullptr && options_ != nullptr) {
const bool ok = Network::Socket::applyOptions(options_, *socket,
Expand All @@ -59,42 +94,30 @@ Network::SocketSharedPtr ListenSocketFactoryImplBase::createListenSocketAndApply
return socket;
}

void ListenSocketFactoryImplBase::setLocalAddress(
Network::Address::InstanceConstSharedPtr local_address) {
ENVOY_LOG(debug, "Set listener {} socket factory local address to {}", listener_name_,
local_address->asString());
local_address_ = local_address;
}

TcpListenSocketFactory::TcpListenSocketFactory(
ListenerComponentFactory& factory, Network::Address::InstanceConstSharedPtr local_address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port,
const std::string& listener_name)
: ListenSocketFactoryImplBase(factory, local_address, Network::Address::SocketType::Stream,
options, bind_to_port, listener_name) {
socket_ = createListenSocketAndApplyOptions();
if (socket_ != nullptr && localAddress()->ip() != nullptr && localAddress()->ip()->port() == 0) {
setLocalAddress(socket_->localAddress());
Network::SocketSharedPtr ListenSocketFactoryImpl::getListenSocket() {
if (!reuse_port_) {
return socket_;
}
}

Network::SocketSharedPtr TcpListenSocketFactory::getListenSocket() { return socket_; }

UdpListenSocketFactory::UdpListenSocketFactory(
ListenerComponentFactory& factory, Network::Address::InstanceConstSharedPtr local_address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port,
const std::string& listener_name)
: ListenSocketFactoryImplBase(factory, local_address, Network::Address::SocketType::Datagram,
options, bind_to_port, listener_name) {}

Network::SocketSharedPtr UdpListenSocketFactory::getListenSocket() {
// TODO(danzh) add support of SO_REUSEPORT. Currently calling this method twice will fail because
// the port is already in use.
Network::SocketSharedPtr socket = createListenSocketAndApplyOptions();
if (socket != nullptr && localAddress()->ip() != nullptr && localAddress()->ip()->port() == 0) {
setLocalAddress(socket->localAddress());
Network::SocketSharedPtr socket;
absl::call_once(steal_once_, [this, &socket]() {
if (socket_) {
// If a listener's port is set to 0, socket_ should be created for reserving a port
// number, it is handed over to the first worker thread came here.
// There are several reasons for doing this:
// - for UDP, once a socket being bound, it begins to receive packets, it can't be
// left unused, and closing it will lost packets received by it.
// - port number should be reserved before adding listener to active_listeners_ list,
// otherwise admin API /listeners might return 0 as listener's port.
socket = std::move(socket_);
}
});

if (socket) {
return socket;
}
return socket;

return createListenSocketAndApplyOptions();
}

ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::string& version_info,
Expand All @@ -120,18 +143,21 @@ ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::st
listener_filters_timeout_(
PROTOBUF_GET_MS_OR_DEFAULT(config, listener_filters_timeout, 15000)),
continue_on_listener_filters_timeout_(config.continue_on_listener_filters_timeout()) {
Network::Address::SocketType socket_type =
Network::Utility::protobufAddressSocketType(config.address());
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, transparent, false)) {
addListenSocketOptions(Network::SocketOptionFactory::buildIpTransparentOptions());
}
if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, freebind, false)) {
addListenSocketOptions(Network::SocketOptionFactory::buildIpFreebindOptions());
}
if ((socket_type == Network::Address::SocketType::Datagram) || config.reuse_port()) {
addListenSocketOptions(Network::SocketOptionFactory::buildReusePortOptions());
}
if (!config.socket_options().empty()) {
addListenSocketOptions(
Network::SocketOptionFactory::buildLiteralOptions(config.socket_options()));
}
Network::Address::SocketType socket_type =
Network::Utility::protobufAddressSocketType(config.address());
if (socket_type == Network::Address::SocketType::Datagram) {
// Needed for recvmsg to return destination address in IP header.
addListenSocketOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
Expand Down
Loading

0 comments on commit 8534ac8

Please sign in to comment.