Skip to content

Commit

Permalink
quiche: add multi-worker support for QUIC via BPF (envoyproxy#9424)
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Zhang <danzh@google.com>
  • Loading branch information
danzh2010 authored Feb 7, 2020
1 parent 4300ad4 commit e176b30
Show file tree
Hide file tree
Showing 28 changed files with 358 additions and 56 deletions.
2 changes: 1 addition & 1 deletion include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ActiveUdpListenerFactory {
*/
virtual ConnectionHandler::ActiveListenerPtr
createActiveUdpListener(ConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) const PURE;
Network::ListenerConfig& config) PURE;

/**
* @return true if the UDP passing through listener doesn't form stateful connections.
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ListenerConfig {
* @return factory pointer if listening on UDP socket, otherwise return
* nullptr.
*/
virtual const ActiveUdpListenerFactory* udpListenerFactory() PURE;
virtual ActiveUdpListenerFactory* udpListenerFactory() PURE;

/**
* @return traffic direction of the listener.
Expand Down
4 changes: 3 additions & 1 deletion include/envoy/server/active_udp_listener_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ class ActiveUdpListenerConfigFactory : public Config::UntypedFactory {

/**
* Create an ActiveUdpListenerFactory object according to given message.
* @param message specifies QUIC protocol options in a protobuf.
* @param concurrency is the number of listeners instances to be created.
*/
virtual Network::ActiveUdpListenerFactoryPtr
createActiveUdpListenerFactory(const Protobuf::Message& message) PURE;
createActiveUdpListenerFactory(const Protobuf::Message& message, uint32_t concurrency) PURE;

std::string category() const override { return "envoy.udp_listeners"; }
};
Expand Down
7 changes: 7 additions & 0 deletions source/common/network/socket_option_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ static_assert(IP_RECVDSTADDR == IP_SENDSRCADDR);
// receiving destination address.
#define ENVOY_SELF_IPV6_ADDR ENVOY_MAKE_SOCKET_OPTION_NAME(IPPROTO_IPV6, IPV6_RECVPKTINFO)

#ifdef SO_ATTACH_REUSEPORT_CBPF
#define ENVOY_ATTACH_REUSEPORT_CBPF \
ENVOY_MAKE_SOCKET_OPTION_NAME(SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF)
#else
#define ENVOY_ATTACH_REUSEPORT_CBPF Network::SocketOptionName()
#endif

class SocketOptionImpl : public Socket::Option, Logger::Loggable<Logger::Id::connection> {
public:
SocketOptionImpl(envoy::config::core::v3::SocketOption::SocketState in_state,
Expand Down
110 changes: 106 additions & 4 deletions source/extensions/quic_listeners/quiche/active_quic_listener.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#include "extensions/quic_listeners/quiche/active_quic_listener.h"

#if defined(__linux__)
#include <linux/filter.h>
#endif

#include <vector>

#include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h"
#include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h"
#include "extensions/quic_listeners/quiche/envoy_quic_dispatcher.h"
Expand All @@ -13,19 +19,32 @@ namespace Quic {
ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher,
Network::ConnectionHandler& parent,
Network::ListenerConfig& listener_config,
const quic::QuicConfig& quic_config)
const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options)
: ActiveQuicListener(dispatcher, parent,
listener_config.listenSocketFactory().getListenSocket(), listener_config,
quic_config) {}
quic_config, std::move(options)) {}

ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher,
Network::ConnectionHandler& parent,
Network::SocketSharedPtr listen_socket,
Network::ListenerConfig& listener_config,
const quic::QuicConfig& quic_config)
const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options)
: Server::ConnectionHandlerImpl::ActiveListenerImplBase(parent, listener_config),
dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()),
listen_socket_(*listen_socket) {
if (options != nullptr) {
const bool ok = Network::Socket::applyOptions(
options, listen_socket_, envoy::config::core::v3::SocketOption::STATE_BOUND);
if (!ok) {
ENVOY_LOG(warn, "Failed to apply socket options to socket {} on listener {} after binding",
listen_socket_.ioHandle().fd(), listener_config.name());
throw EnvoyException("Failed to apply socket options.");
}
listen_socket_.addOptions(options);
}

udp_listener_ = dispatcher_.createUdpListener(std::move(listen_socket), *this);
quic::QuicRandom* const random = quic::QuicRandom::GetInstance();
random->RandBytes(random_seed_, sizeof(random_seed_));
Expand All @@ -41,7 +60,7 @@ ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher,
quic_dispatcher_ = std::make_unique<EnvoyQuicDispatcher>(
crypto_config_.get(), quic_config, &version_manager_, std::move(connection_helper),
std::move(alarm_factory), quic::kQuicDefaultConnectionIdLength, parent, config_, stats_,
dispatcher, listen_socket_);
per_worker_stats_, dispatcher, listen_socket_);
quic_dispatcher_->InitializeWithWriter(new EnvoyQuicPacketWriter(listen_socket_));
}

