Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mysql add support for different protocols #6138

Merged
merged 8 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions receiver/mysqlreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ The following settings are optional:

- `collection_interval` (default = `10s`): This receiver collects metrics on an interval. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.

- `transport`: (default = `tcp`): Defines the network to use for connecting to the server.

### Example Configuration

```yaml
Expand Down
34 changes: 18 additions & 16 deletions receiver/mysqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,34 @@ import (
)

type client interface {
Connect() error
getGlobalStats() (map[string]string, error)
getInnodbStats() (map[string]string, error)
Close() error
}

type mySQLClient struct {
client *sql.DB
connStr string
client *sql.DB
}

var _ client = (*mySQLClient)(nil)

type mySQLConfig struct {
username string
password string
database string
endpoint string
}
func newMySQLClient(conf *Config) client {
connStr := fmt.Sprintf("%s:%s@%s(%s)/%s", conf.Username, conf.Password, conf.Transport, conf.Endpoint, conf.Database)

func newMySQLClient(conf mySQLConfig) (*mySQLClient, error) {
connStr := fmt.Sprintf("%s:%s@tcp(%s)/%s", conf.username, conf.password, conf.endpoint, conf.database)
return &mySQLClient{
connStr: connStr,
}
}

db, err := sql.Open("mysql", connStr)
func (c *mySQLClient) Connect() error {
clientDB, err := sql.Open("mysql", c.connStr)
if err != nil {
return nil, err
return fmt.Errorf("unable to connect to database: %w", err)
}

return &mySQLClient{
client: db,
}, nil
c.client = clientDB
return nil
}

// getGlobalStats queries the db for global status metrics.
Expand Down Expand Up @@ -85,5 +84,8 @@ func Query(c mySQLClient, query string) (map[string]string, error) {
}

func (c *mySQLClient) Close() error {
return c.client.Close()
if c.client != nil {
return c.client.Close()
}
return nil
}
7 changes: 5 additions & 2 deletions receiver/mysqlreceiver/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (

var _ client = (*fakeClient)(nil)

type fakeClient struct {
}
type fakeClient struct{}

func readFile(fname string) (map[string]string, error) {
var stats = map[string]string{}
Expand All @@ -42,6 +41,10 @@ func readFile(fname string) (map[string]string, error) {
return stats, nil
}

func (c *fakeClient) Connect() error {
return nil
}

func (c *fakeClient) getGlobalStats() (map[string]string, error) {
return readFile("global_stats")
}
Expand Down
3 changes: 2 additions & 1 deletion receiver/mysqlreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package mysqlreceiver
import (
"errors"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"
)
Expand All @@ -26,7 +27,7 @@ type Config struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
Endpoint string `mapstructure:"endpoint"`
confignet.NetAddr `mapstructure:",squash"`
}

// Errors for missing required config parameters.
Expand Down
6 changes: 5 additions & 1 deletion receiver/mysqlreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -44,7 +45,10 @@ func createDefaultConfig() config.Receiver {
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
CollectionInterval: 10 * time.Second,
},
Endpoint: "localhost:3306",
NetAddr: confignet.NetAddr{
Endpoint: "localhost:3306",
Transport: "tcp",
},
}
}

Expand Down
5 changes: 4 additions & 1 deletion receiver/mysqlreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)
Expand Down Expand Up @@ -53,7 +54,9 @@ func TestCreateMetricsReceiver(t *testing.T) {
},
Username: "otel",
Password: "otel",
Endpoint: "localhost:3306",
NetAddr: confignet.NetAddr{
Endpoint: "localhost:3306",
},
},
consumertest.NewNop(),
)
Expand Down
2 changes: 1 addition & 1 deletion receiver/mysqlreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/go-sql-driver/mysql v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/scrapertest v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/internal/scrapertest v0.0.0-20211110181051-0d46baeab9e9
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.38.1-0.20211103215828-cffbecb2ac9e
go.opentelemetry.io/collector/model v0.38.1-0.20211103215828-cffbecb2ac9e
Expand Down
24 changes: 10 additions & 14 deletions receiver/mysqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
)

type mySQLScraper struct {
client client
stopOnce sync.Once
sqlclient client
stopOnce sync.Once

logger *zap.Logger
config *Config
Expand All @@ -48,16 +48,13 @@ func newMySQLScraper(

// start starts the scraper by initializing the db client connection.
func (m *mySQLScraper) start(_ context.Context, host component.Host) error {
client, err := newMySQLClient(mySQLConfig{
username: m.config.Username,
password: m.config.Password,
database: m.config.Database,
endpoint: m.config.Endpoint,
})
sqlclient := newMySQLClient(m.config)

err := sqlclient.Connect()
if err != nil {
return err
}
m.client = client
m.sqlclient = sqlclient

return nil
}
Expand All @@ -66,7 +63,7 @@ func (m *mySQLScraper) start(_ context.Context, host component.Host) error {
func (m *mySQLScraper) shutdown(context.Context) error {
var err error
m.stopOnce.Do(func() {
err = m.client.Close()
err = m.sqlclient.Close()
})
Comment on lines 65 to 67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djaglowski this seems wrong:

  1. It is guarantee that shutdown is called only once (so no need to protect with once).
  2. If start fails sqlclient is nil and this will crash here.

Probably fix this in a separate PR.

return err
}
Expand Down Expand Up @@ -100,8 +97,7 @@ func addToIntMetric(metric pdata.NumberDataPointSlice, labels pdata.AttributeMap

// scrape scrapes the mysql db metric stats, transforms them and labels them into a metric slices.
func (m *mySQLScraper) scrape(context.Context) (pdata.Metrics, error) {

if m.client == nil {
if m.sqlclient == nil {
return pdata.Metrics{}, errors.New("failed to connect to http client")
}

Expand All @@ -127,7 +123,7 @@ func (m *mySQLScraper) scrape(context.Context) (pdata.Metrics, error) {
threads := initMetric(ilm.Metrics(), metadata.M.MysqlThreads).Sum().DataPoints()

// collect innodb metrics.
innodbStats, err := m.client.getInnodbStats()
innodbStats, err := m.sqlclient.getInnodbStats()
if err != nil {
m.logger.Error("Failed to fetch InnoDB stats", zap.Error(err))
}
Expand All @@ -144,7 +140,7 @@ func (m *mySQLScraper) scrape(context.Context) (pdata.Metrics, error) {
}

// collect global status metrics.
globalStats, err := m.client.getGlobalStats()
globalStats, err := m.sqlclient.getGlobalStats()
if err != nil {
m.logger.Error("Failed to fetch global stats", zap.Error(err))
return pdata.Metrics{}, err
Expand Down
7 changes: 5 additions & 2 deletions receiver/mysqlreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/confignet"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/scrapertest"
Expand All @@ -30,9 +31,11 @@ func TestScrape(t *testing.T) {
sc := newMySQLScraper(zap.NewNop(), &Config{
Username: "otel",
Password: "otel",
Endpoint: "localhost:3306",
NetAddr: confignet.NetAddr{
Endpoint: "localhost:3306",
},
})
sc.client = &mysqlMock
sc.sqlclient = &mysqlMock

scrapedRMS, err := sc.scrape(context.Background())
require.NoError(t, err)
Expand Down