From 18e40d09150fa9cafd442e42770288f5a51cd740 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:53:56 -0700 Subject: [PATCH] [azopenai] Switch EventReader from using bufio.Scanner to bufio.Reader (#22703) bufio.Scanner has an implicit max size for it's internal buffer but SSE has no restriction on how large chunks can be. We need to allow for arbitrarily large chunks and, luckily, bufio.Reader can already handle that. --- sdk/ai/azopenai/CHANGELOG.md | 2 ++ sdk/ai/azopenai/event_reader.go | 31 ++++++++++++++++------------ sdk/ai/azopenai/event_reader_test.go | 29 +++++++++++++++++++++++++- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/sdk/ai/azopenai/CHANGELOG.md b/sdk/ai/azopenai/CHANGELOG.md index 57abf4d52157..07165e045a3f 100644 --- a/sdk/ai/azopenai/CHANGELOG.md +++ b/sdk/ai/azopenai/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- EventReader can now handle chunks of text larger than 64k. Thank you @ChrisTrenkamp for finding the issue and suggesting a fix. (PR#22703) + ### Other Changes ## 0.5.1 (2024-04-02) diff --git a/sdk/ai/azopenai/event_reader.go b/sdk/ai/azopenai/event_reader.go index 6c68e88a601d..78c00d91cfe4 100644 --- a/sdk/ai/azopenai/event_reader.go +++ b/sdk/ai/azopenai/event_reader.go @@ -16,20 +16,33 @@ import ( // EventReader streams events dynamically from an OpenAI endpoint. type EventReader[T any] struct { - reader io.ReadCloser // Required for Closing - scanner *bufio.Scanner + reader io.ReadCloser // Required for Closing + bufioReader *bufio.Reader + zeroT T } func newEventReader[T any](r io.ReadCloser) *EventReader[T] { - return &EventReader[T]{reader: r, scanner: bufio.NewScanner(r)} + return &EventReader[T]{ + reader: r, + bufioReader: bufio.NewReader(r), + } } // Read reads the next event from the stream. // Returns io.EOF when there are no further events. func (er *EventReader[T]) Read() (T, error) { // https://html.spec.whatwg.org/multipage/server-sent-events.html - for er.scanner.Scan() { // Scan while no error - line := er.scanner.Text() // Get the line & interpret the event stream: + + for { + line, err := er.bufioReader.ReadString('\n') + + if err != nil { + if errors.Is(err, io.EOF) { + return er.zeroT, errors.New("incomplete stream") + } + + return er.zeroT, err + } if line == "" || line[0] == ':' { // If the line is blank or is a comment, skip it continue @@ -52,14 +65,6 @@ func (er *EventReader[T]) Read() (T, error) { // Unreachable } } - - scannerErr := er.scanner.Err() - - if scannerErr == nil { - return *new(T), errors.New("incomplete stream") - } - - return *new(T), scannerErr } // Close closes the EventReader and any applicable inner stream state. diff --git a/sdk/ai/azopenai/event_reader_test.go b/sdk/ai/azopenai/event_reader_test.go index 40e0853160d8..352c9b599228 100644 --- a/sdk/ai/azopenai/event_reader_test.go +++ b/sdk/ai/azopenai/event_reader_test.go @@ -7,6 +7,7 @@ package azopenai import ( + "fmt" "io" "strings" "testing" @@ -43,7 +44,7 @@ func TestEventReader_BadReader(t *testing.T) { } func TestEventReader_StreamIsClosedBeforeDone(t *testing.T) { - buff := strings.NewReader("data: {}") + buff := strings.NewReader("data: {}\n") eventReader := newEventReader[ChatCompletions](io.NopCloser(buff)) @@ -75,3 +76,29 @@ func TestEventReader_SpacesAroundAreas(t *testing.T) { require.NotEmpty(t, evt) require.Equal(t, "without-spaces", *evt.Choices[0].Delta.Content) } + +func TestEventReader_CanReadHugeChunk(t *testing.T) { + // Ran into this with a customer that gets _huge_ chunks of text in their stream: + // https://github.com/Azure/azure-sdk-for-go/pull/22646 + // bufio.Scanner has a limitation of 64k (which is huge, but not big enough). + + bigBytes := make([]byte, 64*1024+1) + + for i := 0; i < len(bigBytes); i++ { + bigBytes[i] = 'A' + } + + buff := strings.NewReader( + fmt.Sprintf("data: {\"name\":\"chatcmpl-7Z4kUpXX6HN85cWY28IXM4EwemLU3\",\"object\":\"chat.completion.chunk\",\"created\":1688594090,\"model\":\"gpt-4-0613\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"%s\"},\"finish_reason\":null}]}\n", string(bigBytes)) + fmt.Sprintf("data: {\"name\":\"chatcmpl-7Z4kUpXX6HN85cWY28IXM4EwemLU3\",\"object\":\"chat.completion.chunk\",\"created\":1688594090,\"model\":\"gpt-4-0613\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"%s\"},\"finish_reason\":null}]}\n", "small message"), + ) + + eventReader := newEventReader[ChatCompletions](io.NopCloser(buff)) + + evt, err := eventReader.Read() + require.NoError(t, err) + require.Equal(t, string(bigBytes), *evt.Choices[0].Delta.Content) + + evt, err = eventReader.Read() + require.NoError(t, err) + require.Equal(t, "small message", *evt.Choices[0].Delta.Content) +}