diff --git a/dialer.go b/dialer.go index 153fbb37..ad0aaac3 100644 --- a/dialer.go +++ b/dialer.go @@ -27,6 +27,7 @@ import ( "cloud.google.com/go/cloudsqlconn/errtypes" "cloud.google.com/go/cloudsqlconn/internal/cloudsql" "cloud.google.com/go/cloudsqlconn/internal/trace" + "github.com/google/uuid" "golang.org/x/net/proxy" "google.golang.org/api/option" sqladmin "google.golang.org/api/sqladmin/v1beta4" @@ -73,6 +74,10 @@ type Dialer struct { // defaultDialCfg holds the constructor level DialOptions, so that it can // be copied and mutated by the Dial function. defaultDialCfg dialCfg + + // dialerID uniquely identifies a Dialer. Used for monitoring purposes, + // *only* when a client has configured OpenCensus exporters. + dialerID string } // NewDialer creates a new Dialer. @@ -110,12 +115,20 @@ func NewDialer(ctx context.Context, opts ...DialerOption) (*Dialer, error) { opt(&dialCfg) } + if err := trace.InitMetrics(); err != nil { + // This error means the internal metric configuration is incorrect and + // should never be surfaced to callers, as there's nothing actionable + // for a caller to do. Ignoring the error seems worse and so we return + // it. + return nil, err + } d := &Dialer{ instances: make(map[string]*cloudsql.Instance), key: cfg.rsaKey, refreshTimeout: cfg.refreshTimeout, sqladmin: client, defaultDialCfg: dialCfg, + dialerID: uuid.New().String(), } return d, nil } @@ -123,9 +136,12 @@ func NewDialer(ctx context.Context, opts ...DialerOption) (*Dialer, error) { // Dial returns a net.Conn connected to the specified Cloud SQL instance. The instance argument must be the // instance's connection name, which is in the format "project-name:region:instance-name". func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption) (conn net.Conn, err error) { + startTime := time.Now() var endDial trace.EndSpanFunc ctx, endDial = trace.StartSpan(ctx, "cloud.google.com/go/cloudsqlconn.Dial", - trace.AddInstanceName(instance)) + trace.AddInstanceName(instance), + trace.AddDialerID(d.dialerID), + ) defer func() { endDial(err) }() cfg := d.defaultDialCfg for _, opt := range opts { @@ -171,7 +187,42 @@ func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption) _ = tlsConn.Close() // best effort close attempt return nil, errtypes.NewDialError("handshake failed", i.String(), err) } - return tlsConn, nil + latency := time.Since(startTime).Milliseconds() + go func() { + trace.RecordDialLatency(ctx, instance, d.dialerID, latency) + trace.RecordConnectionOpen(ctx, instance, d.dialerID) + }() + + return newInstrumentedConn(tlsConn, instance, d.dialerID), nil +} + +// newInstrumentedConn initializes an instrumentedConn that on closing will +// decrement the number of open connects and record the result. +func newInstrumentedConn(conn net.Conn, instance, dialerID string) *instrumentedConn { + return &instrumentedConn{ + Conn: conn, + closeFunc: func() { + trace.RecordConnectionClose(context.Background(), instance, dialerID) + }, + } +} + +// instrumentedConn wraps a net.Conn and invokes closeFunc when the connection +// is closed. +type instrumentedConn struct { + net.Conn + closeFunc func() +} + +// Close delegates to the underylying net.Conn interface and reports the close +// to the provided closeFunc only when Close returns no error. +func (i *instrumentedConn) Close() error { + err := i.Conn.Close() + if err != nil { + return err + } + go i.closeFunc() + return nil } // Close closes the Dialer; it prevents the Dialer from refreshing the information diff --git a/go.mod b/go.mod index 0af17c87..d450bcf2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( cloud.google.com/go v0.75.0 // indirect + github.com/google/uuid v1.3.0 github.com/jackc/pgx/v4 v4.10.1 github.com/pkg/errors v0.9.1 // indirect go.opencensus.io v0.22.6 diff --git a/go.sum b/go.sum index aedbaa25..c38af6bd 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/internal/trace/doc.go b/internal/trace/doc.go index 025fb237..e1a6dfce 100644 --- a/internal/trace/doc.go +++ b/internal/trace/doc.go @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package trace provides an interface for tracing internal operations. +// Package trace provides an interface for tracing internal operations and +// reporting various metrics. Metrics are recorded on a package-global basis. package trace // import "cloud.google.com/go/cloudsqlconn/internal/trace" diff --git a/internal/trace/metrics.go b/internal/trace/metrics.go new file mode 100644 index 00000000..76d7c022 --- /dev/null +++ b/internal/trace/metrics.go @@ -0,0 +1,80 @@ +package trace + +import ( + "context" + "fmt" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + keyInstance, _ = tag.NewKey("cloudsql_instance") + keyDialerID, _ = tag.NewKey("cloudsql_dialer_id") +) + +var ( + mLatencyMS = stats.Int64( + "/cloudsqlconn/latency", + "The latency in milliseconds per Dial", + stats.UnitMilliseconds, + ) + latencyView = &view.View{ + Name: "/cloudsqlconn/dial_latency", + Measure: mLatencyMS, + Description: "The distribution of dialer latencies (ms)", + // Latency in buckets, e.g., >=0ms, >=100ms, etc. + Aggregation: view.Distribution(0, 5, 25, 100, 250, 500, 1000, 2000, 5000, 30000), + TagKeys: []tag.Key{keyInstance, keyDialerID}, + } +) + +var ( + mConnections = stats.Int64( + "/cloudsqlconn/connection", + "A connect or disconnect event to Cloud SQL", + stats.UnitDimensionless, + ) + connectionsView = &view.View{ + Name: "/cloudsqlconn/open_connections", + Measure: mConnections, + Description: "The sum of Cloud SQL connections", + Aggregation: view.Sum(), + TagKeys: []tag.Key{keyInstance, keyDialerID}, + } +) + +// RecordDialLatency records a latency value for a call to dial. +func RecordDialLatency(ctx context.Context, instance, dialerID string, latency int64) { + // tag.New creates a new context and errors only if the new tag already + // exists in the provided context. Since we're adding tags within this + // package only, we can be confident that there were be no duplicate tags + // and so can ignore the error. + ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID)) + stats.Record(ctx, mLatencyMS.M(latency)) +} + +// RecordConnectionOpen reports a connection event. +func RecordConnectionOpen(ctx context.Context, instance, dialerID string) { + // Why are we ignoring this error? See above under RecordDialLatency. + ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID)) + stats.Record(ctx, mConnections.M(1)) +} + +// RecordConnectionClose records a disconnect event. +func RecordConnectionClose(ctx context.Context, instance, dialerID string) { + // Why are we ignoring this error? See above under RecordDialLatency. + ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID)) + stats.Record(ctx, mConnections.M(-1)) +} + +// InitMetrics registers all views. Without registering views, metrics will not +// be reported. If any names of the registered views conflict, this function +// returns an error to indicate a configuration problem. +func InitMetrics() error { + if err := view.Register(latencyView, connectionsView); err != nil { + return fmt.Errorf("failed to initialize metrics: %v", err) + } + return nil +} diff --git a/internal/trace/metrics_test.go b/internal/trace/metrics_test.go new file mode 100644 index 00000000..81f493fe --- /dev/null +++ b/internal/trace/metrics_test.go @@ -0,0 +1,13 @@ +package trace_test + +import ( + "testing" + + "cloud.google.com/go/cloudsqlconn/internal/trace" +) + +func TestMetricsInitializes(t *testing.T) { + if err := trace.InitMetrics(); err != nil { + t.Fatalf("want no error, got = %v", err) + } +} diff --git a/internal/trace/trace.go b/internal/trace/trace.go index d80a1712..dc88918d 100644 --- a/internal/trace/trace.go +++ b/internal/trace/trace.go @@ -43,6 +43,11 @@ func AddInstanceName(name string) Attribute { return Attribute{key: "/cloudsql/instance", value: name} } +// AddDialerID creates an attribute to identify a particular dialer. +func AddDialerID(dialerID string) Attribute { + return Attribute{key: "/cloudsql/dialer_id", value: dialerID} +} + // StartSpan begins a span with the provided name and returns a context and a // function to end the created span. func StartSpan(ctx context.Context, name string, attrs ...Attribute) (context.Context, EndSpanFunc) {