Skip to content

Commit

Permalink
Fix Blockhole implemention for e2e tests
Browse files Browse the repository at this point in the history
Based on Fu Wei's idea discussed in the issue [1], we employ the
blocking on L7 but without using external tools.

[Problem]

A peer will (a) receive traffic from its peers and (b) initiate
connections to its peers (via stream and pipeline).

Thus, the current mechanism of only blocking peer traffic via the peer's
existing proxy is insufficient, since only scenario (a) is handled, and
scenario (b) is not blocked at all.

[Solution - main idea]

Let's first agree on the naming of the existing proxy as a
"reverse proxy", since it sits in front of every peer and ingest the
traffic into each of them.

We introduce a "forward proxy" for each peer, which will be proxying all
the connections initiated from a peer to its peers.

The modified architecture will look something like this:
```
A -- A's forward proxy - B's reverse proxy - B
     ^ newly introduced  ^ in the original codebase
```

By adding this forward proxy, we can block all in and out traffic that
is initiated from a peer to others, without having to resort to external
tools, such as iptables. It's verified that the blocking of traffic is
complete, compared to previous solutions [2][3].

[Implementation]

The main subtasks are
- set up an environment variable `FORWARD_PROXY`
- implement forward proxy by extending the existing proxy server code
- implement enable/disable of the forward proxy in the e2e test

[Test]
make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

[Known issue]

I run into `context deadline exceeded` sometimes.

[References]
[1] Issue etcd-io#17737
[2] Supersedes PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] Superseded PR (V2) etcd-io#17891

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
  • Loading branch information
henrybear327 committed May 5, 2024
1 parent 989ad8b commit 3e15f96
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 29 deletions.
14 changes: 13 additions & 1 deletion client/pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -31,7 +33,17 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
}

