Skip to content

Commit

Permalink
stream info: cleanup address handling
Browse files Browse the repository at this point in the history
Consolidate all downstream address handling setting into a single
function. Also remove duplicate setting in the connection handler.
This should make this logic less error prone than it was previously.

Fixes #14133

Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
mattklein123 committed Dec 15, 2020
1 parent 93ee668 commit 06008a9
Show file tree
Hide file tree
Showing 21 changed files with 93 additions and 119 deletions.
24 changes: 3 additions & 21 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ enum class ConnectionCloseType {
/**
* An abstract raw connection. Free the connection or call close() to disconnect.
*/
class Connection : public Event::DeferredDeletable, public FilterManager {
class Connection : public Event::DeferredDeletable,
public FilterManager,
public ConnectedSocketAddressProvider {
public:
enum class State { Open, Closing, Closed };

Expand Down Expand Up @@ -187,18 +189,6 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
*/
virtual bool readEnabled() const PURE;

/**
* @return The address of the remote client. Note that this method will never return nullptr.
*/
virtual const Network::Address::InstanceConstSharedPtr& remoteAddress() const PURE;

/**
* @return The address of the remote directly connected peer. Note that this method
* will never return nullptr. This address is not affected or modified by PROXY protocol
* or any other listener filter.
*/
virtual const Network::Address::InstanceConstSharedPtr& directRemoteAddress() const PURE;

/**
* Credentials of the peer of a socket as decided by SO_PEERCRED.
*/
Expand All @@ -223,14 +213,6 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
*/
virtual absl::optional<UnixDomainSocketPeerCredentials> unixSocketPeerCredentials() const PURE;

/**
* @return the local address of the connection. For client connections, this is the origin
* address. For server connections, this is the local destination address. For server connections
* it can be different from the proxy address if the downstream connection has been redirected or
* the proxy is operating in transparent mode. Note that this method will never return nullptr.
*/
virtual const Network::Address::InstanceConstSharedPtr& localAddress() const PURE;

/**
* Set the stats to update for various connection state changes. Note that for performance reasons
* these stats are eventually consistent and may not always accurately represent the connection
Expand Down
20 changes: 12 additions & 8 deletions include/envoy/network/listen_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ namespace Envoy {
namespace Network {

/**
* A socket passed to a connection. For server connections this represents the accepted socket, and
* for client connections this represents the socket being connected to a remote address.
*
* TODO(jrajahalme): Hide internals (e.g., fd) from listener filters by providing callbacks filters
* may need (set/getsockopt(), peek(), recv(), etc.)
* Interface for providing a connected socket's remote addresses.
*/
class ConnectionSocket : public virtual Socket {
class ConnectedSocketAddressProvider : public virtual SocketAddressProvider {
public:
~ConnectionSocket() override = default;

/**
* @return the remote address of the socket.
*/
Expand All @@ -39,7 +33,17 @@ class ConnectionSocket : public virtual Socket {
* connected peer, and cannot be modified by listener filters.
*/
virtual const Address::InstanceConstSharedPtr& directRemoteAddress() const PURE;
};

/**
* A socket passed to a connection. For server connections this represents the accepted socket, and
* for client connections this represents the socket being connected to a remote address.
*
* TODO(jrajahalme): Hide internals (e.g., fd) from listener filters by providing callbacks filters
* may need (set/getsockopt(), peek(), recv(), etc.)
*/
class ConnectionSocket : public virtual Socket, public ConnectedSocketAddressProvider {
public:
/**
* Restores the local address of the socket. On accepted sockets the local address defaults to the
* one at which the connection was received at, which is the same as the listener's address, if
Expand Down
20 changes: 13 additions & 7 deletions include/envoy/network/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,27 @@ struct SocketOptionName {
Network::SocketOptionName(level, option, #level "/" #option)

/**
* Base class for Sockets
* Interface for providing a socket's local address.
*/
class Socket {
class SocketAddressProvider {
public:
virtual ~Socket() = default;
virtual ~SocketAddressProvider() = default;

/**
* Type of sockets supported. See man 2 socket for more details
* @return the local address of the socket.
*/
enum class Type { Stream, Datagram };
virtual const Address::InstanceConstSharedPtr& localAddress() const PURE;
};

/**
* Base class for Sockets
*/
class Socket : public virtual SocketAddressProvider {
public:
/**
* @return the local address of the socket.
* Type of sockets supported. See man 2 socket for more details
*/
virtual const Address::InstanceConstSharedPtr& localAddress() const PURE;
enum class Type { Stream, Datagram };

/**
* Set the local address of the socket. On accepted sockets the local address defaults to the
Expand Down
1 change: 1 addition & 0 deletions include/envoy/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ envoy_cc_library(
"//include/envoy/http:header_map_interface",
"//include/envoy/http:protocol_interface",
"//include/envoy/http:request_id_extension_interface",
"//include/envoy/network:listen_socket_interface",
"//include/envoy/ssl:connection_interface",
"//include/envoy/upstream:host_description_interface",
"//source/common/common:assert_lib",
Expand Down
21 changes: 7 additions & 14 deletions include/envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "envoy/http/header_map.h"
#include "envoy/http/protocol.h"
#include "envoy/http/request_id_extension.h"
#include "envoy/network/listen_socket.h"
#include "envoy/ssl/connection.h"
#include "envoy/stream_info/filter_state.h"
#include "envoy/upstream/host_description.h"
Expand Down Expand Up @@ -445,25 +446,11 @@ class StreamInfo {
*/
virtual void healthCheck(bool is_health_check) PURE;

/**
* @param downstream_local_address sets the local address of the downstream connection. Note that
* it can be different than the local address of the upstream connection.
*/
virtual void setDownstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& downstream_local_address) PURE;

/**
* @return the downstream local address. Note that this will never be nullptr.
*/
virtual const Network::Address::InstanceConstSharedPtr& downstreamLocalAddress() const PURE;

/**
* @param downstream_direct_remote_address sets the direct physical address of downstream
* connection.
*/
virtual void setDownstreamDirectRemoteAddress(
const Network::Address::InstanceConstSharedPtr& downstream_direct_remote_address) PURE;

/**
* @return the downstream directly connected address. This will never be nullptr. This is
* equivalent to the address of the physical connection.
Expand All @@ -484,6 +471,12 @@ class StreamInfo {
*/
virtual const Network::Address::InstanceConstSharedPtr& downstreamRemoteAddress() const PURE;

/**
* Set the stream's downstream addresses as a set.
*/
virtual void
setDownstreamAddresses(const Network::ConnectedSocketAddressProvider& address_provider) PURE;

/**
* @param connection_info sets the downstream ssl connection.
*/
Expand Down
8 changes: 2 additions & 6 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,16 +618,12 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
} else {
connection_manager_.stats_.named_.downstream_rq_http1_total_.inc();
}
filter_manager_.streamInfo().setDownstreamLocalAddress(
connection_manager_.read_callbacks_->connection().localAddress());
filter_manager_.streamInfo().setDownstreamDirectRemoteAddress(
connection_manager_.read_callbacks_->connection().directRemoteAddress());
// Initially, the downstream remote address is the source address of the
// downstream connection. That can change later in the request's lifecycle,
// based on XFF processing, but setting the downstream remote address here
// prevents surprises for logging code in edge cases.
filter_manager_.streamInfo().setDownstreamRemoteAddress(
connection_manager_.read_callbacks_->connection().remoteAddress());
filter_manager_.streamInfo().setDownstreamAddresses(
connection_manager_.read_callbacks_->connection());

filter_manager_.streamInfo().setDownstreamSslConnection(
connection_manager_.read_callbacks_->connection().ssl());
Expand Down
17 changes: 7 additions & 10 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,10 @@ struct StreamInfoImpl : public StreamInfo {

void healthCheck(bool is_health_check) override { health_check_request_ = is_health_check; }

void setDownstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& downstream_local_address) override {
downstream_local_address_ = downstream_local_address;
}

const Network::Address::InstanceConstSharedPtr& downstreamLocalAddress() const override {
return downstream_local_address_;
}

void setDownstreamDirectRemoteAddress(
const Network::Address::InstanceConstSharedPtr& downstream_direct_remote_address) override {
downstream_direct_remote_address_ = downstream_direct_remote_address;
}

const Network::Address::InstanceConstSharedPtr& downstreamDirectRemoteAddress() const override {
return downstream_direct_remote_address_;
}
Expand All @@ -211,6 +201,13 @@ struct StreamInfoImpl : public StreamInfo {
return downstream_remote_address_;
}

void
setDownstreamAddresses(const Network::ConnectedSocketAddressProvider& address_provider) override {
downstream_local_address_ = address_provider.localAddress();
downstream_direct_remote_address_ = address_provider.directRemoteAddress();
downstream_remote_address_ = address_provider.remoteAddress();
}

void
setDownstreamSslConnection(const Ssl::ConnectionInfoConstSharedPtr& connection_info) override {
downstream_ssl_info_ = connection_info;
Expand Down
3 changes: 1 addition & 2 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onInterval() {
Router::FilterUtility::setUpstreamScheme(
*request_headers, host_->transportSocketFactory().implementsSecureTransport());
StreamInfo::StreamInfoImpl stream_info(protocol_, parent_.dispatcher_.timeSource());
stream_info.setDownstreamLocalAddress(local_address_);
stream_info.setDownstreamRemoteAddress(local_address_);
stream_info.setDownstreamAddresses(*this);
stream_info.onUpstreamHostSelected(host_);
parent_.request_headers_parser_->evaluateHeaders(*request_headers, stream_info);
auto status = request_encoder->encodeHeaders(*request_headers, true);
Expand Down
15 changes: 14 additions & 1 deletion source/common/upstream/health_checker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/config/core/v3/health_check.pb.h"
#include "envoy/data/core/v3/health_check_event.pb.h"
#include "envoy/grpc/status.h"
#include "envoy/network/listen_socket.h"
#include "envoy/type/v3/http.pb.h"
#include "envoy/type/v3/range.pb.h"

Expand Down Expand Up @@ -71,7 +72,8 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase {
private:
struct HttpActiveHealthCheckSession : public ActiveHealthCheckSession,
public Http::ResponseDecoder,
public Http::StreamCallbacks {
public Http::StreamCallbacks,
public Network::ConnectedSocketAddressProvider {
HttpActiveHealthCheckSession(HttpHealthCheckerImpl& parent, const HostSharedPtr& host);
~HttpActiveHealthCheckSession() override;

Expand Down Expand Up @@ -104,6 +106,17 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase {
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

// Network::ConnectedSocketAddressProvider
const Network::Address::InstanceConstSharedPtr& localAddress() const override {
return local_address_;
}
const Network::Address::InstanceConstSharedPtr& directRemoteAddress() const override {
return local_address_;
}
const Network::Address::InstanceConstSharedPtr& remoteAddress() const override {
return local_address_;
}

void onEvent(Network::ConnectionEvent event);

class ConnectionCallbackImpl : public Network::ConnectionCallbacks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ ActiveMessage::ActiveMessage(ConnectionManager& parent)
stream_info_(parent.timeSystem()), pending_stream_decoded_(false),
local_response_sent_(false) {
parent_.stats().request_active_.inc();
stream_info_.setDownstreamLocalAddress(parent_.connection().localAddress());
stream_info_.setDownstreamRemoteAddress(parent_.connection().remoteAddress());
stream_info_.setDownstreamDirectRemoteAddress(parent_.connection().directRemoteAddress());
stream_info_.setDownstreamAddresses(parent_.connection());
}

ActiveMessage::~ActiveMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,7 @@ class ConnectionManager : public Network::ReadFilter,
stream_info_(parent_.time_source_), local_response_sent_{false}, pending_transport_end_{
false} {
parent_.stats_.request_active_.inc();

stream_info_.setDownstreamLocalAddress(parent_.read_callbacks_->connection().localAddress());
stream_info_.setDownstreamRemoteAddress(
parent_.read_callbacks_->connection().remoteAddress());
stream_info_.setDownstreamDirectRemoteAddress(
parent_.read_callbacks_->connection().directRemoteAddress());
stream_info_.setDownstreamAddresses(parent_.read_callbacks_->connection());
}
~ActiveRpc() override {
request_timer_->complete();
Expand Down
3 changes: 1 addition & 2 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
Network::ConnectionSocketPtr&& socket, std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
// Refresh addresses in case they are modified by listener filters, such as proxy protocol or
// original_dst.
stream_info->setDownstreamLocalAddress(socket->localAddress());
stream_info->setDownstreamRemoteAddress(socket->remoteAddress());
stream_info->setDownstreamAddresses(*socket);

// Find matching filter chain.
const auto filter_chain = config_->filterChainManager().findFilterChain(*socket);
Expand Down
3 changes: 0 additions & 3 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
listener_.parent_.dispatcher_.timeSource(),
StreamInfo::FilterState::LifeSpan::Connection)) {
listener_.stats_.downstream_pre_cx_active_.inc();
stream_info_->setDownstreamLocalAddress(socket_->localAddress());
stream_info_->setDownstreamRemoteAddress(socket_->remoteAddress());
stream_info_->setDownstreamDirectRemoteAddress(socket_->directRemoteAddress());
}
~ActiveTcpSocket() override {
accept_filters_.clear();
Expand Down
1 change: 1 addition & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ envoy_cc_test(
"//source/common/stats:stats_lib",
"//source/common/tracing:http_tracer_lib",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/tracing:tracing_mocks",
"//test/proto:helloworld_proto_cc_proto",
"//test/test_common:test_time_lib",
Expand Down
6 changes: 4 additions & 2 deletions test/common/grpc/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ TEST_F(EnvoyAsyncClientImplTest, MetadataIsInitialized) {
.WillOnce(Invoke([&http_callbacks](Http::HeaderMap&, bool) { http_callbacks->onReset(); }));

// Prepare the parent context of this call.
NiceMock<Network::MockConnectionSocket> socket;
socket.local_address_ =
std::make_shared<Network::Address::Ipv4Instance>(expected_downstream_local_address);
StreamInfo::StreamInfoImpl stream_info{test_time_.timeSystem()};
stream_info.setDownstreamLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>(expected_downstream_local_address));
stream_info.setDownstreamAddresses(socket);
Http::AsyncClient::ParentContext parent_context{&stream_info};

Http::AsyncClient::StreamOptions stream_options;
Expand Down
7 changes: 5 additions & 2 deletions test/common/grpc/google_async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/stream_info/stream_info_impl.h"

#include "test/mocks/grpc/mocks.h"
#include "test/mocks/network/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/proto/helloworld.pb.h"
#include "test/test_common/test_time.h"
Expand Down Expand Up @@ -121,9 +122,11 @@ TEST_F(EnvoyGoogleAsyncClientImplTest, MetadataIsInitialized) {
EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, ""));

// Prepare the parent context of this call.
NiceMock<Network::MockConnectionSocket> socket;
socket.local_address_ =
std::make_shared<Network::Address::Ipv4Instance>(expected_downstream_local_address);
StreamInfo::StreamInfoImpl stream_info{test_time_.timeSystem()};
stream_info.setDownstreamLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>(expected_downstream_local_address));
stream_info.setDownstreamAddresses(socket);
Http::AsyncClient::ParentContext parent_context{&stream_info};

Http::AsyncClient::StreamOptions stream_options;
Expand Down
16 changes: 6 additions & 10 deletions test/common/stream_info/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,9 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
}
bool healthCheck() const override { return health_check_request_; }
void healthCheck(bool is_health_check) override { health_check_request_ = is_health_check; }

void setDownstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& downstream_local_address) override {
downstream_local_address_ = downstream_local_address;
}
const Network::Address::InstanceConstSharedPtr& downstreamLocalAddress() const override {
return downstream_local_address_;
}
void setDownstreamDirectRemoteAddress(
const Network::Address::InstanceConstSharedPtr& downstream_direct_remote_address) override {
downstream_direct_remote_address_ = downstream_direct_remote_address;
}
const Network::Address::InstanceConstSharedPtr& downstreamDirectRemoteAddress() const override {
return downstream_direct_remote_address_;
}
Expand All @@ -95,7 +86,12 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
const Network::Address::InstanceConstSharedPtr& downstreamRemoteAddress() const override {
return downstream_remote_address_;
}

void
setDownstreamAddresses(const Network::ConnectedSocketAddressProvider& address_provider) override {
downstream_local_address_ = address_provider.localAddress();
downstream_direct_remote_address_ = address_provider.directRemoteAddress();
downstream_remote_address_ = address_provider.remoteAddress();
}
void
setDownstreamSslConnection(const Ssl::ConnectionInfoConstSharedPtr& connection_info) override {
downstream_connection_info_ = connection_info;
Expand Down
Loading

0 comments on commit 06008a9

Please sign in to comment.