Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse connections in postgresql metricsets #12603

Merged
merged 1 commit into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Require client_auth by default when ssl is enabled for module http metricset server{pull}12333[12333]
- The `elasticsearch/index_summary` metricset gracefully handles an empty Elasticsearch cluster when `xpack.enabled: true` is set. {pull}12489[12489] {issue}12487[12487]
- When TLS is configured for the http metricset and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584]
- Reuse connections in PostgreSQL metricsets. {issue}12504[12504] {pull}12603[12603]

*Packetbeat*

Expand Down
22 changes: 9 additions & 13 deletions metricbeat/module/postgresql/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package activity

import (
"database/sql"
"context"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry.
Expand All @@ -40,27 +37,26 @@ func init() {

// MetricSet type defines all fields of the Postgresql MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_activity")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_activity")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
22 changes: 9 additions & 13 deletions metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package bgwriter

import (
"database/sql"
"context"
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry.
Expand All @@ -41,25 +38,24 @@ func init() {

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_bgwriter")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
19 changes: 9 additions & 10 deletions metricbeat/module/postgresql/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package database

import (
"database/sql"
"context"

"github.com/pkg/errors"

Expand All @@ -40,25 +40,24 @@ func init() {

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the postgresql database MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_database")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
104 changes: 104 additions & 0 deletions metricbeat/module/postgresql/metricset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package postgresql

import (
"context"
"database/sql"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

type MetricSet struct {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
mb.BaseMetricSet

db *sql.DB
}

// NewMetricSet creates a PostgreSQL metricset with a pool of connections
func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
return &MetricSet{BaseMetricSet: base}, nil
}

// DB creates a database connection, it must be freed after use with `Close()`
func (ms *MetricSet) DB(ctx context.Context) (*sql.Conn, error) {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
if ms.db == nil {
db, err := sql.Open("postgres", ms.HostData().URI)
if err != nil {
return nil, errors.Wrap(err, "failed to open connection")
}
ms.db = db
}
return ms.db.Conn(ctx)
}

//QueryStats makes the database call for a given metric
func (ms *MetricSet) QueryStats(ctx context.Context, query string) ([]map[string]interface{}, error) {
db, err := ms.DB(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain a connection with the database")
}
defer db.Close()

rows, err := db.QueryContext(ctx, query)
if err != nil {
return nil, errors.Wrap(err, "failed to query database")
}

columns, err := rows.Columns()
if err != nil {
return nil, errors.Wrap(err, "scanning columns")
}
vals := make([][]byte, len(columns))
valPointers := make([]interface{}, len(columns))
for i := range vals {
valPointers[i] = &vals[i]
}

results := []map[string]interface{}{}

for rows.Next() {
err = rows.Scan(valPointers...)
if err != nil {
return nil, errors.Wrap(err, "scanning row")
}

result := map[string]interface{}{}
for i, col := range columns {
result[col] = string(vals[i])
}

logp.Debug("postgresql", "Result: %v", result)
results = append(results, result)
}
return results, nil
}

// Close closes the metricset and its connections
func (ms *MetricSet) Close() error {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
if ms.db == nil {
return nil
}
return errors.Wrap(ms.db.Close(), "failed to close connection")
}
39 changes: 0 additions & 39 deletions metricbeat/module/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ Package postgresql is Metricbeat module for PostgreSQL server.
package postgresql

import (
"database/sql"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/lib/pq"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
Expand Down Expand Up @@ -103,39 +100,3 @@ func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {

return h, nil
}

//QueryStats makes the database call for a given metric
func QueryStats(db *sql.DB, query string) ([]map[string]interface{}, error) {
rows, err := db.Query(query)
if err != nil {
return nil, err
}

columns, err := rows.Columns()
if err != nil {
return nil, errors.Wrap(err, "scanning columns")
}
vals := make([][]byte, len(columns))
valPointers := make([]interface{}, len(columns))
for i := range vals {
valPointers[i] = &vals[i]
}

results := []map[string]interface{}{}

for rows.Next() {
err = rows.Scan(valPointers...)
if err != nil {
return nil, errors.Wrap(err, "scanning row")
}

result := map[string]interface{}{}
for i, col := range columns {
result[col] = string(vals[i])
}

logp.Debug("postgresql", "Result: %v", result)
results = append(results, result)
}
return results, nil
}
25 changes: 7 additions & 18 deletions metricbeat/module/postgresql/statement/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package statement

import (
"database/sql"
"context"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry as soon as the program
Expand All @@ -44,33 +41,25 @@ func init() {
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_statements")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing metricset Close won't cancel these requests, I guess it would be unexpected for them to leak, but it could be worth it to handle ctx cancellation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, Close doesn't cancel running queries, they will be executed till they finish. I wouldn't expect leaks because of that.
Not sure if it would worth to hande cancelation here, we'd need to keep a thread safe list of cancelation functions available on close. Once we have #11981 or similar we will be able to cancel the parent context when calling the closers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ I thought about that PR too. That said, you don't really need to keep a list, isn't it? When creating the metricset you can create a Background context, use it in all fetch operations and cancel it on close.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having that that PR is moving (:crossed_fingers:) I'm ok with leaving this for update after it's merged

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, you don't really need to keep a list, isn't it? When creating the metricset you can create a Background context, use it in all fetch operations and cancel it on close.

Yes, this would be another option, but I think this is what should be done in a general what by #11981. Doing it this way here has some disadvantages, we'd be hiding the context inside the metricset and it would be a different one to the one received by Fetch if #11981 is implemented.

Having that that PR is moving (crossed_fingers) I'm ok with leaving this for update after it's merged

Yeah, let me move this forward 🙂

if err != nil {
return errors.Wrap(err, "QueryStats")
}
Expand Down