diff --git a/package.json b/package.json index a8fc12a978753..c8fa01e81009e 100644 --- a/package.json +++ b/package.json @@ -1550,4 +1550,4 @@ "xmlbuilder": "13.0.2", "yargs": "^15.4.1" } -} +} \ No newline at end of file diff --git a/x-pack/plugins/observability_ai_assistant/jest.config.js b/x-pack/plugins/observability_ai_assistant/jest.config.js index e4a140341d07f..5eaabe2dcf492 100644 --- a/x-pack/plugins/observability_ai_assistant/jest.config.js +++ b/x-pack/plugins/observability_ai_assistant/jest.config.js @@ -5,10 +5,9 @@ * 2.0. */ -const path = require('path'); - module.exports = { preset: '@kbn/test', - rootDir: path.resolve(__dirname, '../../..'), + rootDir: '../../..', roots: ['/x-pack/plugins/observability_ai_assistant'], + setupFiles: ['/x-pack/plugins/observability_ai_assistant/.storybook/jest_setup.js'], }; diff --git a/x-pack/plugins/observability_ai_assistant/public/hooks/use_chat.ts b/x-pack/plugins/observability_ai_assistant/public/hooks/use_chat.ts index 7e1f0d8cf78bf..a15c867d59635 100644 --- a/x-pack/plugins/observability_ai_assistant/public/hooks/use_chat.ts +++ b/x-pack/plugins/observability_ai_assistant/public/hooks/use_chat.ts @@ -60,8 +60,8 @@ export function useChat({ messages, connectorId }: { messages: Message[]; connec assistant .chat({ messages, connectorId, signal: controller.signal }) .then((response$) => { - return new Promise((resolve, reject) => { - response$.pipe(delay(50)).subscribe({ + new Promise((resolve, reject) => { + const subscription = response$.pipe(delay(50)).subscribe({ next: (chunk) => { partialResponse.content += chunk.choices[0].delta.content ?? ''; partialResponse.function_call.name += @@ -77,6 +77,10 @@ export function useChat({ messages, connectorId }: { messages: Message[]; connec resolve(); }, }); + + controller.signal.addEventListener('abort', () => { + subscription.unsubscribe(); + }); }); }) .catch((err) => { diff --git a/x-pack/plugins/observability_ai_assistant/public/plugin.ts b/x-pack/plugins/observability_ai_assistant/public/plugin.ts index 6b54000115c79..607f54a3c6da7 100644 --- a/x-pack/plugins/observability_ai_assistant/public/plugin.ts +++ b/x-pack/plugins/observability_ai_assistant/public/plugin.ts @@ -5,20 +5,16 @@ * 2.0. */ -import type { CoreStart, HttpResponse, Plugin, PluginInitializerContext } from '@kbn/core/public'; +import type { CoreStart, Plugin, PluginInitializerContext } from '@kbn/core/public'; import type { Logger } from '@kbn/logging'; -import { filter, map } from 'rxjs'; -import type { Message } from '../common/types'; -import { createCallObservabilityAIAssistantAPI } from './api'; +import { createService } from './service/create_service'; import type { ConfigSchema, - CreateChatCompletionResponseChunk, ObservabilityAIAssistantPluginSetup, ObservabilityAIAssistantPluginSetupDependencies, ObservabilityAIAssistantPluginStart, ObservabilityAIAssistantPluginStartDependencies, } from './types'; -import { readableStreamReaderIntoObservable } from './utils/readable_stream_reader_into_observable'; export class ObservabilityAIAssistantPlugin implements @@ -38,52 +34,6 @@ export class ObservabilityAIAssistantPlugin } start(coreStart: CoreStart): ObservabilityAIAssistantPluginStart { - const client = createCallObservabilityAIAssistantAPI(coreStart); - - return { - isEnabled: () => { - return true; - }, - async chat({ - connectorId, - messages, - signal, - }: { - connectorId: string; - messages: Message[]; - signal: AbortSignal; - }) { - const response = (await client('POST /internal/observability_ai_assistant/chat', { - params: { - body: { - messages, - connectorId, - }, - }, - signal, - asResponse: true, - rawResponse: true, - })) as unknown as HttpResponse; - - const status = response.response?.status; - - if (!status || status >= 400) { - throw new Error(response.response?.statusText || 'Unexpected error'); - } - - const reader = response.response.body?.getReader(); - - if (!reader) { - throw new Error('Could not get reader from response'); - } - - return readableStreamReaderIntoObservable(reader).pipe( - map((line) => line.substring(6)), - filter((line) => !!line && line !== '[DONE]'), - map((line) => JSON.parse(line) as CreateChatCompletionResponseChunk) - ); - }, - callApi: client, - }; + return createService(coreStart); } } diff --git a/x-pack/plugins/observability_ai_assistant/public/service/create_service.test.ts b/x-pack/plugins/observability_ai_assistant/public/service/create_service.test.ts new file mode 100644 index 0000000000000..010503e7abf4f --- /dev/null +++ b/x-pack/plugins/observability_ai_assistant/public/service/create_service.test.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { CoreStart } from '@kbn/core/public'; +import { ReadableStream } from 'stream/web'; +import { ObservabilityAIAssistantService } from '../types'; +import { createService } from './create_service'; + +describe('createService', () => { + describe('chat', () => { + let service: ObservabilityAIAssistantService; + + const httpPostSpy = jest.fn(); + + function respondWithChunks({ chunks, status = 200 }: { status?: number; chunks: string[][] }) { + const response = { + response: { + status, + body: new ReadableStream({ + start(controller) { + chunks.forEach((chunk) => { + controller.enqueue(new TextEncoder().encode(chunk.join('\n'))); + }); + controller.close(); + }, + }), + }, + }; + + httpPostSpy.mockResolvedValueOnce(response); + } + + async function chat(signal: AbortSignal = new AbortController().signal) { + const response = await service.chat({ messages: [], connectorId: '', signal }); + + return response; + } + + beforeEach(() => { + service = createService({ + http: { + post: httpPostSpy, + }, + } as unknown as CoreStart); + }); + + afterEach(() => { + httpPostSpy.mockReset(); + }); + + it('correctly parses a stream of JSON lines', async () => { + const chunk1 = ['data: {}', 'data: {}']; + const chunk2 = ['data: {}', 'data: [DONE]']; + + respondWithChunks({ chunks: [chunk1, chunk2] }); + + const response$ = await chat(); + + const results: any = []; + response$.subscribe({ + next: (data) => results.push(data), + complete: () => { + expect(results).toHaveLength(3); + }, + }); + }); + + it('correctly buffers partial lines', async () => { + const chunk1 = ['data: {}', 'data: {']; + const chunk2 = ['}', 'data: [DONE]']; + + respondWithChunks({ chunks: [chunk1, chunk2] }); + + const response$ = await chat(); + + const results: any = []; + response$.subscribe({ + next: (data) => results.push(data), + complete: () => { + expect(results).toHaveLength(2); + }, + }); + }); + + it('propagates invalid requests as an error', () => { + respondWithChunks({ status: 400, chunks: [] }); + + expect(() => chat()).rejects.toThrowErrorMatchingInlineSnapshot(`"Unexpected error"`); + }); + + it('propagates JSON parsing errors', async () => { + const chunk1 = ['data: {}', 'data: invalid json']; + + respondWithChunks({ chunks: [chunk1] }); + + const response$ = await chat(); + + response$.subscribe({ + error: (err) => { + expect(err).toBeInstanceOf(SyntaxError); + }, + }); + }); + }); +}); diff --git a/x-pack/plugins/observability_ai_assistant/public/service/create_service.ts b/x-pack/plugins/observability_ai_assistant/public/service/create_service.ts new file mode 100644 index 0000000000000..a27d390706c11 --- /dev/null +++ b/x-pack/plugins/observability_ai_assistant/public/service/create_service.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { CoreStart, HttpResponse } from '@kbn/core/public'; +import { filter, map } from 'rxjs'; +import type { Message } from '../../common'; +import { createCallObservabilityAIAssistantAPI } from '../api'; +import { CreateChatCompletionResponseChunk, ObservabilityAIAssistantService } from '../types'; +import { readableStreamReaderIntoObservable } from '../utils/readable_stream_reader_into_observable'; + +export function createService(coreStart: CoreStart): ObservabilityAIAssistantService { + const client = createCallObservabilityAIAssistantAPI(coreStart); + + return { + isEnabled: () => { + return true; + }, + async chat({ + connectorId, + messages, + signal, + }: { + connectorId: string; + messages: Message[]; + signal: AbortSignal; + }) { + const response = (await client('POST /internal/observability_ai_assistant/chat', { + params: { + body: { + messages, + connectorId, + }, + }, + signal, + asResponse: true, + rawResponse: true, + })) as unknown as HttpResponse; + + const status = response.response?.status; + + if (!status || status >= 400) { + throw new Error(response.response?.statusText || 'Unexpected error'); + } + + const reader = response.response.body?.getReader(); + + if (!reader) { + throw new Error('Could not get reader from response'); + } + + return readableStreamReaderIntoObservable(reader).pipe( + map((line) => line.substring(6)), + filter((line) => !!line && line !== '[DONE]'), + map((line) => JSON.parse(line) as CreateChatCompletionResponseChunk) + ); + }, + callApi: client, + }; +}