Skip to content
This repository has been archived by the owner on Jul 12, 2024. It is now read-only.

Commit

Permalink
Harvester improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
a-feld committed Jul 1, 2021
1 parent 8699fa2 commit b59a161
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 34 deletions.
2 changes: 1 addition & 1 deletion telemetry/events_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func testEventGroupJSON(t testing.TB, batches []Batch, expect string) {
th.Helper()
}
factory, _ := NewEventRequestFactory(WithNoDefaultKey())
reqs, err := BuildSplitRequests(batches, factory)
reqs, err := buildSplitRequests(batches, factory)
if nil != err {
t.Fatal(err)
}
Expand Down
51 changes: 28 additions & 23 deletions telemetry/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func NewHarvester(options ...func(*Config)) (*Harvester, error) {
return nil, err
}

userAgent := "harvester " + h.config.userAgent()

h.spanRequestFactory, err = NewSpanRequestFactory(
WithInsertKey(h.config.APIKey),
withScheme(spanURL.Scheme),
WithEndpoint(spanURL.Host),
WithUserAgent(h.config.userAgent()),
WithUserAgent(userAgent),
)
if err != nil {
return nil, err
Expand All @@ -106,7 +108,7 @@ func NewHarvester(options ...func(*Config)) (*Harvester, error) {
WithInsertKey(h.config.APIKey),
withScheme(metricURL.Scheme),
WithEndpoint(metricURL.Host),
WithUserAgent(h.config.userAgent()),
WithUserAgent(userAgent),
)
if err != nil {
return nil, err
Expand All @@ -121,7 +123,7 @@ func NewHarvester(options ...func(*Config)) (*Harvester, error) {
WithInsertKey(h.config.APIKey),
withScheme(eventURL.Scheme),
WithEndpoint(eventURL.Host),
WithUserAgent(h.config.userAgent()),
WithUserAgent(userAgent),
)
if err != nil {
return nil, err
Expand All @@ -136,7 +138,7 @@ func NewHarvester(options ...func(*Config)) (*Harvester, error) {
WithInsertKey(h.config.APIKey),
withScheme(logURL.Scheme),
WithEndpoint(logURL.Host),
WithUserAgent(h.config.userAgent()),
WithUserAgent(userAgent),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,7 +346,7 @@ func (h *Harvester) swapOutMetrics(now time.Time) []*http.Request {
}
group := &metricGroup{Metrics: rawMetrics}
entries := []MapEntry{commonBlock, group}
reqs, err := BuildSplitRequests([]Batch{entries}, h.metricRequestFactory)
reqs, err := buildSplitRequests([]Batch{entries}, h.metricRequestFactory)
if nil != err {
h.config.logError(map[string]interface{}{
"err": err.Error(),
Expand All @@ -370,7 +372,7 @@ func (h *Harvester) swapOutSpans() []*http.Request {
entries = append(entries, &spanCommonBlock{attributes: h.commonAttributes})
}
entries = append(entries, &spanGroup{Spans: sps})
reqs, err := BuildSplitRequests([]Batch{entries}, h.spanRequestFactory)
reqs, err := buildSplitRequests([]Batch{entries}, h.spanRequestFactory)
if nil != err {
h.config.logError(map[string]interface{}{
"err": err.Error(),
Expand All @@ -393,7 +395,7 @@ func (h *Harvester) swapOutEvents() []*http.Request {
group := &eventGroup{
Events: events,
}
reqs, err := BuildSplitRequests([]Batch{{group}}, h.eventRequestFactory)
reqs, err := buildSplitRequests([]Batch{{group}}, h.eventRequestFactory)
if nil != err {
h.config.logError(map[string]interface{}{
"err": err.Error(),
Expand All @@ -419,7 +421,7 @@ func (h *Harvester) swapOutLogs() []*http.Request {
entries = append(entries, &logCommonBlock{attributes: h.commonAttributes})
}
entries = append(entries, &logGroup{Logs: logs})
reqs, err := BuildSplitRequests([]Batch{entries}, h.logRequestFactory)
reqs, err := buildSplitRequests([]Batch{entries}, h.logRequestFactory)
if nil != err {
h.config.logError(map[string]interface{}{
"err": err.Error(),
Expand All @@ -430,8 +432,9 @@ func (h *Harvester) swapOutLogs() []*http.Request {
return reqs
}

func harvestRequest(req *http.Request, cfg *Config) {
func harvestRequest(req *http.Request, cfg *Config, wg *sync.WaitGroup) {
var attempts int
defer wg.Done()
for {
cfg.logDebug(map[string]interface{}{
"event": "data post",
Expand Down Expand Up @@ -474,6 +477,18 @@ func harvestRequest(req *http.Request, cfg *Config) {
case <-tmr.C:
case <-req.Context().Done():
tmr.Stop()
if err := req.Context().Err(); err != nil {
// NOTE: It is possible that the context was
// cancelled/timedout right after the request
// successfully finished. In that case, we will
// erroneously log a message. I (will) don't think
// that's worth trying to engineer around.
cfg.logError(map[string]interface{}{
"event": "harvest cancelled or timed out",
"message": "dropping data",
"context-error": err.Error(),
})
}
return
}
attempts++
Expand Down Expand Up @@ -502,24 +517,14 @@ func (h *Harvester) HarvestNow(ct context.Context) {
reqs = append(reqs, h.swapOutSpans()...)
reqs = append(reqs, h.swapOutEvents()...)
reqs = append(reqs, h.swapOutLogs()...)
wg := sync.WaitGroup{}

for _, req := range reqs {
wg.Add(1)
httpRequest := req.WithContext(ctx)
harvestRequest(httpRequest, &h.config)
if err := ctx.Err(); err != nil {
// NOTE: It is possible that the context was
// cancelled/timedout right after the request
// successfully finished. In that case, we will
// erroneously log a message. I (will) don't think
// that's worth trying to engineer around.
h.config.logError(map[string]interface{}{
"event": "harvest cancelled or timed out",
"message": "dropping data",
"context-error": err.Error(),
})
return
}
go harvestRequest(httpRequest, &h.config, &wg)
}
wg.Wait()
}

func harvestRoutine(h *Harvester) {
Expand Down
2 changes: 1 addition & 1 deletion telemetry/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestNewRequestHeaders(t *testing.T) {
cfg.Product = "myProduct"
cfg.ProductVersion = "0.1.0"
})
expectUserAgent := "NewRelic-Go-TelemetrySDK/" + version + " myProduct/0.1.0"
expectUserAgent := "NewRelic-Go-TelemetrySDK/" + version + " harvester myProduct/0.1.0"
h.RecordSpan(Span{TraceID: "id", ID: "id"})
h.RecordMetric(Gauge{})

Expand Down
2 changes: 1 addition & 1 deletion telemetry/logs_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func testLogGroupJSON(t testing.TB, batches []Batch, expect string) {
th.Helper()
}
factory, _ := NewLogRequestFactory(WithNoDefaultKey())
reqs, err := BuildSplitRequests(batches, factory)
reqs, err := buildSplitRequests(batches, factory)
if nil != err {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions telemetry/metrics_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestMetrics(t *testing.T) {
}]`)

factory, _ := NewMetricRequestFactory(WithNoDefaultKey())
reqs, err := BuildSplitRequests([]Batch{{commonBlock, NewMetricGroup(metrics)}}, factory)
reqs, err := buildSplitRequests([]Batch{{commonBlock, NewMetricGroup(metrics)}}, factory)
if err != nil {
t.Error("error creating request", err)
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func testGroupJSON(t testing.TB, batches []Batch, expect string) {
th.Helper()
}
factory, _ := NewMetricRequestFactory(WithNoDefaultKey())
reqs, err := BuildSplitRequests(batches, factory)
reqs, err := buildSplitRequests(batches, factory)
if nil != err {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions telemetry/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func requestNeedsSplit(r *http.Request) bool {
return r.ContentLength >= maxCompressedSizeBytes
}

// BuildSplitRequests converts a []Batch into a collection of appropiately sized requests
func BuildSplitRequests(batches []Batch, factory RequestFactory) ([]*http.Request, error) {
// buildSplitRequests converts a []Batch into a collection of appropiately sized requests
func buildSplitRequests(batches []Batch, factory RequestFactory) ([]*http.Request, error) {
return newRequestsInternal(batches, factory, requestNeedsSplit)
}

Expand Down
6 changes: 3 additions & 3 deletions telemetry/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestNeedsToSplitBatchesAndEntries(t *testing.T) {
rawData: randomJSON(maxCompressedSizeBytes),
},
}
reqs, err := BuildSplitRequests([]Batch{group1, group2}, testFactory())
reqs, err := buildSplitRequests([]Batch{group1, group2}, testFactory())
if err != nil {
t.Fatal(err)
}
Expand All @@ -226,7 +226,7 @@ func TestNeedsToSplitBatchesAndEntries(t *testing.T) {
func TestLargeRequestNeedsSplit(t *testing.T) {
js := randomJSON(4 * maxCompressedSizeBytes)
payloadEntry := testUnsplittablePayloadEntry{rawData: js}
reqs, err := BuildSplitRequests([]Batch{{&payloadEntry}}, testFactory())
reqs, err := buildSplitRequests([]Batch{{&payloadEntry}}, testFactory())
if reqs != nil {
t.Error(reqs)
}
Expand All @@ -238,7 +238,7 @@ func TestLargeRequestNeedsSplit(t *testing.T) {
func TestLargeRequestNoSplit(t *testing.T) {
js := randomJSON(maxCompressedSizeBytes / 2)
payloadEntry := testUnsplittablePayloadEntry{rawData: js}
reqs, err := BuildSplitRequests([]Batch{{&payloadEntry}}, testFactory())
reqs, err := buildSplitRequests([]Batch{{&payloadEntry}}, testFactory())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion telemetry/spans_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func testSpanGroupJSON(t testing.TB, batches []Batch, expect string) {
th.Helper()
}
factory, _ := NewSpanRequestFactory(WithNoDefaultKey())
reqs, err := BuildSplitRequests(batches, factory)
reqs, err := buildSplitRequests(batches, factory)
if nil != err {
t.Fatal(err)
}
Expand Down

0 comments on commit b59a161

Please sign in to comment.