Skip to content

Commit

Permalink
Cherry pick from 3.4-beta-sc: support http2 (#278)
Browse files Browse the repository at this point in the history
* feat: support http2 (#246)

* update fbthrift (#255)

* fix conflicts

* remove ssl examples

---------

Co-authored-by: Vee Zhang <veezhang@126.com>
  • Loading branch information
Sophie-Xie and veezhang committed Aug 8, 2023
1 parent 71a8713 commit 278925f
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 40 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -59,7 +59,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1462,7 +1462,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type PoolConfig struct {
MaxConnPoolSize int
// The min connections in pool for all addresses
MinConnPoolSize int
// UseHTTP2 indicates whether to use HTTP2
UseHTTP2 bool
}

// validateConf validates config
Expand Down Expand Up @@ -58,6 +60,7 @@ func GetDefaultConf() PoolConfig {
IdleTime: 0 * time.Millisecond,
MaxConnPoolSize: 10,
MinConnPoolSize: 0,
UseHTTP2: false,
}
}

Expand Down Expand Up @@ -128,6 +131,8 @@ type SessionPoolConf struct {
maxSize int
// The min sessions in pool for all addresses
minSize int
// useHTTP2 indicates whether to use HTTP2
useHTTP2 bool
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -192,6 +197,12 @@ func WithMinSize(minSize int) SessionPoolConfOption {
}
}

func WithHTTP2(useHTTP2 bool) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.useHTTP2 = useHTTP2
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
Expand Down
74 changes: 57 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,26 @@
package nebula_go

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strconv"
"time"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/graph"
"golang.org/x/net/http2"
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
graph *graph.GraphServiceClient
}

Expand All @@ -40,28 +44,64 @@ func newConnection(severAddress HostAddress) *connection {

// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
bufferSize := 128 << 10

var err error
var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
cn.useHTTP2 = useHTTP2

var (
err error
transport thrift.Transport
pf thrift.ProtocolFactory
)
if useHTTP2 {
if sslConfig != nil {
transport, err = thrift.NewHTTPPostClientWithOptions("https://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
TLSClientConfig: sslConfig,
},
},
})
} else {
transport, err = thrift.NewHTTPPostClientWithOptions("http://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint. (Note, we ignore the passed tls.Config)
DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
_ = cfg
var d net.Dialer
return d.DialContext(ctx, network, addr)
},
},
},
})
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
pf = thrift.NewBinaryProtocolFactoryDefault()
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
bufferSize := 128 << 10

// Set transport
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport := thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf := thrift.NewHeaderProtocolFactory()
var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
// Set transport
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport = thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf = thrift.NewHeaderProtocolFactory()
}

cn.graph = graph.NewGraphServiceClientFactory(transport, pf)
if err = cn.graph.Open(); err != nil {
Expand Down Expand Up @@ -92,7 +132,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2)
}

// Authenticate
Expand Down
16 changes: 8 additions & 8 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSslConnectionPool(addresses []HostAddress, conf PoolConfig, sslConfig *t

// initPool initializes the connection pool
func (pool *ConnectionPool) initPool() error {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig); err != nil {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return fmt.Errorf("failed to open connection, error: %s ", err.Error())
}

Expand All @@ -79,7 +79,7 @@ func (pool *ConnectionPool) initPool() error {
newConn := newConnection(pool.addresses[i%len(pool.addresses)])

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (pool *ConnectionPool) releaseAndBack(conn *connection, pushBack bool) {

// Ping checks availability of host
func (pool *ConnectionPool) Ping(host HostAddress, timeout time.Duration) error {
return pingAddress(host, timeout, pool.sslConfig)
return pingAddress(host, timeout, pool.sslConfig, pool.conf.UseHTTP2)
}

// Close closes all connection
Expand Down Expand Up @@ -249,7 +249,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
host := pool.getHost()
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -356,23 +356,23 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
// checkAddresses checks addresses availability
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config) error {
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config, useHTTP2 bool) error {
var timeout = 3 * time.Second
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
for _, address := range addresses {
if err := pingAddress(address, timeout, sslConfig); err != nil {
if err := pingAddress(address, timeout, sslConfig, useHTTP2); err != nil {
return err
}
}
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2); err != nil {
return err
}
defer newConn.close()
Expand Down
2 changes: 2 additions & 0 deletions examples/basic_example/graph_client_basic_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -31,6 +32,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -34,6 +35,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
2 changes: 2 additions & 0 deletions examples/json_example/parse_json_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand Down Expand Up @@ -77,6 +78,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
2 changes: 2 additions & 0 deletions examples/parameter_example/parameter_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -31,6 +32,7 @@ func main() {
hostList := []nebulago.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebulago.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebulago.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
3 changes: 3 additions & 0 deletions examples/session_pool_example/session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -39,6 +40,7 @@ func main() {
"nebula",
[]nebula.HostAddress{hostAddress},
"example_space",
nebula.WithHTTP2(useHTTP2),
)
if err != nil {
log.Fatal(fmt.Sprintf("failed to create session pool config, %s", err.Error()))
Expand Down Expand Up @@ -160,6 +162,7 @@ func prepareSpace() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ go 1.13
require (
github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.5.0
)

replace github.com/facebook/fbthrift => github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28
Loading

0 comments on commit 278925f

Please sign in to comment.