Skip to content

Commit

Permalink
Update formatting on *.go files (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
timflannagan authored Sep 4, 2020
1 parent 2e06672 commit 415f1e7
Show file tree
Hide file tree
Showing 2 changed files with 553 additions and 555 deletions.
120 changes: 60 additions & 60 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,37 @@
// The main package for the Prometheus server executable.
package main


import (
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"os/signal"
"time"

"path/filepath"
"github.com/crunchydata/postgresql-prometheus-adapter/pkg/postgresql"

"github.com/crunchydata/postgresql-prometheus-adapter/pkg/postgresql"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"

//"github.com/jamiealquiza/envy"

"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"

"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/prometheus/prompb"

//"github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"gopkg.in/alecthomas/kingpin.v2"
//"flag"

"gopkg.in/alecthomas/kingpin.v2"
//"flag"
)

type config struct {
Expand All @@ -61,14 +62,14 @@ type config struct {
logLevel string
haGroupLockId int
prometheusTimeout time.Duration
promlogConfig promlog.Config
promlogConfig promlog.Config
}

const (
tickInterval = time.Second
promLivenessCheck = time.Second
max_bgwriter = 10
max_bgparser = 20
max_bgwriter = 10
max_bgparser = 20
)

var (
Expand Down Expand Up @@ -122,61 +123,61 @@ func init() {

func main() {
cfg := parseFlags()
logger := promlog.New(&cfg.promlogConfig)
logger := promlog.New(&cfg.promlogConfig)
level.Info(logger).Log("config", fmt.Sprintf("%+v", cfg))
level.Info(logger).Log("pgPrometheusConfig", fmt.Sprintf("%+v", cfg.pgPrometheusConfig))

if ( cfg.pgPrometheusConfig.PGWriters < 0 ) {
cfg.pgPrometheusConfig.PGWriters=1
}
if ( cfg.pgPrometheusConfig.PGWriters > max_bgwriter ) {
cfg.pgPrometheusConfig.PGWriters=max_bgwriter
}
if cfg.pgPrometheusConfig.PGWriters < 0 {
cfg.pgPrometheusConfig.PGWriters = 1
}
if cfg.pgPrometheusConfig.PGWriters > max_bgwriter {
cfg.pgPrometheusConfig.PGWriters = max_bgwriter
}

if ( cfg.pgPrometheusConfig.PGParsers < 0 ) {
cfg.pgPrometheusConfig.PGParsers=1
}
if ( cfg.pgPrometheusConfig.PGParsers > max_bgparser ) {
cfg.pgPrometheusConfig.PGParsers=max_bgparser
}
if cfg.pgPrometheusConfig.PGParsers < 0 {
cfg.pgPrometheusConfig.PGParsers = 1
}
if cfg.pgPrometheusConfig.PGParsers > max_bgparser {
cfg.pgPrometheusConfig.PGParsers = max_bgparser
}

http.Handle(cfg.telemetryPath, promhttp.Handler())
writer, reader := buildClients(logger, cfg)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func(){
for sig := range c {
fmt.Printf("Signal: %v\n", sig)
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
fmt.Printf("Calling shutdown %d\n", t)
worker[t].PGWriterShutdown()
}
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
for worker[t].Running {
time.Sleep( 1 * time.Second )
fmt.Printf("Waiting for shutdown %d...\n", t)
}
}
os.Exit(0)
}
}()
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
go worker[t].RunPGWriter(logger, t, cfg.pgPrometheusConfig.CommitSecs, cfg.pgPrometheusConfig.CommitRows, cfg.pgPrometheusConfig.PGParsers, cfg.pgPrometheusConfig.PartitionScheme)
defer worker[t].PGWriterShutdown()
}

level.Info(logger).Log("msg", "Starting HTTP Listerner")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
fmt.Printf("Signal: %v\n", sig)
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
fmt.Printf("Calling shutdown %d\n", t)
worker[t].PGWriterShutdown()
}
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
for worker[t].Running {
time.Sleep(1 * time.Second)
fmt.Printf("Waiting for shutdown %d...\n", t)
}
}
os.Exit(0)
}
}()
for t := 0; t < cfg.pgPrometheusConfig.PGWriters; t++ {
go worker[t].RunPGWriter(logger, t, cfg.pgPrometheusConfig.CommitSecs, cfg.pgPrometheusConfig.CommitRows, cfg.pgPrometheusConfig.PGParsers, cfg.pgPrometheusConfig.PartitionScheme)
defer worker[t].PGWriterShutdown()
}

