Skip to content

Commit

Permalink
Add timestamp column support to postgresql_extensible (#8602)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0c99ae9)
  • Loading branch information
tfoldi authored and ssoroka committed Jan 27, 2021
1 parent 7a88ac7 commit 0d3b2eb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
5 changes: 5 additions & 0 deletions plugins/inputs/postgresql_extensible/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ The example below has two queries are specified, with the following parameters:
# defined tags. The values in these columns must be of a string-type,
# a number-type or a blob-type.
#
# The timestamp field is used to override the data points timestamp value. By
# default, all rows inserted with current time. By setting a timestamp column,
# the row will be inserted with that column's value.
#
# Structure :
# [[inputs.postgresql_extensible.query]]
# sqlquery string
# version string
# withdbname boolean
# tagvalue string (coma separated)
# timestamp string
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database where datname"
version=901
Expand Down
30 changes: 29 additions & 1 deletion plugins/inputs/postgresql_extensible/postgresql_extensible.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"strings"
"time"

_ "github.com/jackc/pgx/stdlib"

Expand All @@ -19,6 +20,7 @@ type Postgresql struct {
postgresql.Service
Databases []string
AdditionalTags []string
Timestamp string
Query query
Debug bool

Expand All @@ -32,6 +34,7 @@ type query []struct {
Withdbname bool
Tagvalue string
Measurement string
Timestamp string
}

var ignoredColumns = map[string]bool{"stats_reset": true}
Expand Down Expand Up @@ -82,13 +85,23 @@ var sampleConfig = `
## The script option can be used to specify the .sql file path.
## If script and sqlquery options specified at same time, sqlquery will be used
##
## the tagvalue field is used to define custom tags (separated by comas).
## the query is expected to return columns which match the names of the
## defined tags. The values in these columns must be of a string-type,
## a number-type or a blob-type.
##
## The timestamp field is used to override the data points timestamp value. By
## default, all rows inserted with current time. By setting a timestamp column,
## the row will be inserted with that column's value.
##
## Structure :
## [[inputs.postgresql_extensible.query]]
## sqlquery string
## version string
## withdbname boolean
## tagvalue string (comma separated)
## measurement string
## timestamp string
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database"
version=901
Expand Down Expand Up @@ -150,6 +163,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
query string
tag_value string
meas_name string
timestamp string
columns []string
)

Expand All @@ -164,6 +178,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
for i := range p.Query {
sql_query = p.Query[i].Sqlquery
tag_value = p.Query[i].Tagvalue
timestamp = p.Query[i].Timestamp

if p.Query[i].Measurement != "" {
meas_name = p.Query[i].Measurement
Expand Down Expand Up @@ -206,6 +221,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
}
}

p.Timestamp = timestamp

for rows.Next() {
err = p.accRow(meas_name, rows, acc, columns)
if err != nil {
Expand All @@ -228,6 +245,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
columnVars []interface{}
dbname bytes.Buffer
tagAddress string
timestamp time.Time
)

// this is where we'll store the column name with its *interface{}
Expand Down Expand Up @@ -269,6 +287,9 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
"db": dbname.String(),
}

// set default timestamp to Now
timestamp = time.Now()

fields := make(map[string]interface{})
COLUMN:
for col, val := range columnMap {
Expand All @@ -278,6 +299,13 @@ COLUMN:
continue
}

if col == p.Timestamp {
if v, ok := (*val).(time.Time); ok {
timestamp = v
}
continue
}

for _, tag := range p.AdditionalTags {
if col != tag {
continue
Expand All @@ -301,7 +329,7 @@ COLUMN:
fields[col] = *val
}
}
acc.AddFields(meas_name, fields, tags)
acc.AddFields(meas_name, fields, tags, timestamp)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/influxdata/telegraf/plugins/inputs/postgresql"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -126,6 +127,13 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
assert.True(t, found)
assert.Equal(t, true, v)
},
"SELECT timestamp'1980-07-23' as ts, true AS myvalue": func(acc *testutil.Accumulator) {
expectedTime := time.Date(1980, 7, 23, 0, 0, 0, 0, time.UTC)
v, found := acc.BoolField(measurement, "myvalue")
assert.True(t, found)
assert.Equal(t, true, v)
assert.True(t, acc.HasTimestamp(measurement, expectedTime))
},
}

for q, assertions := range examples {
Expand All @@ -134,6 +142,7 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
Version: 901,
Withdbname: false,
Tagvalue: "",
Timestamp: "ts",
}})
assertions(acc)
}
Expand Down

1 comment on commit 0d3b2eb

@muhammadraghib10
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unable to get value assigned to timestamp column. My table has 4 columns: id, temperature as temp, timestamp with time zone and "utc" unix timestamp in integer form. I am setting the timestamp value in inputs plugin to column "utc" but it is not picking up the right value for timestamp instead it is sending the current timestamp to influxdb.

[[inputs.postgresql_extensible.query]] sqlquery="select temp from temp_utc where mid = 45 order by utc DESC;" version=901 withdbname=false tagvalue="" timestamp="utc"

Please sign in to comment.