Skip to content

Commit

Permalink
thriftbp: Support unix domain socket on thrift clients
Browse files Browse the repository at this point in the history
Also update thrifttest to use UDS when possible.
  • Loading branch information
fishy committed May 3, 2024
1 parent 039b291 commit f41c106
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 12 deletions.
2 changes: 2 additions & 0 deletions thriftbp/client_middlewares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func TestRetry(t *testing.T) {
handler := BaseplateService{}
processor := baseplatethrift.NewBaseplateServiceV2Processor(&handler)
server, err := thrifttest.NewBaseplateServer(thrifttest.ServerConfig{
TB: t,
Processor: processor,
SecretStore: store,
ClientConfig: thriftbp.ClientPoolConfig{
Expand Down Expand Up @@ -556,6 +557,7 @@ func (srv mockBaseplateService) IsHealthy(ctx context.Context, req *baseplatethr

func setupFake(ctx context.Context, t *testing.T, handler baseplatethrift.BaseplateServiceV2, slug string) thriftbp.ClientPool {
srv, err := thrifttest.NewBaseplateServer(thrifttest.ServerConfig{
TB: t,
Processor: baseplatethrift.NewBaseplateServiceV2Processor(handler),
ClientConfig: thriftbp.ClientPoolConfig{
ServiceSlug: slug,
Expand Down
27 changes: 24 additions & 3 deletions thriftbp/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
Expand Down Expand Up @@ -61,8 +62,19 @@ type ClientPoolConfig struct {
// ImageUploadService -> image-upload
ServiceSlug string `yaml:"serviceSlug"`

// Addr is the address of a thrift service. Addr must be in the format
// "${host}:${port}"
// Addr is the address of a thrift service.
//
// Addr must be in one of the following formats:
//
// - "${host}:${port}" for TCP
// - "unix://${path}" for Unix Domain Socket
//
// NOTE: When using unix domain socket with an absolute path, there must be 3
// slashes after "unix:", with the third slash being the root. For example,
// "unix:///var/run/thrift.socket" means Unix Domain Socket to
// "/var/run/thrift.socket" (an absolute path), while
// "unix://var/run/thrift.socket" means Unix Domain Socket to
// "var/run/thrift.socket" (a relative path).
Addr string `yaml:"addr"`

// InitialConnections is the desired inital number of thrift connections
Expand Down Expand Up @@ -561,7 +573,16 @@ func newClient(
return nil, nil, fmt.Errorf("thriftbp: error getting next address for new Thrift client: %w", err)
}

transport := thrift.NewTSocketConf(addr, cfg)
var transport thrift.TTransport

if path, ok := strings.CutPrefix(addr, "unix://"); ok {
transport = thrift.NewTSocketFromAddrConf(&net.UnixAddr{
Net: "unix",
Name: path,
}, cfg)
} else {
transport = thrift.NewTSocketConf(addr, cfg)
}
if err := transport.Open(); err != nil {
return nil, nil, fmt.Errorf("thriftbp: error opening TSocket for new Thrift client: %w", err)
}
Expand Down
57 changes: 57 additions & 0 deletions thriftbp/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"path/filepath"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -217,6 +218,7 @@ func TestThriftHostnameHeader(t *testing.T) {

handler := thriftHostnameHandler{}
server, err := thrifttest.NewBaseplateServer(thrifttest.ServerConfig{
TB: t,
Processor: baseplatethrift.NewBaseplateServiceV2Processor(&handler),
SecretStore: store,
ClientConfig: thriftbp.ClientPoolConfig{
Expand Down Expand Up @@ -303,3 +305,58 @@ func TestInitialConnectionsFallback(t *testing.T) {
})
}
}

func TestUDS(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "socket")

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

store := newSecretsStore(t)
t.Cleanup(func() {
store.Close()
})

handler := thriftHostnameHandler{}
server, err := thriftbp.NewServer(thriftbp.ServerConfig{
Processor: baseplatethrift.NewBaseplateServiceV2Processor(&handler),
Socket: thrift.NewTServerSocketFromAddrTimeout(&net.UnixAddr{
Net: "unix",
Name: path,
}, 0),
})
if err != nil {
t.Fatal(err)
}

bp := baseplate.NewTestBaseplate(baseplate.NewTestBaseplateArgs{
Store: store,
})

handler.server = thriftbp.ApplyBaseplate(bp, server)
go server.Serve()
// give the server a little time to start serving
time.Sleep(10 * time.Millisecond)
t.Cleanup(func() {
server.Stop()
})

pool, err := thriftbp.NewBaseplateClientPool(thriftbp.ClientPoolConfig{
ServiceSlug: "test",
Addr: "unix://" + path,
MaxConnections: 10,
ThriftHostnameHeader: "my-thrift-header",
})
if err != nil {
t.Fatalf("Failed to create client pool: %v", err)
}
t.Cleanup(func() {
pool.Close()
})
client := baseplatethrift.NewBaseplateServiceV2Client(pool.TClient())
_, err = client.IsHealthy(ctx, &baseplatethrift.IsHealthyRequest{})
if err != nil {
t.Fatal(err)
}
}
42 changes: 33 additions & 9 deletions thriftbp/thrifttest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package thrifttest

import (
"context"
"net"
"path/filepath"
"testing"
"time"

"github.com/apache/thrift/lib/go/thrift"
Expand Down Expand Up @@ -62,11 +65,18 @@ type ServerConfig struct {
// Required, the secret store.
SecretStore *secrets.Store

// Optional but highly recommended.
//
// When TB is non-nil, the server and client will communicate via unix domain
// socket under TB.TempDir(). When TB is nil, they use a local TCP port
// instead.
TB testing.TB

// ServerConfig is an optional value, sane defaults will be chosen where
// appropriate.
//
// ServerConfig.Socket will always be replaced with one created in
// NewBaseplateServer using the local loopback address.
// NewBaseplateServer using the local loopback address or unix domain socket.
ServerConfig baseplate.Config

// ClientConfig is an optional value, sane defaults will be chosen where
Expand Down Expand Up @@ -131,20 +141,34 @@ func (s *Server) Close() error {
// This is inspired by httptest.NewServer from the go standard library and can
// be used to test a thrift service.
func NewBaseplateServer(cfg ServerConfig) (*Server, error) {
socket, err := thrift.NewTServerSocket(loopbackAddr)
if err != nil {
return nil, err
}
// Call listen to reserve a port and check for any issues early
if err := socket.Listen(); err != nil {
return nil, err
var socket *thrift.TServerSocket
var addr string
if cfg.TB != nil {
dir := cfg.TB.TempDir()
path := filepath.Join(dir, "socket")
socket = thrift.NewTServerSocketFromAddrTimeout(&net.UnixAddr{
Net: "unix",
Name: path,
}, 0)
addr = "unix://" + path
} else {
s, err := thrift.NewTServerSocket(loopbackAddr)
if err != nil {
return nil, err
}
// Call listen to reserve a port and check for any issues early
if err := s.Listen(); err != nil {
return nil, err
}
addr = s.Addr().String()
socket = s
}

if cfg.EdgeContextImpl == nil {
cfg.EdgeContextImpl = ecinterface.Mock()
}

cfg.ServerConfig.Addr = socket.Addr().String()
cfg.ServerConfig.Addr = addr
bp := baseplate.NewTestBaseplate(baseplate.NewTestBaseplateArgs{
Config: cfg.ServerConfig,
Store: cfg.SecretStore,
Expand Down

0 comments on commit f41c106

Please sign in to comment.