From 66d545c9c604c927ecfd196b4ee562b14e08e3c6 Mon Sep 17 00:00:00 2001 From: Lyubo Kamenov Date: Thu, 13 Jun 2024 14:31:42 -0400 Subject: [PATCH] add extra log metrics for snapshot progress (#162) * Add extra log metrics for logrepl --- source/logrepl/cdc.go | 2 +- source/snapshot/fetch_worker.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 2984f98..995fa92 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -102,7 +102,7 @@ func (i *CDCIterator) StartSubscriber(ctx context.Context) error { sdk.Logger(ctx).Info(). Str("slot", i.config.SlotName). Str("publication", i.config.PublicationName). - Msg("Starting logical replication") + Msgf("Starting logical replication at %s", i.sub.StartLSN) go func() { if err := i.sub.Run(ctx); err != nil { diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 2d95b30..bb62087 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -201,11 +201,14 @@ func (f *FetchWorker) Run(ctx context.Context) error { Int("rows", nfetched). Str("table", f.conf.Table). Dur("elapsed", time.Since(start)). + Str("completed_perc", fmt.Sprintf("%.2f", (float64(nfetched)/float64(f.snapshotEnd))*100)). + Str("rate_per_min", fmt.Sprintf("%.0f", float64(nfetched)/time.Since(start).Minutes())). Msg("fetching rows") } sdk.Logger(ctx).Info(). Dur("elapsed", time.Since(start)). + Str("rate_per_min", fmt.Sprintf("%.0f", float64(nfetched)/time.Since(start).Minutes())). Str("table", f.conf.Table). Msgf("%q snapshot completed", f.conf.Table) @@ -254,12 +257,18 @@ func (f *FetchWorker) updateSnapshotEnd(ctx context.Context, tx pgx.Tx) error { } func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { + start := time.Now().UTC() + rows, err := tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", f.conf.FetchSize, f.cursorName)) if err != nil { return 0, fmt.Errorf("failed to fetch rows: %w", err) } defer rows.Close() + sdk.Logger(ctx).Info(). + Dur("fetch_elapsed", time.Since(start)). + Msg("cursor fetched data") + var fields []string for _, f := range rows.FieldDescriptions() { fields = append(fields, f.Name) @@ -292,6 +301,13 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) { } func (f *FetchWorker) send(ctx context.Context, d FetchData) error { + start := time.Now().UTC() + defer func() { + sdk.Logger(ctx).Trace(). + Dur("send_elapsed", time.Since(start)). + Msg("sending data to chan") + }() + select { case <-ctx.Done(): return ctx.Err()