t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Proxy: func(req *http.Request) (*url.URL, error) {
// according to the comment of http.ProxyFromEnvironment: if the
// proxy URL is "localhost" (with or without a port number),
// then a nil URL and nil error will be returned.
// Thus, we need to workaround this by manually setting an
// ENV named FORWARD_PROXY and parse the URL (which is a localhost in our case)
if forward_proxy, exists := os.LookupEnv("FORWARD_PROXY"); exists {
return url.Parse(forward_proxy)
}
return http.ProxyFromEnvironment(req)
},
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
Expand Down
89 changes: 73 additions & 16 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package proxy

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -130,18 +132,21 @@ type Server interface {

// ServerConfig defines proxy server configuration.
type ServerConfig struct {
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
Logger *zap.Logger
From url.URL
To url.URL
TLSInfo transport.TLSInfo
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration
IsForwardProxy bool
}

type server struct {
lg *zap.Logger

isForwardProxy bool

from url.URL
fromPort int
to url.URL
Expand Down Expand Up @@ -194,6 +199,8 @@ func NewServer(cfg ServerConfig) Server {
s := &server{
lg: cfg.Logger,

isForwardProxy: cfg.IsForwardProxy,

from: cfg.From,
to: cfg.To,

Expand All @@ -216,10 +223,12 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
if !s.isForwardProxy {
var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
s.toPort, _ = strconv.Atoi(toPort)
}
}

if s.dialTimeout == 0 {
Expand All @@ -239,8 +248,10 @@ func NewServer(cfg ServerConfig) Server {
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
if !s.isForwardProxy {
if strings.HasPrefix(s.to.Scheme, "http") {
s.to.Scheme = "tcp"
}
}

addr := fmt.Sprintf(":%d", s.fromPort)
Expand Down Expand Up @@ -273,7 +284,10 @@ func (s *server) From() string {
}

func (s *server) To() string {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
if !s.isForwardProxy {
return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
}
return ""
}

// TODO: implement packet reordering from multiple TCP connections
Expand Down Expand Up @@ -353,6 +367,39 @@ func (s *server) listenAndServe() {
continue
}

parseHeaderForDestination := func() string {
// the first request should always contain a CONNECT header field
// since we set the transport to forward the traffic to the proxy
buf := make([]byte, s.bufferSize)
nr1, err := in.Read(buf)
if err != nil {
if err == io.EOF {
panic("No data available for forward proxy to work on")
}
}
data := buf[:nr1]

// attempt to parse for the HOST from the CONNECT request
req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
if err != nil {
panic("Failed to parse header in forward proxy")
}

if req.Method == http.MethodConnect {
// make sure a reply is sent back to the client
connect_response := &http.Response{
StatusCode: 200,
ProtoMajor: 1,
ProtoMinor: 1,
}
connect_response.Write(in)

return req.URL.Host
}

panic("Wrong header type to start the connection")
}

var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
Expand All @@ -370,9 +417,19 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
if s.isForwardProxy {
dest := parseHeaderForDestination()
out, err = tp.DialContext(ctx, "tcp", dest)
} else {
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
}
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
if s.isForwardProxy {
dest := parseHeaderForDestination()
out, err = net.Dial("tcp", dest)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
}
if err != nil {
select {
Expand Down
5 changes: 5 additions & 0 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
forwardProxy := partitionedMember.PeerForwardProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -81,6 +84,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
22 changes: 19 additions & 3 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
port := cfg.BasePort + 5*i
port := cfg.BasePort + 6*i
clientPort := port
peerPort := port + 1
peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
peer2Port := port + 3
peer2Port := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
forwardProxyPort := port + 5 // the port of the forward proxy

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
Expand All @@ -499,6 +500,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
var forwardProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
Expand All @@ -509,6 +511,19 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
To: peerListenURL,
From: peerAdvertiseURL,
}

// setup forward proxy
forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
forwardProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
From: forwardProxyURL,
IsForwardProxy: true,
}

if cfg.EnvVars == nil {
cfg.EnvVars = make(map[string]string)
}
cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
Expand Down Expand Up @@ -631,6 +646,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
}
Expand Down
34 changes: 28 additions & 6 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type EtcdProcess interface {
Close() error
Config() *EtcdServerProcessConfig
PeerProxy() proxy.Server
PeerForwardProxy() proxy.Server
Failpoints() *BinaryFailpoints
LazyFS() *LazyFS
Logs() LogsExpect
Expand All @@ -69,12 +70,13 @@ type LogsExpect interface {
}

type EtcdServerProcess struct {
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
lazyfs *LazyFS
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
forwardProxy proxy.Server
lazyfs *LazyFS
failpoints *BinaryFailpoints
donec chan struct{} // closed when Interact() terminates
}

type EtcdServerProcessConfig struct {
Expand Down Expand Up @@ -102,6 +104,7 @@ type EtcdServerProcessConfig struct {

LazyFSEnabled bool
Proxy *proxy.ServerConfig
ForwardProxy *proxy.ServerConfig
}

func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
Expand Down Expand Up @@ -159,6 +162,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
case err := <-ep.proxy.Error():
return err
}

ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ForwardProxy.From.String()), zap.String("to", ep.cfg.ForwardProxy.To.String()))
ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy)
select {
case <-ep.forwardProxy.Ready():
case err := <-ep.forwardProxy.Error():
return err
}
}
if ep.lazyfs != nil {
ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name))
Expand Down Expand Up @@ -222,6 +233,13 @@ func (ep *EtcdServerProcess) Stop() (err error) {
}
ep.cfg.lg.Info("stopped server.", zap.String("name", ep.cfg.Name))
if ep.proxy != nil {
ep.cfg.lg.Info("stopping forward proxy...", zap.String("name", ep.cfg.Name))
err = ep.forwardProxy.Close()
ep.forwardProxy = nil
if err != nil {
return err
}

ep.cfg.lg.Info("stopping proxy...", zap.String("name", ep.cfg.Name))
err = ep.proxy.Close()
ep.proxy = nil
Expand Down Expand Up @@ -330,6 +348,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
return ep.proxy
}

func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server {
return ep.forwardProxy
}

func (ep *EtcdServerProcess) LazyFS() *LazyFS {
return ep.lazyfs
}
Expand Down
9 changes: 6 additions & 3 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,21 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces

func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()
forwardProxy := member.PeerForwardProxy()

// Blackholing will cause peers to not be able to use streamWriters registered with member
// but peer traffic is still possible because member has 'pipeline' with peers
// TODO: find a way to stop all traffic
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
defer func() {
t.Logf("Traffic restored from and to member %q", member.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()
}()

if shouldWaitTillSnapshot {
return waitTillSnapshot(ctx, t, clus, member)
}
Expand Down

0 comments on commit 3e15f96

Please sign in to comment.