Skip to content

Commit

Permalink
MySQL: Replica latency/running check
Browse files Browse the repository at this point in the history
  • Loading branch information
setaou committed Nov 28, 2023
1 parent 4287a59 commit babbe47
Showing 1 changed file with 123 additions and 11 deletions.
134 changes: 123 additions & 11 deletions backends_processor/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/zclconf/go-cty/cty"
"github.com/zclconf/go-cty/cty/gocty"

_ "github.com/go-sql-driver/mysql"
)
Expand Down Expand Up @@ -42,6 +43,7 @@ type MySQLChecker struct {
connect_timeout time.Duration
read_timeout time.Duration
write_timeout time.Duration
check_replica bool
}

type MySQLCheckerConfig struct {
Expand All @@ -55,6 +57,7 @@ type MySQLCheckerConfig struct {
ConnectTimeout string `hcl:"connect_timeout,optional"`
ReadTimeout string `hcl:"read_timeout,optional"`
WriteTimeout string `hcl:"write_timeout,optional"`
CheckReplica bool `hcl:"check_replica,optional"`
}

type MySQLCheckerFactory struct{}
Expand Down Expand Up @@ -103,6 +106,7 @@ func (w MySQLCheckerFactory) New(tc *Config, wg *sync.WaitGroup, ctx context.Con
upd_chan: make(chan backend.BackendUpdate),
upd_chan_stop: make(chan struct{}),
source: config.Source,
check_replica: config.CheckReplica,
}

var err error
Expand Down Expand Up @@ -163,6 +167,7 @@ func (w MySQLCheckerFactory) New(tc *Config, wg *sync.WaitGroup, ctx context.Con
c.max_period,
c.backoff_factor,
status_chan,
c.check_replica,
)
err := check.StartPolling()
if err != nil {
Expand Down Expand Up @@ -277,9 +282,10 @@ type MySQLCheck struct {
stop_chan chan bool
running bool
db *sql.DB
check_replica bool
}

func NewMySQLCheck(backend *backend.Backend, dsn string, default_period time.Duration, max_period time.Duration, backoff_factor float64, status_chan chan *backend.Backend) *MySQLCheck {
func NewMySQLCheck(backend *backend.Backend, dsn string, default_period time.Duration, max_period time.Duration, backoff_factor float64, status_chan chan *backend.Backend, check_replica bool) *MySQLCheck {
c := &MySQLCheck{
backend: backend,
dsn: dsn,
Expand All @@ -290,19 +296,100 @@ func NewMySQLCheck(backend *backend.Backend, dsn string, default_period time.Dur
status_chan: status_chan,
stop_chan: make(chan bool),
running: false,
check_replica: check_replica,
}
backend.Meta.Set("mysql", "status", cty.UnknownVal(cty.String))
backend.Meta.Set("mysql", "readonly", cty.UnknownVal(cty.Bool))
if c.check_replica {
backend.Meta.Set("mysql", "replica_latency", cty.UnknownVal(cty.Number))
backend.Meta.Set("mysql", "replica_running", cty.UnknownVal(cty.Bool))
}
return c
}

func (c *MySQLCheck) fetchStatus() (ret_status cty.Value, ret_readonly cty.Value, ret_err error) {
func (c *MySQLCheck) fetchReadOnly() (ret_readonly cty.Value, ret_err error) {
defer func() {
if r := recover(); r != nil {
ret_readonly = cty.BoolVal(false)
ret_err = misc.EnsureError(r)
}
}()

var read_only bool

result, err := c.db.Query("SELECT @@read_only")
misc.PanicIfErr(err)
defer result.Close()
result.Next()
err = result.Scan(&read_only)
misc.PanicIfErr(err)

return cty.BoolVal(read_only), nil
}

func (c *MySQLCheck) fetchReplicaLatency() (ret_replica_latency cty.Value, ret_err error) {
defer func() {
if r := recover(); r != nil {
ret_replica_latency = cty.NumberIntVal(-1)
ret_err = misc.EnsureError(r)
}
}()

// Default value -1 if replication is not running
var replication_latency int64 = -1

// Execute query
result, err := c.db.Query("SHOW REPLICA STATUS")
misc.PanicIfErr(err)
defer result.Close()

// If we have a row
if result.Next() {
// Find the column index for Seconds_Behind_Source
columns, err := result.Columns()
misc.PanicIfErr(err)
sbs_column := -1
for i := range columns {
if columns[i] == "Seconds_Behind_Source" {
sbs_column = i
break
}
}
if sbs_column == -1 {
panic("Column Seconds_Behind_Source not found in SHOW REPLICA STATUS")
}

// Create the buffer and scan the row
var sbs_value sql.NullInt64
values := make([]interface{}, len(columns))
for i := range columns {
if i == sbs_column {
values[i] = &sbs_value
} else {
values[i] = new(sql.RawBytes)
}
}
err = result.Scan(values...)
misc.PanicIfErr(err)

// Get the value if not null
if sbs_value.Valid {
replication_latency = int64(sbs_value.Int64)
}
}

return cty.NumberIntVal(replication_latency), nil
}

func (c *MySQLCheck) fetchStatus() (ret_status cty.Value, ret_readonly cty.Value, ret_replica_latency cty.Value, ret_err error) {
defer func() {
if r := recover(); r != nil {
ret_status = cty.StringVal("err")
ret_readonly = cty.BoolVal(false)
ret_replica_latency = cty.NumberIntVal(-1)
ret_err = misc.EnsureError(r)

// Increase fetch period
if period, updated := c.ticker.ApplyBackoff(); updated {
log.Warn().Str("address", c.backend.Address).Dur("period", period).Msg("Updating fetch period")
}
Expand All @@ -311,25 +398,27 @@ func (c *MySQLCheck) fetchStatus() (ret_status cty.Value, ret_readonly cty.Value

log.Trace().Str("address", c.backend.Address).Msg("Probing Backend")

result, err := c.db.Query("SELECT @@read_only")
// Read Only
read_only, err := c.fetchReadOnly()
misc.PanicIfErr(err)
defer result.Close()

var read_only bool

result.Next()
err = result.Scan(&read_only)
misc.PanicIfErr(err)
// Replica Latency
var replica_latency cty.Value = cty.UnknownVal(cty.Bool)
if c.check_replica {
replica_latency, err = c.fetchReplicaLatency()
misc.PanicIfErr(err)
}

// If everything went OK, reset the fetch period if needed
if period, updated := c.ticker.Reset(); updated {
log.Warn().Str("address", c.backend.Address).Dur("period", period).Msg("Updating fetch period")
}

return cty.StringVal("ok"), cty.BoolVal(read_only), nil
return cty.StringVal("ok"), read_only, replica_latency, nil
}

func (c *MySQLCheck) updateStatus() {
new_status, new_readonly, err := c.fetchStatus()
new_status, new_readonly, new_replica_latency, err := c.fetchStatus()

if err != nil {
log.Error().Str("address", c.backend.Address).Err(err).Msg("Error while fetching status from backend")
Expand Down Expand Up @@ -361,6 +450,29 @@ func (c *MySQLCheck) updateStatus() {
c.status_chan <- c.backend
}

if c.check_replica {
old_replica_latency, ok := c.backend.Meta.Get("mysql", "replica_latency")
if !ok || !old_replica_latency.IsKnown() || old_replica_latency.Equals(new_replica_latency).False() {
c.backend.Meta.Set("mysql", "replica_latency", new_replica_latency)
c.backend.Meta.Set("mysql", "replica_running", new_replica_latency.GreaterThanOrEqualTo(cty.NumberUIntVal(0)))

var new_replica_latency_v int64
err := gocty.FromCtyValue(new_replica_latency, &new_replica_latency_v)
misc.PanicIfErr(err)

if !old_replica_latency.IsKnown() {
log.Debug().Str("address", c.backend.Address).Int64("new_replica_latency", new_replica_latency_v).Msg("Backend replica_latency changed")
} else {
var old_replica_latency_v int64
err := gocty.FromCtyValue(old_replica_latency, &old_replica_latency_v)
misc.PanicIfErr(err)

log.Debug().Str("address", c.backend.Address).Int64("old_replica_latency", old_replica_latency_v).Int64("new_replica_latency", new_replica_latency_v).Msg("Backend replica_latency changed")
}

c.status_chan <- c.backend
}
}
}

func (c *MySQLCheck) StartPolling() error {
Expand Down

0 comments on commit babbe47

Please sign in to comment.