
[Bug][Go]: Metrics incremented in Setup methods are not recalled. #27038

Open

lostluck opened this issue Jun 6, 2023 · 2 comments
Comments

lostluck (Contributor) commented Jun 6, 2023

What happened?

Because the ParDo Setup context is uncached, metrics initialized in Setup are lost, which is unexpected. Work done in Setup, while logically outside of a bundle, will be under the context of the first bundle to execute that transform.

So there needs to be a way to transfer/extract the metrics from the Setup context so they are recorded back to the runner.
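The gap can be pictured with a small, purely illustrative sketch (not Beam's actual internals): a Setup-scoped counter store whose contents must be merged into a bundle-scoped store before they can reach the runner.

```go
package main

import "fmt"

// counterStore is a hypothetical stand-in for the SDK's per-context
// metric storage; it is not Beam's actual implementation.
type counterStore map[string]int64

func (s counterStore) inc(name string, v int64) { s[name] += v }

// mergeInto transfers every counter from src into dst. This is the kind
// of "extract from the Setup context, record into a bundle" step the
// issue asks for.
func mergeInto(dst, src counterStore) {
	for name, v := range src {
		dst[name] += v
	}
}

func main() {
	setupStore := counterStore{}  // increments made during Setup
	bundleStore := counterStore{} // increments made during the first bundle

	setupStore.inc("setup_calls", 1)
	bundleStore.inc("processed_count", 3)

	// Without this merge, setup_calls is lost when the Setup context is
	// discarded; with it, the increment reaches the runner via the bundle.
	mergeInto(bundleStore, setupStore)
	fmt.Println(bundleStore["setup_calls"], bundleStore["processed_count"]) // 1 3
}
```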

Issue Priority

Priority: 3 (minor)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [x] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
mohamedawnallah (Contributor) commented Feb 7, 2025

Hi @lostluck,

I am interested in this issue and have been exploring Apache Beam metrics before working on the unrecalled metric increments in Setup methods. As part of my learning, I wrote a simple Beam Go SDK program that converts words to uppercase and ran it on a local Flink development cluster. The dev environment: Apache Beam Go SDK v2.62.0, the apache/beam_flink1.19_job_server image, and flink:1.19.0-java11.

I observed that no custom metrics are reported in the Flink runner—only internal Beam metrics appear in the Accumulators tab. This behavior seems related to issue #32895.

Any insights on this?

For reference, here is the simple program I used:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
)

// Define metrics.
var (
	processedCount   = metrics.NewCounter("example", "processed_count")
	invalidCount     = metrics.NewCounter("example", "invalid_count")
	sizeDistribution = metrics.NewDistribution("example", "record_size")
)

// UppercaseFn is a DoFn that processes words and updates metrics.
type UppercaseFn struct{}

// ProcessElement takes the framework-provided context so metric updates
// are attributed to the executing bundle. Increments made against
// context.Background() carry no bundle state and may be dropped.
func (fn *UppercaseFn) ProcessElement(ctx context.Context, word string, emit func(string)) {
	processedCount.Inc(ctx, 1)
	sizeDistribution.Update(ctx, int64(len(word)))

	if word == "" {
		invalidCount.Inc(ctx, 1)
		return
	}

	emit(strings.ToUpper(word))
}

// PrintFn is a DoFn that prints each element.
type PrintFn struct{}

func (fn *PrintFn) ProcessElement(word string) {
	fmt.Println(word)
}

func init() {
	beam.RegisterType(reflect.TypeOf((*UppercaseFn)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*PrintFn)(nil)).Elem())
}

func main() {
	flag.Parse()
	beam.Init()

	pipeline := beam.NewPipeline()
	scope := pipeline.Root()

	inputData := []string{"apple", "banana", "", "cherry"}
	words := beam.CreateList(scope, inputData)

	uppercaseWords := beam.ParDo(scope, &UppercaseFn{}, words)
	beam.ParDo0(scope, &PrintFn{}, uppercaseWords)

	result, err := flink.Execute(context.Background(), pipeline)
	if err != nil {
		log.Fatalf("Failed to execute pipeline: %v", err)
	}

	// Named to avoid shadowing the imported metrics package.
	results := result.Metrics().AllMetrics()
	for _, counter := range results.Counters() {
		fmt.Printf("Counter %s: %d\n", counter.Name(), counter.Committed)
	}

	for _, distribution := range results.Distributions() {
		d := distribution.Committed
		if d.Count == 0 {
			continue // avoid dividing by zero when computing the mean
		}
		fmt.Printf("Distribution %s: min=%d, max=%d, sum=%d, mean=%d, count=%d\n",
			distribution.Name(), d.Min, d.Max, d.Sum, d.Sum/d.Count, d.Count)
	}
}

Looking forward to your thoughts! 🙏

lostluck (Contributor, Author) commented

Sorry for missing this earlier @mohamedawnallah .

If Flink doesn't support exporting user metrics, then it's not something the Go SDK can fix by itself.

IIRC this issue is strictly for when a DoFn has metrics and increments them in the once-per-DoFn-instance Setup() lifecycle method. The provided code would not verify this issue by itself, since it has no Setup methods on its DoFns.

The default Go SDK runner, Prism, should be able to validate this issue though, if the base code reveals it. The changes would be in the exec package, ultimately.
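A minimal reproduction along those lines might be sketched as follows (assuming the standard Go SDK lifecycle signatures and the `prism` runner package's `Execute` entry point; whether `setup_calls` shows up in the results is exactly what this issue would verify):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
)

// setupCalls is incremented once per DoFn instance in Setup; per this
// issue, the increment is currently not reported back to the runner.
var setupCalls = beam.NewCounter("example", "setup_calls")

// SetupMetricFn increments a counter in Setup, logically outside any bundle.
type SetupMetricFn struct{}

func (fn *SetupMetricFn) Setup(ctx context.Context) {
	setupCalls.Inc(ctx, 1)
}

func (fn *SetupMetricFn) ProcessElement(word string) {
	// No output; the DoFn exists only to exercise the Setup metric.
}

func init() {
	beam.RegisterType(reflect.TypeOf((*SetupMetricFn)(nil)).Elem())
}

func main() {
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	words := beam.CreateList(s, []string{"a", "b", "c"})
	beam.ParDo0(s, &SetupMetricFn{}, words)

	// Run on Prism and check whether setup_calls appears in the results.
	res, err := prism.Execute(context.Background(), p)
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range res.Metrics().AllMetrics().Counters() {
		fmt.Printf("%s: %d\n", c.Name(), c.Committed)
	}
}
```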