Expand Down Expand Up @@ -82,5 +101,88 @@ void ActiveQuicListener::onWriteReady(const Network::Socket& /*socket*/) {
quic_dispatcher_->OnCanWrite();
}

ActiveQuicListenerFactory::ActiveQuicListenerFactory(
const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency)
: concurrency_(concurrency) {
uint64_t idle_network_timeout_ms =
config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout())
: 300000;
quic_config_.SetIdleNetworkTimeout(
quic::QuicTime::Delta::FromMilliseconds(idle_network_timeout_ms),
quic::QuicTime::Delta::FromMilliseconds(idle_network_timeout_ms));
int32_t max_time_before_crypto_handshake_ms =
config.has_crypto_handshake_timeout()
? DurationUtil::durationToMilliseconds(config.crypto_handshake_timeout())
: 20000;
quic_config_.set_max_time_before_crypto_handshake(
quic::QuicTime::Delta::FromMilliseconds(max_time_before_crypto_handshake_ms));
int32_t max_streams = PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_concurrent_streams, 100);
quic_config_.SetMaxIncomingBidirectionalStreamsToSend(max_streams);
quic_config_.SetMaxIncomingUnidirectionalStreamsToSend(max_streams);
}

Network::ConnectionHandler::ActiveListenerPtr
ActiveQuicListenerFactory::createActiveUdpListener(Network::ConnectionHandler& parent,
Event::Dispatcher& disptacher,
Network::ListenerConfig& config) {
std::unique_ptr<Network::Socket::Options> options = std::make_unique<Network::Socket::Options>();
#if defined(SO_ATTACH_REUSEPORT_CBPF) && defined(__linux__)
// This BPF filter reads the 1st word of QUIC connection id in the UDP payload and mods it by the
// number of workers to get the socket index in the SO_REUSEPORT socket groups. QUIC packets
// should be at least 9 bytes, with the 1st byte indicating one of the below QUIC packet headers:
// 1) IETF QUIC long header: most significant bit is 1. The connection id starts from the 7th
// byte.
// 2) IETF QUIC short header: most significant bit is 0. The connection id starts from 2nd
// byte.
// 3) Google QUIC header: most significant bit is 0. The connection id starts from 2nd
// byte.
// Any packet that doesn't belong to any of the three packet header types are dispatched
// based on 5-tuple source/destination addresses.
std::vector<sock_filter> filter = {
{0x80, 0, 0, 0000000000}, // ld len
{0x35, 0, 9, 0x00000009}, // jlt #0x9, packet_too_short
{0x30, 0, 0, 0000000000}, // ldb [0]
{0x54, 0, 0, 0x00000080}, // and #0x80
{0x15, 0, 2, 0000000000}, // jne #0, ietf_long_header
{0x20, 0, 0, 0x00000001}, // ld [1]
{0x05, 0, 0, 0x00000005}, // ja return
{0x80, 0, 0, 0000000000}, // ietf_long_header: ld len
{0x35, 0, 2, 0x0000000e}, // jlt #0xe, packet_too_short
{0x20, 0, 0, 0x00000006}, // ld [6]
{0x05, 0, 0, 0x00000001}, // ja return
{0x20, 0, 0, // packet_too_short: ld rxhash
static_cast<uint32_t>(SKF_AD_OFF + SKF_AD_RXHASH)},
{0x94, 0, 0, concurrency_}, // return: mod #socket_count
{0x16, 0, 0, 0000000000}, // ret a
};
sock_fprog prog;
// This option only needs to be applied once to any one of the sockets in SO_REUSEPORT socket
// group. One of the listener will be created with this socket option.
absl::call_once(install_bpf_once_, [&]() {
if (concurrency_ > 1) {
prog.len = filter.size();
prog.filter = filter.data();
options->push_back(std::make_shared<Network::SocketOptionImpl>(
envoy::config::core::v3::SocketOption::STATE_BOUND, ENVOY_ATTACH_REUSEPORT_CBPF,
absl::string_view(reinterpret_cast<char*>(&prog), sizeof(prog))));
}
});
#else
if (concurrency_ > 1) {
#ifdef __APPLE__
// Not support multiple listeners in Mac OS unless someone cares. This is because SO_REUSEPORT
// doesn't behave as expected in Mac OS.(#8794)
ENVOY_LOG(error, "Because SO_REUSEPORT doesn't guarantee stable hashing from network 5 tuple "
"to socket in Mac OS. QUIC connection is not stable with concurrency > 1");
#else
ENVOY_LOG(warn, "BPF filter is not supported on this platform. QUIC won't support connection "
"migration and NAT port rebinding.");
#endif
}
#endif
return std::make_unique<ActiveQuicListener>(disptacher, parent, config, quic_config_,
std::move(options));
}

} // namespace Quic
} // namespace Envoy
35 changes: 12 additions & 23 deletions source/extensions/quic_listeners/quiche/active_quic_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "envoy/network/connection_handler.h"
#include "envoy/network/listener.h"

