Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support http2 (#278) #280

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -59,7 +59,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
**IMPORTANT: Code of Nebula go client has been transferred from [nebula-clients](https://github.com/vesoft-inc/nebula-clients) to this repository(nebula-go), and new releases in the future will be published in this repository.
Please update your `go.mod` and imports correspondingly.**

Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/facebook/fbthrift/). Currently the latest stable release is **[v3.4.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.4)**
Official Nebula Go client which communicates with the server using [fbthrift](https://github.com/vesoft-inc/fbthrift/). Currently the latest stable release is **[v3.4.0](https://github.com/vesoft-inc/nebula-go/tree/release-v3.4)**

The code in **master branch** will be updated to accommodate the nightly changes made in NebulaGraph.
To Use the console with a stable release of NebulaGraph, please check the branches and use the corresponding version.
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1462,7 +1462,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type PoolConfig struct {
MaxConnPoolSize int
// The min connections in pool for all addresses
MinConnPoolSize int
// UseHTTP2 indicates whether to use HTTP2
UseHTTP2 bool
}

// validateConf validates config
Expand Down Expand Up @@ -58,6 +60,7 @@ func GetDefaultConf() PoolConfig {
IdleTime: 0 * time.Millisecond,
MaxConnPoolSize: 10,
MinConnPoolSize: 0,
UseHTTP2: false,
}
}

Expand Down Expand Up @@ -128,6 +131,8 @@ type SessionPoolConf struct {
maxSize int
// The min sessions in pool for all addresses
minSize int
// useHTTP2 indicates whether to use HTTP2
useHTTP2 bool
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -192,6 +197,12 @@ func WithMinSize(minSize int) SessionPoolConfOption {
}
}

func WithHTTP2(useHTTP2 bool) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.useHTTP2 = useHTTP2
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
Expand Down
76 changes: 58 additions & 18 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,26 @@
package nebula_go

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strconv"
"time"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/graph"
"golang.org/x/net/http2"
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
graph *graph.GraphServiceClient
}

Expand All @@ -40,28 +44,64 @@ func newConnection(severAddress HostAddress) *connection {

// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
bufferSize := 128 << 10

var err error
var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
cn.useHTTP2 = useHTTP2

var (
err error
transport thrift.Transport
pf thrift.ProtocolFactory
)
if useHTTP2 {
if sslConfig != nil {
transport, err = thrift.NewHTTPPostClientWithOptions("https://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
TLSClientConfig: sslConfig,
},
},
})
} else {
transport, err = thrift.NewHTTPPostClientWithOptions("http://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint. (Note, we ignore the passed tls.Config)
DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
_ = cfg
var d net.Dialer
return d.DialContext(ctx, network, addr)
},
},
},
})
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
pf = thrift.NewBinaryProtocolFactoryDefault()
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
bufferSize := 128 << 10

// Set transport
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport := thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf := thrift.NewHeaderProtocolFactory()
var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
// Set transport
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport = thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf = thrift.NewHeaderProtocolFactory()
}

cn.graph = graph.NewGraphServiceClientFactory(transport, pf)
if err = cn.graph.Open(); err != nil {
Expand Down Expand Up @@ -92,7 +132,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2)
}

// Authenticate
Expand Down
16 changes: 8 additions & 8 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSslConnectionPool(addresses []HostAddress, conf PoolConfig, sslConfig *t

// initPool initializes the connection pool
func (pool *ConnectionPool) initPool() error {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig); err != nil {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return fmt.Errorf("failed to open connection, error: %s ", err.Error())
}

Expand All @@ -79,7 +79,7 @@ func (pool *ConnectionPool) initPool() error {
newConn := newConnection(pool.addresses[i%len(pool.addresses)])

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (pool *ConnectionPool) releaseAndBack(conn *connection, pushBack bool) {

// Ping checks availability of host
func (pool *ConnectionPool) Ping(host HostAddress, timeout time.Duration) error {
return pingAddress(host, timeout, pool.sslConfig)
return pingAddress(host, timeout, pool.sslConfig, pool.conf.UseHTTP2)
}

// Close closes all connection
Expand Down Expand Up @@ -249,7 +249,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
host := pool.getHost()
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -356,23 +356,23 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
// checkAddresses checks addresses availability
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config) error {
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config, useHTTP2 bool) error {
var timeout = 3 * time.Second
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
for _, address := range addresses {
if err := pingAddress(address, timeout, sslConfig); err != nil {
if err := pingAddress(address, timeout, sslConfig, useHTTP2); err != nil {
return err
}
}
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2); err != nil {
return err
}
defer newConn.close()
Expand Down
2 changes: 2 additions & 0 deletions examples/basic_example/graph_client_basic_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -31,6 +32,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -34,6 +35,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
2 changes: 2 additions & 0 deletions examples/json_example/parse_json_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand Down Expand Up @@ -77,6 +78,7 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
2 changes: 2 additions & 0 deletions examples/parameter_example/parameter_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -31,6 +32,7 @@ func main() {
hostList := []nebulago.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebulago.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebulago.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
3 changes: 3 additions & 0 deletions examples/session_pool_example/session_pool_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)

// Initialize logger
Expand All @@ -39,6 +40,7 @@ func main() {
"nebula",
[]nebula.HostAddress{hostAddress},
"example_space",
nebula.WithHTTP2(useHTTP2),
)
if err != nil {
log.Fatal(fmt.Sprintf("failed to create session pool config, %s", err.Error()))
Expand Down Expand Up @@ -160,6 +162,7 @@ func prepareSpace() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/vesoft-inc/nebula-go/v3
go 1.13

require (
github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295
github.com/stretchr/testify v1.7.0
github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28
golang.org/x/net v0.5.0
)
Loading