diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2a25847adb80..54fbb8630d56 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -133,6 +133,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Require client_auth by default when ssl is enabled for module http metricset server{pull}12333[12333] - The `elasticsearch/index_summary` metricset gracefully handles an empty Elasticsearch cluster when `xpack.enabled: true` is set. {pull}12489[12489] {issue}12487[12487] - When TLS is configured for the http metricset and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584] +- Reuse connections in PostgreSQL metricsets. {issue}12504[12504] {pull}12603[12603] *Packetbeat* diff --git a/metricbeat/module/postgresql/activity/activity.go b/metricbeat/module/postgresql/activity/activity.go index 36e5d9f690f3..ca519dc4442a 100644 --- a/metricbeat/module/postgresql/activity/activity.go +++ b/metricbeat/module/postgresql/activity/activity.go @@ -18,15 +18,12 @@ package activity import ( - "database/sql" + "context" "github.com/pkg/errors" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/postgresql" - - // Register postgresql database/sql driver - _ "github.com/lib/pq" ) // init registers the MetricSet with the central registry. @@ -40,27 +37,26 @@ func init() { // MetricSet type defines all fields of the Postgresql MetricSet type MetricSet struct { - mb.BaseMetricSet + *postgresql.MetricSet } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &MetricSet{BaseMetricSet: base}, nil + ms, err := postgresql.NewMetricSet(base) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - db, err := sql.Open("postgres", m.HostData().URI) - if err != nil { - return errors.Wrap(err, "error in Open") - } - defer db.Close() - - results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_activity") + ctx := context.Background() + results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_activity") if err != nil { return errors.Wrap(err, "error in QueryStats") } diff --git a/metricbeat/module/postgresql/bgwriter/bgwriter.go b/metricbeat/module/postgresql/bgwriter/bgwriter.go index c5a0d8244174..464c87dd6d30 100644 --- a/metricbeat/module/postgresql/bgwriter/bgwriter.go +++ b/metricbeat/module/postgresql/bgwriter/bgwriter.go @@ -18,16 +18,13 @@ package bgwriter import ( - "database/sql" + "context" "fmt" "github.com/pkg/errors" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/postgresql" - - // Register postgresql database/sql driver - _ "github.com/lib/pq" ) // init registers the MetricSet with the central registry. @@ -41,25 +38,24 @@ func init() { // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *postgresql.MetricSet } // New create a new instance of the MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &MetricSet{BaseMetricSet: base}, nil + ms, err := postgresql.NewMetricSet(base) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - db, err := sql.Open("postgres", m.HostData().URI) - if err != nil { - return errors.Wrap(err, "error in Open") - } - defer db.Close() - - results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter") + ctx := context.Background() + results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_bgwriter") if err != nil { return errors.Wrap(err, "error in QueryStats") } diff --git a/metricbeat/module/postgresql/database/database.go b/metricbeat/module/postgresql/database/database.go index 089487d88282..4a960b81ead9 100644 --- a/metricbeat/module/postgresql/database/database.go +++ b/metricbeat/module/postgresql/database/database.go @@ -18,7 +18,7 @@ package database import ( - "database/sql" + "context" "github.com/pkg/errors" @@ -40,25 +40,24 @@ func init() { // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *postgresql.MetricSet } // New create a new instance of the postgresql database MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &MetricSet{BaseMetricSet: base}, nil + ms, err := postgresql.NewMetricSet(base) + if err != nil { + return nil, err + } + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - db, err := sql.Open("postgres", m.HostData().URI) - if err != nil { - return errors.Wrap(err, "error in Open") - } - defer db.Close() - - results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database") + ctx := context.Background() + results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_database") if err != nil { return errors.Wrap(err, "error in QueryStats") } diff --git a/metricbeat/module/postgresql/metricset.go b/metricbeat/module/postgresql/metricset.go new file mode 100644 index 000000000000..07cd29f35049 --- /dev/null +++ b/metricbeat/module/postgresql/metricset.go @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package postgresql + +import ( + "context" + "database/sql" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" + + // Register postgresql database/sql driver + _ "github.com/lib/pq" +) + +type MetricSet struct { + mb.BaseMetricSet + + db *sql.DB +} + +// NewMetricSet creates a PostgreSQL metricset with a pool of connections +func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { + return &MetricSet{BaseMetricSet: base}, nil +} + +// DB creates a database connection, it must be freed after use with `Close()` +func (ms *MetricSet) DB(ctx context.Context) (*sql.Conn, error) { + if ms.db == nil { + db, err := sql.Open("postgres", ms.HostData().URI) + if err != nil { + return nil, errors.Wrap(err, "failed to open connection") + } + ms.db = db + } + return ms.db.Conn(ctx) +} + +//QueryStats makes the database call for a given metric +func (ms *MetricSet) QueryStats(ctx context.Context, query string) ([]map[string]interface{}, error) { + db, err := ms.DB(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to obtain a connection with the database") + } + defer db.Close() + + rows, err := db.QueryContext(ctx, query) + if err != nil { + return nil, errors.Wrap(err, "failed to query database") + } + + columns, err := rows.Columns() + if err != nil { + return nil, errors.Wrap(err, "scanning columns") + } + vals := make([][]byte, len(columns)) + valPointers := make([]interface{}, len(columns)) + for i := range vals { + valPointers[i] = &vals[i] + } + + results := []map[string]interface{}{} + + for rows.Next() { + err = rows.Scan(valPointers...) + if err != nil { + return nil, errors.Wrap(err, "scanning row") + } + + result := map[string]interface{}{} + for i, col := range columns { + result[col] = string(vals[i]) + } + + logp.Debug("postgresql", "Result: %v", result) + results = append(results, result) + } + return results, nil +} + +// Close closes the metricset and its connections +func (ms *MetricSet) Close() error { + if ms.db == nil { + return nil + } + return errors.Wrap(ms.db.Close(), "failed to close connection") +} diff --git a/metricbeat/module/postgresql/postgresql.go b/metricbeat/module/postgresql/postgresql.go index 48ad384723e1..043aec486917 100644 --- a/metricbeat/module/postgresql/postgresql.go +++ b/metricbeat/module/postgresql/postgresql.go @@ -21,16 +21,13 @@ Package postgresql is Metricbeat module for PostgreSQL server. package postgresql import ( - "database/sql" "fmt" "net/url" "strconv" "strings" "github.com/lib/pq" - "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -103,39 +100,3 @@ func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) { return h, nil } - -//QueryStats makes the database call for a given metric -func QueryStats(db *sql.DB, query string) ([]map[string]interface{}, error) { - rows, err := db.Query(query) - if err != nil { - return nil, err - } - - columns, err := rows.Columns() - if err != nil { - return nil, errors.Wrap(err, "scanning columns") - } - vals := make([][]byte, len(columns)) - valPointers := make([]interface{}, len(columns)) - for i := range vals { - valPointers[i] = &vals[i] - } - - results := []map[string]interface{}{} - - for rows.Next() { - err = rows.Scan(valPointers...) - if err != nil { - return nil, errors.Wrap(err, "scanning row") - } - - result := map[string]interface{}{} - for i, col := range columns { - result[col] = string(vals[i]) - } - - logp.Debug("postgresql", "Result: %v", result) - results = append(results, result) - } - return results, nil -} diff --git a/metricbeat/module/postgresql/statement/statement.go b/metricbeat/module/postgresql/statement/statement.go index 9a3d7cea9ba0..ed58361702d9 100644 --- a/metricbeat/module/postgresql/statement/statement.go +++ b/metricbeat/module/postgresql/statement/statement.go @@ -18,15 +18,12 @@ package statement import ( - "database/sql" + "context" "github.com/pkg/errors" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/postgresql" - - // Register postgresql database/sql driver - _ "github.com/lib/pq" ) // init registers the MetricSet with the central registry as soon as the program @@ -44,33 +41,25 @@ func init() { // mb.BaseMetricSet because it implements all of the required mb.MetricSet // interface methods except for Fetch. type MetricSet struct { - mb.BaseMetricSet + *postgresql.MetricSet } // New creates a new instance of the MetricSet. New is responsible for unpacking // any MetricSet specific configuration options if there are any. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - config := struct{}{} - if err := base.Module().UnpackConfig(&config); err != nil { + ms, err := postgresql.NewMetricSet(base) + if err != nil { return nil, err } - - return &MetricSet{ - BaseMetricSet: base, - }, nil + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - db, err := sql.Open("postgres", m.HostData().URI) - if err != nil { - return errors.Wrap(err, "error in Open") - } - defer db.Close() - - results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements") + ctx := context.Background() + results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_statements") if err != nil { return errors.Wrap(err, "QueryStats") }