Skip to content

Commit

Permalink
Change postgres to postgresql (#9)
Browse files Browse the repository at this point in the history
* Change postgres to postgresql

* Fixes as per https://goreportcard.com/

* gofmt -s -w

* Format comments

* Fix spelling as per Mr B
  • Loading branch information
sharmay authored Sep 5, 2020
1 parent 415f1e7 commit 359de96
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

Remote storage adapter enabling Prometheus to use PostgreSQL as a long-term store for time-series metrics. Code is based on [Prometheus - Remote storage adapter](https://github.com/prometheus/prometheus/tree/master/documentation/examples/remote_storage/remote_storage_adapter).

The PostgreSQL Prometheus Adapter is designed to utilize native partitioning enhancements available in recent versions of core Postgres to efficiently store Prometheus time series data in a PostgreSQL database, and is not dependent on external PostgreSQL extensions.
The PostgreSQL Prometheus Adapter is designed to utilize native partitioning enhancements available in recent versions of core PostgreSQL to efficiently store Prometheus time series data in a PostgreSQL database, and is not dependent on external PostgreSQL extensions.

The design is based on partitioning and threads. Incoming data is processed by one or more threads and one or more writer threads will store data in PostgreSQL daily or hourly partitions. Partitions will be auto-created by the adapter based on the timestamp of incoming data.

The PostgreSQL Prometheus Adapter accepts Prometheus remote read/write requests, and sends them to PostgreSQL.

Additional information regarding the adapter and getting started is provided below and available in this [blog post introducing the Postgres Prometheus Adapter](https://info.crunchydata.com/blog/using-postgres-to-back-prometheus-for-your-postgresql-monitoring-1).
Additional information regarding the adapter and getting started is provided below and available in this [blog post introducing the PostgreSQL Prometheus Adapter](https://info.crunchydata.com/blog/using-postgres-to-back-prometheus-for-your-postgresql-monitoring-1).

## PostgreSQL Version Support

Expand All @@ -25,7 +25,7 @@ PostgreSQL Prometheus Adapter supports:
make
```

### Make a cotnainer (optional)
### Make a container (optional)

```shell
make container
Expand Down
19 changes: 7 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type config struct {
const (
tickInterval = time.Second
promLivenessCheck = time.Second
max_bgwriter = 10
max_bgparser = 20
maxBgWriter = 10
maxBgParser = 20
)

var (
Expand Down Expand Up @@ -111,7 +111,7 @@ var (
)
)

var worker [max_bgwriter]postgresql.PGWriter
var worker [maxBgWriter]postgresql.PGWriter

func init() {
prometheus.MustRegister(receivedSamples)
Expand All @@ -130,15 +130,15 @@ func main() {
if cfg.pgPrometheusConfig.PGWriters < 0 {
cfg.pgPrometheusConfig.PGWriters = 1
}
if cfg.pgPrometheusConfig.PGWriters > max_bgwriter {
cfg.pgPrometheusConfig.PGWriters = max_bgwriter
if cfg.pgPrometheusConfig.PGWriters > maxBgWriter {
cfg.pgPrometheusConfig.PGWriters = maxBgWriter
}

if cfg.pgPrometheusConfig.PGParsers < 0 {
cfg.pgPrometheusConfig.PGParsers = 1
}
if cfg.pgPrometheusConfig.PGParsers > max_bgparser {
cfg.pgPrometheusConfig.PGParsers = max_bgparser
if cfg.pgPrometheusConfig.PGParsers > maxBgParser {
cfg.pgPrometheusConfig.PGParsers = maxBgParser
}

http.Handle(cfg.telemetryPath, promhttp.Handler())
Expand Down Expand Up @@ -262,11 +262,6 @@ func write(logger log.Logger, writer writer) http.Handler {
level.Warn(logger).Log("msg", "Error sending samples to remote storage", "err", err, "storage", writer.Name(), "num_samples", len(samples))
}

//counter, err := sentSamples.GetMetricWithLabelValues(writer.Name())
//if err != nil {
// level.Warn(logger).Log("msg", "Couldn't get a counter", "labelValue", writer.Name(), "err", err)
//}

})
}

Expand Down
23 changes: 13 additions & 10 deletions pkg/postgresql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ type Config struct {
}

var promSamples = list.New()

// QueueMutex is used thread safe operations on promSamples list object.
var QueueMutex sync.Mutex
var vMetricIDMapMutex sync.Mutex
var vMetricIDMap tMetricIDMap

// Threaded writer
// PGWriter - Threaded writer
type PGWriter struct {
DB *pgx.Conn
id int
Expand All @@ -50,6 +52,7 @@ type PGWriter struct {
logger log.Logger
}

// PGParser - Threaded parser
type PGParser struct {
id int
KeepRunning bool
Expand Down Expand Up @@ -96,8 +99,6 @@ func (p *PGParser) RunPGParser(tid int, partitionScheme string, c *PGWriter) {
vMetricIDMapMutex.Unlock()
p.valueRows = append(p.valueRows, []interface{}{int64(id), toTimestamp(milliseconds), float64(sample.Value)})
}
samples = nil
// level.Info(c.logger).Log(fmt.Sprintf("bgparser%d",p.id), fmt.Sprintf("Parsed %d rows", len(p.valueRows) ) )
vMetricIDMapMutex.Lock()
c.valueRows = append(c.valueRows, p.valueRows...)
p.valueRows = nil
Expand Down Expand Up @@ -154,7 +155,7 @@ func (c *PGWriter) RunPGWriter(l log.Logger, tid int, commitSecs int, commitRows
c.Running = false
}

// Shutdown is a graceful shutdown mechanism
// PGWriterShutdown - Set shutdown flag for graceful shutdown
func (c *PGWriter) PGWriterShutdown() {
c.KeepRunning = false
}
Expand Down Expand Up @@ -190,12 +191,14 @@ func (c *PGWriter) PGWriterSave() {
level.Info(c.logger).Log("metric", fmt.Sprintf("BGWriter%d: Processed samples count,%d, duration,%v", c.id, rowCount+lblCount, duration))
}

// Push appends a batch of samples to the tail of the shared promSamples
// queue. Access is serialized with QueueMutex so producers and the
// consumer (Pop) never touch the list concurrently.
func Push(samples *model.Samples) {
	QueueMutex.Lock()
	defer QueueMutex.Unlock()
	promSamples.PushBack(samples)
}

// Pop - Pop first element from list
func Pop() *model.Samples {
QueueMutex.Lock()
defer QueueMutex.Unlock()
Expand All @@ -206,7 +209,7 @@ func Pop() *model.Samples {
return nil
}

// Threaded writer
// Client - struct to hold critical values
type Client struct {
logger log.Logger
DB *pgx.Conn
Expand Down Expand Up @@ -275,18 +278,17 @@ func (c *PGWriter) setupPgPrometheus() error {

for rows.Next() {
var (
metric_name_label string
metric_id int64
metricNameLabel string
metricID int64
)
err := rows.Scan(&metric_name_label, &metric_id)
err := rows.Scan(&metricNameLabel, &metricID)

if err != nil {
rows.Close()
level.Info(c.logger).Log("msg", "Error scaning metric_labels")
return err
}
//level.Info(c.logger).Log("msg",fmt.Sprintf("YS>\t>%s<\t>%s<",metric_name_label, metric_id ) )
vMetricIDMap[metric_name_label] = metric_id
vMetricIDMap[metricNameLabel] = metricID
}
level.Info(c.logger).Log("msg", fmt.Sprintf("%d Rows Loaded in map: ", len(vMetricIDMap)))
rows.Close()
Expand Down Expand Up @@ -364,6 +366,7 @@ func createOrderedKeys(m *map[string]string) []string {
return keys
}

// Close - Close database connections
func (c *Client) Close() {
if c.DB != nil {
if err1 := c.DB.Close(context.Background()); err1 != nil {
Expand Down

0 comments on commit 359de96

Please sign in to comment.