#include "common/network/socket_option_impl.h"
#include "common/protobuf/utility.h"

#include "server/connection_handler_impl.h"
Expand All @@ -23,11 +24,13 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks,
static const size_t kNumSessionsToCreatePerLoop = 16;

ActiveQuicListener(Event::Dispatcher& dispatcher, Network::ConnectionHandler& parent,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config);
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options);

ActiveQuicListener(Event::Dispatcher& dispatcher, Network::ConnectionHandler& parent,
Network::SocketSharedPtr listen_socket,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config);
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
Network::Socket::OptionsSharedPtr options);

~ActiveQuicListener() override;

Expand Down Expand Up @@ -61,38 +64,24 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks,
using ActiveQuicListenerPtr = std::unique_ptr<ActiveQuicListener>;

// A factory to create ActiveQuicListener based on given config.
class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory {
class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory,
Logger::Loggable<Logger::Id::quic> {
public:
ActiveQuicListenerFactory(const envoy::config::listener::v3::QuicProtocolOptions& config) {
uint64_t idle_network_timeout_ms =
config.has_idle_timeout() ? DurationUtil::durationToMilliseconds(config.idle_timeout())
: 300000;
quic_config_.SetIdleNetworkTimeout(
quic::QuicTime::Delta::FromMilliseconds(idle_network_timeout_ms),
quic::QuicTime::Delta::FromMilliseconds(idle_network_timeout_ms));
int32_t max_time_before_crypto_handshake_ms =
config.has_crypto_handshake_timeout()
? DurationUtil::durationToMilliseconds(config.crypto_handshake_timeout())
: 20000;
quic_config_.set_max_time_before_crypto_handshake(
quic::QuicTime::Delta::FromMilliseconds(max_time_before_crypto_handshake_ms));
int32_t max_streams = PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_concurrent_streams, 100);
quic_config_.SetMaxIncomingBidirectionalStreamsToSend(max_streams);
quic_config_.SetMaxIncomingUnidirectionalStreamsToSend(max_streams);
}
ActiveQuicListenerFactory(const envoy::config::listener::v3::QuicProtocolOptions& config,
uint32_t concurrency);

// Network::ActiveUdpListenerFactory.
Network::ConnectionHandler::ActiveListenerPtr
createActiveUdpListener(Network::ConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) const override {
return std::make_unique<ActiveQuicListener>(disptacher, parent, config, quic_config_);
}
Network::ListenerConfig& config) override;
bool isTransportConnectionless() const override { return false; }

private:
friend class ActiveQuicListenerFactoryPeer;

quic::QuicConfig quic_config_;
const uint32_t concurrency_;
absl::once_flag install_bpf_once_;
};

} // namespace Quic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ ProtobufTypes::MessagePtr ActiveQuicListenerConfigFactory::createEmptyConfigProt
}

Network::ActiveUdpListenerFactoryPtr
ActiveQuicListenerConfigFactory::createActiveUdpListenerFactory(const Protobuf::Message& message) {
ActiveQuicListenerConfigFactory::createActiveUdpListenerFactory(const Protobuf::Message& message,
uint32_t concurrency) {
auto& config = dynamic_cast<const envoy::config::listener::v3::QuicProtocolOptions&>(message);
return std::make_unique<ActiveQuicListenerFactory>(config);
return std::make_unique<ActiveQuicListenerFactory>(config, concurrency);
}

std::string ActiveQuicListenerConfigFactory::name() const { return QuicListenerName; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ActiveQuicListenerConfigFactory : public Server::ActiveUdpListenerConfigFa
ProtobufTypes::MessagePtr createEmptyConfigProto() override;

Network::ActiveUdpListenerFactoryPtr
createActiveUdpListenerFactory(const Protobuf::Message&) override;
createActiveUdpListenerFactory(const Protobuf::Message&, uint32_t concurrency) override;

std::string name() const override;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h"

#include <memory>

#include "envoy/config/core/v3/base.pb.h"

#include "common/network/listen_socket_impl.h"
Expand Down Expand Up @@ -92,6 +94,14 @@ void EnvoyQuicClientConnection::setUpConnectionSocket() {
}
}

void EnvoyQuicClientConnection::switchConnectionSocket(
Network::ConnectionSocketPtr&& connection_socket) {
auto writer = std::make_unique<EnvoyQuicPacketWriter>(*connection_socket);
setConnectionSocket(std::move(connection_socket));
setUpConnectionSocket();
SetQuicPacketWriter(writer.release(), true);
}

void EnvoyQuicClientConnection::onFileEvent(uint32_t events) {
ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);
ASSERT(events & (Event::FileReadyType::Read | Event::FileReadyType::Write));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class EnvoyQuicClientConnection : public EnvoyQuicConnection, public Network::Ud
// Register file event and apply socket options.
void setUpConnectionSocket();

// Switch underlying socket with the given one. This is used in connection migration.
void switchConnectionSocket(Network::ConnectionSocketPtr&& connection_socket);

private:
EnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id,
quic::QuicConnectionHelperInterface& helper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class EnvoyQuicConnection : public quic::QuicConnection,
return *envoy_connection_;
}

