Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafkabp: Add AWSAvailabilityZoneIDRackID rack implementation #662

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions kafkabp/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type RackIDFunc func() string
//
// - "aws": AWSAvailabilityZoneRackID.
//
// - "aws-zone-id": AWSAvailabilityZoneIDRackID.
//
// - "http://url" or "https://url": SimpleHTTPRackID with
// log.DefaultWrapper and prometheus counter of
// kafkabp_http_rack_id_failure_total, default timeout & limit, and given URL.
Expand All @@ -58,6 +60,9 @@ func (r *RackIDFunc) UnmarshalText(text []byte) error {
case "aws":
*r = AWSAvailabilityZoneRackID
return nil
case "aws-zone-id":
*r = AWSAvailabilityZoneIDRackID
return nil
}

// http cases
Expand Down Expand Up @@ -181,13 +186,11 @@ func doHTTP(r *http.Request, readLimit int64) (string, error) {
return body, nil
}

var awsRackID = sync.OnceValues(func() (string, error) {
func awsURL(url string) (string, error) {
const (
// References:
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html
tokenURL = "http://169.254.169.254/latest/api/token"
azURL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"

timeout = time.Second
readLimit = 1024
Expand All @@ -199,31 +202,43 @@ var awsRackID = sync.OnceValues(func() (string, error) {
token, err := func(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, tokenURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", tokenURL, err)
return "", fmt.Errorf("kafkabp.awsURL: failed to create request from url %q: %w", tokenURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")

token, err := doHTTP(req, readLimit)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to get AWS IMDS v2 token from url %q: %w", tokenURL, err)
return "", fmt.Errorf("kafkabp.awsURL: failed to get AWS IMDS v2 token from url %q: %w", tokenURL, err)
}
return token, nil
}(ctx)
if err != nil {
return "", err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, azURL, http.NoBody)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", azURL, err)
return "", fmt.Errorf("kafkabp.awsURL: failed to create request from url %q: %w", url, err)
}
req.Header.Set("X-aws-ec2-metadata-token", token)

id, err := doHTTP(req, readLimit)
if err != nil {
err = fmt.Errorf("kafkabp.awsRackID: failed to get AWS availability zone from url %q: %w", azURL, err)
err = fmt.Errorf("kafkabp.awsURL: failed to query AWS url %q: %w", url, err)
}
return id, err
}

// awsZoneRackID queries the AWS instance metadata endpoint for the
// availability zone via awsURL. It is wrapped in sync.OnceValues, so the
// query runs at most once and every later call gets the same (string, error)
// pair back.
var awsZoneRackID = sync.OnceValues(func() (string, error) {
	// Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html#instancedata-data-categories
	return awsURL("http://169.254.169.254/latest/meta-data/placement/availability-zone")
})

// awsZoneIDRackID queries the AWS instance metadata endpoint for the
// availability zone id via awsURL. It is wrapped in sync.OnceValues, so the
// query runs at most once and every later call gets the same (string, error)
// pair back.
var awsZoneIDRackID = sync.OnceValues(func() (string, error) {
	// Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html#instancedata-data-categories
	return awsURL("http://169.254.169.254/latest/meta-data/placement/availability-zone-id")
})

// AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns AWS
Expand Down Expand Up @@ -251,11 +266,50 @@ var awsRackID = sync.OnceValues(func() (string, error) {
// the same error will be logged at slog's warning level every time
// AWSAvailabilityZoneRackID is called.
func AWSAvailabilityZoneRackID() string {
id, err := awsRackID()
id, err := awsZoneRackID()
if err != nil {
awsRackFailure.Inc()
slog.Warn("Failed to get AWS availability zone as rack id", "err", err)
return ""
}
return id
}

// AWSAvailabilityZoneIDRackID is a RackIDFunc implementation that returns AWS
// availability zone id as the rack id.
//
// It also caches the result globally, so if you have more than one
// AWSAvailabilityZoneIDRackID in your process only the first one actually
// makes the HTTP request, for example:
//
// consumer1 := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
// RackID: kafkabp.AWSAvailabilityZoneIDRackID,
// Topic: "topic1",
// // other configs
// })
// consumer2 := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
// RackID: kafkabp.AWSAvailabilityZoneIDRackID,
// Topic: "topic2",
// // other configs
// })
//
// It uses the AWS instance metadata HTTP API with a 1-second overall timeout
// and an HTTP response read limit of 1024.
//
// If there was an error retrieving rack id through AWS instance metadata API,
// the same error will be logged at slog's warning level every time
// AWSAvailabilityZoneIDRackID is called.
//
// See [1] for differences between AWS availability zone and availability zone
// id.
//
// [1]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-availability-zones
func AWSAvailabilityZoneIDRackID() string {
id, err := awsZoneIDRackID()
if err != nil {
awsRackFailure.Inc()
slog.Warn("Failed to get AWS availability zone id as rack id", "err", err)
Copy link
Contributor

@kylelemons kylelemons Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔕 LogDNA will not show the error message to users when the error is moved into the structured portion of the log. I typically recommend having the error as part of the human portion instead, since error is not really adding things to filter against or aggregate beyond what you can already do with the message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My experience is the opposite: having errors in a kv pair makes it easier to find exact errors on LogDNA (it's probably not that helpful if you want to search for partial error messages), for use cases like "how many times did this error happen, did we have a spike during this time window?".

With a better logging UI (like GCP's log explorer) you can add a kv value to the summary line in the UI if needed, which is the best of both worlds, but we just don't have that unfortunately.

return ""
}
return id
}
Loading