level.Info(logger).Log("msg", "Starting HTTP Listerner")

http.Handle("/write", timeHandler("write", write(logger, writer)))
http.Handle("/read", timeHandler("read", read(logger, reader)))

level.Info(logger).Log("msg", "Starting up...")
level.Info(logger).Log("msg", "Listening", "addr", cfg.listenAddr)

err := http.ListenAndServe(cfg.listenAddr, nil)
err := http.ListenAndServe(cfg.listenAddr, nil)

level.Info(logger).Log("msg", "Started HTTP Listerner")
level.Info(logger).Log("msg", "Started HTTP Listerner")

if err != nil {
level.Error(logger).Log("msg", "Listen failure", "err", err)
Expand All @@ -189,19 +190,19 @@ func parseFlags() *config {
a.HelpFlag.Short('h')

cfg := &config{
promlogConfig: promlog.Config{},
}
promlogConfig: promlog.Config{},
}

a.Flag("adapter-send-timeout", "The timeout to use when sending samples to the remote storage.").Default("30s").DurationVar(&cfg.remoteTimeout)
a.Flag("web-listen-address", "Address to listen on for web endpoints.").Default(":9201").StringVar(&cfg.listenAddr)
a.Flag("web-telemetry-path", "Address to listen on for web endpoints.").Default("/metrics").StringVar(&cfg.telemetryPath)
flag.AddFlags(a, &cfg.promlogConfig)
flag.AddFlags(a, &cfg.promlogConfig)

a.Flag("pg-partition", "daily or hourly partitions, default: hourly").Default("hourly").StringVar(&cfg.pgPrometheusConfig.PartitionScheme)
a.Flag("pg-commit-secs", "Write data to database every N seconds").Default("15").IntVar(&cfg.pgPrometheusConfig.CommitSecs)
a.Flag("pg-commit-rows", "Write data to database every N Rows").Default("20000").IntVar(&cfg.pgPrometheusConfig.CommitRows)
a.Flag("pg-threads", "Writer DB threads to run 1-10").Default("1").IntVar(&cfg.pgPrometheusConfig.PGWriters)
a.Flag("parser-threads", "parser threads to run per DB writer 1-10").Default("5").IntVar(&cfg.pgPrometheusConfig.PGParsers)
a.Flag("pg-partition", "daily or hourly partitions, default: hourly").Default("hourly").StringVar(&cfg.pgPrometheusConfig.PartitionScheme)
a.Flag("pg-commit-secs", "Write data to database every N seconds").Default("15").IntVar(&cfg.pgPrometheusConfig.CommitSecs)
a.Flag("pg-commit-rows", "Write data to database every N Rows").Default("20000").IntVar(&cfg.pgPrometheusConfig.CommitRows)
a.Flag("pg-threads", "Writer DB threads to run 1-10").Default("1").IntVar(&cfg.pgPrometheusConfig.PGWriters)
a.Flag("parser-threads", "parser threads to run per DB writer 1-10").Default("5").IntVar(&cfg.pgPrometheusConfig.PGParsers)

_, err := a.Parse(os.Args[1:])
if err != nil {
Expand Down Expand Up @@ -295,7 +296,7 @@ func read(logger log.Logger, reader reader) http.Handler {
var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
fmt.Printf("MAIN req.Queries: %v\n", req.Queries)
fmt.Printf("MAIN req.Queries: %v\n", req.Queries)
level.Warn(logger).Log("msg", "Error executing query", "query", req, "storage", reader.Name(), "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -351,7 +352,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
func sendSamples(w writer, samples model.Samples) error {
begin := time.Now()
var err error
err = w.Write(samples)
err = w.Write(samples)
duration := time.Since(begin).Seconds()
if err != nil {
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
Expand All @@ -372,4 +373,3 @@ func timeHandler(path string, handler http.Handler) http.Handler {
}
return http.HandlerFunc(f)
}

Loading

0 comments on commit 415f1e7

Please sign in to comment.