From c189308b077e014efccf0856c6a1cfc6447fcde7 Mon Sep 17 00:00:00 2001 From: Brace Sproul Date: Mon, 15 Jan 2024 15:29:32 -0800 Subject: [PATCH] core[minor]: Add ability for runnable passthrough to call a func (#3998) * core[minor]: Add ability for runnable passthrough to call a func * chore: lint files * add tests & update typing * Fix implementation of passthrough func --------- Co-authored-by: Nuno Campos --- langchain-core/src/runnables/passthrough.ts | 47 +++++++++++++++++-- .../tests/runnable_passthrough.test.ts | 35 ++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/langchain-core/src/runnables/passthrough.ts b/langchain-core/src/runnables/passthrough.ts index 5555947d3d8d..3074eff4b838 100644 --- a/langchain-core/src/runnables/passthrough.ts +++ b/langchain-core/src/runnables/passthrough.ts @@ -1,3 +1,4 @@ +import { concat } from "../utils/stream.js"; import { Runnable, RunnableAssign, @@ -6,6 +7,11 @@ import { } from "./base.js"; import type { RunnableConfig } from "./config.js"; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type RunnablePassthroughFunc = + | ((input: RunInput) => void) + | ((input: RunInput, config?: RunnableConfig) => void); + /** * A runnable to passthrough inputs unchanged or with additional keys. * @@ -44,10 +50,23 @@ export class RunnablePassthrough extends Runnable< lc_serializable = true; + func?: RunnablePassthroughFunc; + + constructor(fields?: { func?: RunnablePassthroughFunc }) { + super(fields); + if (fields) { + this.func = fields.func; + } + } + async invoke( input: RunInput, options?: Partial ): Promise { + if (this.func) { + this.func(input); + } + return this._callWithConfig( (input: RunInput) => Promise.resolve(input), input, @@ -55,15 +74,37 @@ export class RunnablePassthrough extends Runnable< ); } - transform( + async *transform( generator: AsyncGenerator, options: Partial ): AsyncGenerator { - return this._transformStreamWithConfig( + let finalOutput: RunInput | undefined; + let finalOutputSupported = true; + + for await (const chunk of this._transformStreamWithConfig( generator, (input: AsyncGenerator) => input, options - ); + )) { + yield chunk; + if (finalOutputSupported) { + if (finalOutput === undefined) { + finalOutput = chunk; + } else { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + finalOutput = concat(finalOutput, chunk as any); + } catch { + finalOutput = undefined; + finalOutputSupported = false; + } + } + } + } + + if (this.func && finalOutput !== undefined) { + this.func(finalOutput); + } } /** diff --git a/langchain-core/src/runnables/tests/runnable_passthrough.test.ts b/langchain-core/src/runnables/tests/runnable_passthrough.test.ts index 28336e31f197..18f6e8635fd0 100644 --- a/langchain-core/src/runnables/tests/runnable_passthrough.test.ts +++ b/langchain-core/src/runnables/tests/runnable_passthrough.test.ts @@ -46,3 +46,38 @@ test("RunnablePassthrough can call .assign as the first step with proper typing" console.log(result); expect(result).toEqual({ outputValue: "testing2" }); }); + +test("RunnablePassthrough can invoke a function without modifying passthrough value", async () => { + let wasCalled = false; + const addOne = (input: number) => { + wasCalled = true; + return input + 1; + }; + const passthrough = new RunnablePassthrough({ + func: addOne, + }); + const result = await passthrough.invoke(1); + expect(result).toEqual(1); + expect(wasCalled).toEqual(true); +}); + +test("RunnablePassthrough can transform a function as constructor args", async () => { + let wasCalled = false; + const addOne = (input: number) => { + wasCalled = true; + return input + 1; + }; + const passthrough = new RunnablePassthrough({ + func: addOne, + }); + async function* generateNumbers() { + yield 1; + } + const transformedGenerator = passthrough.transform(generateNumbers(), {}); + const results = []; + for await (const value of transformedGenerator) { + results.push(value); + } + expect(results).toEqual([1]); + expect(wasCalled).toEqual(true); +});