void setConnectionSocket(Network::ConnectionSocketPtr&& connection_socket) {
connection_socket_ = std::move(connection_socket);
}

private:
// TODO(danzh): populate stats.
std::unique_ptr<Network::Connection::ConnectionStats> connection_stats_;
Expand Down
12 changes: 10 additions & 2 deletions source/extensions/quic_listeners/quiche/envoy_quic_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ EnvoyQuicDispatcher::EnvoyQuicDispatcher(
std::unique_ptr<quic::QuicAlarmFactory> alarm_factory,
uint8_t expected_server_connection_id_length, Network::ConnectionHandler& connection_handler,
Network::ListenerConfig& listener_config, Server::ListenerStats& listener_stats,
Event::Dispatcher& dispatcher, Network::Socket& listen_socket)
Server::PerHandlerListenerStats& per_worker_stats, Event::Dispatcher& dispatcher,
Network::Socket& listen_socket)
: quic::QuicDispatcher(&quic_config, crypto_config, version_manager, std::move(helper),
std::make_unique<EnvoyQuicCryptoServerStreamHelper>(),
std::move(alarm_factory), expected_server_connection_id_length),
connection_handler_(connection_handler), listener_config_(listener_config),
listener_stats_(listener_stats), dispatcher_(dispatcher), listen_socket_(listen_socket) {
listener_stats_(listener_stats), per_worker_stats_(per_worker_stats), dispatcher_(dispatcher),
listen_socket_(listen_socket) {
// Set send buffer twice of max flow control window to ensure that stream send
// buffer always takes all the data.
// The max amount of data buffered is the per-stream high watermark + the max
Expand All @@ -37,6 +39,8 @@ void EnvoyQuicDispatcher::OnConnectionClosed(quic::QuicConnectionId connection_i
const std::string& error_details,
quic::ConnectionCloseSource source) {
quic::QuicDispatcher::OnConnectionClosed(connection_id, error, error_details, source);
listener_stats_.downstream_cx_active_.dec();
per_worker_stats_.downstream_cx_active_.dec();
connection_handler_.decNumConnections();
}

Expand All @@ -59,6 +63,10 @@ std::unique_ptr<quic::QuicSession> EnvoyQuicDispatcher::CreateQuicSession(
// thing to pay attention is that if the retrieval fails, connection needs to
// be closed, and it should be added to time wait list instead of session map.
connection_handler_.incNumConnections();
listener_stats_.downstream_cx_active_.inc();
listener_stats_.downstream_cx_total_.inc();
per_worker_stats_.downstream_cx_active_.inc();
per_worker_stats_.downstream_cx_total_.inc();
return quic_session;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class EnvoyQuicDispatcher : public quic::QuicDispatcher {
uint8_t expected_server_connection_id_length,
Network::ConnectionHandler& connection_handler,
Network::ListenerConfig& listener_config,
Server::ListenerStats& listener_stats, Event::Dispatcher& dispatcher,
Network::Socket& listen_socket);
Server::ListenerStats& listener_stats,
Server::PerHandlerListenerStats& per_worker_stats,
Event::Dispatcher& dispatcher, Network::Socket& listen_socket);

void OnConnectionClosed(quic::QuicConnectionId connection_id, quic::QuicErrorCode error,
const std::string& error_details,
Expand All @@ -72,6 +73,7 @@ class EnvoyQuicDispatcher : public quic::QuicDispatcher {
Network::ConnectionHandler& connection_handler_;
Network::ListenerConfig& listener_config_;
Server::ListenerStats& listener_stats_;
Server::PerHandlerListenerStats& per_worker_stats_;
Event::Dispatcher& dispatcher_;
Network::Socket& listen_socket_;
};
Expand Down
Loading

0 comments on commit e176b30

Please sign in to comment.