Skip to content

Commit 9ff53cf

Browse files
committedOct 15, 2024
fix: improve AppendIngest with TeeReader and update AppendableFunc interface
1 parent 0b4d39d commit 9ff53cf

File tree

4 files changed

+109
-8
lines changed

4 files changed

+109
-8
lines changed
 

‎docs/sources/reference/components/pyroscope/pyroscope.receive_http.md

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ You can also create multiple `pyroscope.receive_http` components with different
111111

112112
- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters)
113113

114+
114115
{{< admonition type="note" >}}
115116
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
116117
Refer to the linked documentation for more details.

‎internal/component/pyroscope/appender.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,6 @@ func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*
122122
return multiErr
123123
}
124124

125-
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
126-
127-
func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
128-
return f(ctx, labels, samples)
129-
}
130-
131125
// AppendIngest satisfies the AppenderIngest interface.
132126
func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error {
133127
now := time.Now()
@@ -143,3 +137,20 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e
143137
}
144138
return multiErr
145139
}
140+
141+
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
142+
143+
func (f AppendableFunc) Appender() Appender {
144+
return appenderFunc(f)
145+
}
146+
147+
type appenderFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
148+
149+
func (f appenderFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
150+
return f(ctx, labels, samples)
151+
}
152+
153+
func (f appenderFunc) AppendIngest(ctx context.Context, profile *IncomingProfile) error {
154+
// This is a no-op implementation since AppendableFunc doesn't handle IncomingProfile
155+
return nil
156+
}

‎internal/component/pyroscope/write/write.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package write
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
79
"net/http"
810
"net/url"
911
"path"
@@ -338,7 +340,10 @@ func (e *PyroscopeWriteError) Error() string {
338340

339341
// AppendIngest implements the pyroscope.Appender interface.
340342
func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
341-
for _, endpoint := range f.config.Endpoints {
343+
var buf bytes.Buffer
344+
tee := io.TeeReader(profile.Body, &buf)
345+
346+
for i, endpoint := range f.config.Endpoints {
342347
u, err := url.Parse(endpoint.URL)
343348
if err != nil {
344349
return fmt.Errorf("parse endpoint URL: %w", err)
@@ -347,7 +352,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
347352
u.Path = path.Join(u.Path, profile.URL.Path)
348353
u.RawQuery = profile.URL.RawQuery
349354

350-
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), profile.Body)
355+
var bodyReader io.Reader
356+
if i == 0 {
357+
bodyReader = tee
358+
} else {
359+
bodyReader = bytes.NewReader(buf.Bytes())
360+
}
361+
362+
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bodyReader)
351363
if err != nil {
352364
return fmt.Errorf("create request: %w", err)
353365
}

‎internal/component/pyroscope/write/write_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package write
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
7+
"io"
68
"net/http"
79
"net/http/httptest"
10+
"net/url"
811
"sync"
912
"testing"
1013
"time"
@@ -253,3 +256,77 @@ func TestBadAlloyConfig(t *testing.T) {
253256
err := syntax.Unmarshal([]byte(exampleAlloyConfig), &args)
254257
require.ErrorContains(t, err, "at most one of basic_auth, authorization, oauth2, bearer_token & bearer_token_file must be configured")
255258
}
259+
260+
func Test_Write_AppendIngest(t *testing.T) {
261+
var (
262+
export Exports
263+
argument = DefaultArguments()
264+
appendCount = atomic.NewInt32(0)
265+
serverCount = int32(3)
266+
servers = make([]*httptest.Server, serverCount)
267+
endpoints = make([]*EndpointOptions, 0, serverCount)
268+
)
269+
270+
testData := []byte("test-profile-data")
271+
272+
handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc {
273+
return func(w http.ResponseWriter, r *http.Request) {
274+
appendCount.Inc()
275+
require.Equal(t, expectedPath, r.URL.Path, "Unexpected path")
276+
require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query")
277+
require.Equal(t, "test-value", r.Header.Get("X-Test-Header"), "Unexpected header value")
278+
body, err := io.ReadAll(r.Body)
279+
require.NoError(t, err, "Failed to read request body")
280+
require.Equal(t, testData, body, "Unexpected body content")
281+
w.WriteHeader(http.StatusOK)
282+
}
283+
}
284+
285+
for i := int32(0); i < serverCount; i++ {
286+
servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value"))
287+
endpoints = append(endpoints, &EndpointOptions{
288+
URL: servers[i].URL,
289+
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
290+
Headers: map[string]string{
291+
"X-Test-Header": "test-value",
292+
},
293+
})
294+
}
295+
defer func() {
296+
for _, s := range servers {
297+
s.Close()
298+
}
299+
}()
300+
301+
argument.Endpoints = endpoints
302+
303+
// Create the receiver
304+
var wg sync.WaitGroup
305+
wg.Add(1)
306+
c, err := New(component.Options{
307+
ID: "test-write",
308+
Logger: util.TestAlloyLogger(t),
309+
Registerer: prometheus.NewRegistry(),
310+
OnStateChange: func(e component.Exports) {
311+
defer wg.Done()
312+
export = e.(Exports)
313+
},
314+
}, argument)
315+
require.NoError(t, err, "Failed to create component")
316+
317+
ctx, cancel := context.WithCancel(context.Background())
318+
defer cancel()
319+
go c.Run(ctx)
320+
wg.Wait() // wait for the state change to happen
321+
require.NotNil(t, export.Receiver, "Receiver is nil")
322+
323+
incomingProfile := &pyroscope.IncomingProfile{
324+
Body: io.NopCloser(bytes.NewReader(testData)),
325+
Headers: http.Header{"Content-Type": []string{"application/octet-stream"}},
326+
URL: &url.URL{Path: "/ingest", RawQuery: "key=value"},
327+
}
328+
329+
err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)
330+
require.NoError(t, err)
331+
require.Equal(t, serverCount, appendCount.Load())
332+
}

0 commit comments

Comments
 (0)