-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhsage_workflow.py
347 lines (289 loc) · 10 KB
/
hsage_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
from agent_configs import (
COHERE_MODEL,
COHERE_API_KEY,
PHOENIX_API_KEY,
)
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
Context,
)
from typing import Optional, Any
from hsage import (
query_db,
find_test,
test_example,
explain,
)
import cohere
import asyncio
import llama_index.core
import os
from hsage_cli import pretty_print_example, pretty_print_tests
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
llama_index.core.set_global_handler(
"arize_phoenix", endpoint="https://llamatrace.com/v1/traces"
)
# Initialize Cohere client
co = cohere.Client(api_key=COHERE_API_KEY)
# Define custom events
class ToolCallEvent(Event):
"""
Event for tool calls.
"""
tool_name: str
arguments: dict
from_longer_workflow: Optional[bool] = False
class TestRecommendationEvent(Event):
"""
Event for test recommendations.
"""
situation: str
class ExampleCreationEvent(Event):
"""
Event for example creation.
"""
test_name: str
situation: Optional[str] = None
class FinalResponseEvent(Event):
"""
Event for final responses.
"""
response: str
class HelperEvent(Event):
"""
Helps pass around tool call results that are not StopEvents
We use this to collect results from async tool calls for
Test Recommendation and Example Creation
TODO: figure out the best way to handle this... maybe via context?
"""
result: Any
# additional_args: Optional[dict] = None
# Define available tools
TOOLS = {
"query_db": query_db,
"find_test": find_test,
"test_example": test_example,
"explain": explain,
}
# import function tool
ASYNC_TOOLS = {
"async_test_example": FunctionTool.from_defaults(
fn=test_example,
description="Creates a single example test \
for a given test name and situation.",
)
}
cohere_tools = [
{
"name": "query_db",
"description": "Queries a database using an embedding model and \
returns relevant results.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to search \
for in the database.",
}
},
"required": ["query"],
},
},
{
"name": "find_test",
"description": "Finds a test for a given situation.",
"parameters": {
"type": "object",
"properties": {
"situation": {
"type": "string",
"description": "The situation to find a test for.",
}
},
"required": ["situation"],
},
},
# test example tool
{
"name": "test_example",
"description": "Creates a single example test for a \
given test name and situation.",
"parameters": {
"type": "object",
"properties": {
"test_name": {
"type": "string",
"description": "The name of the test to create an example for.",
},
"situation": {
"type": "string",
"description": "The situation to create an example for.",
},
},
"required": ["test_name", "situation"],
},
},
# explain tool
{
"name": "explain",
"description": "Explains a statistical concept described by the query.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to explain.",
}
},
"required": ["query"],
},
},
# mock Example Creation Event tool
# if this is called, we return the ExampleCreationEvent, which will do a workflow
# which creates several auto-checked examples for a given test name and situation.
{
"name": "make_lots_of_examples",
"description": "Creates several examples for a given test name and situation.",
"parameters": {
"type": "object",
"properties": {
"test_name": {
"type": "string",
"description": "The name of the test to create examples for.",
},
"situation": {
"type": "string",
"description": "The situation to create examples for.",
},
},
"required": ["test_name"],
},
},
]
def ask_cohere_for_tool_call(query: str, cohere_tools) -> str:
# only helper for inteacting with Cohere
response = co.chat(message=query, model=COHERE_MODEL, tools=cohere_tools)
# parse the response, and return the name.
return response.tool_calls
def decide_initial_workflow_tool(query: str) -> ToolCallEvent | ExampleCreationEvent:
"""
Hack for creating a router for the StatisticsWorkflow
Idea here is that we can pass a set of tools to a LLM
and ask it to decide which one to use
But, that can be tricky without a reliable router,
so we're gonna pretend that the LLM
is getting a list of tools + workflow "tools", and just using the output
as a classifier.
The input will be a query, and the return will be the
string mapping to the event we need.
"""
tool_result = ask_cohere_for_tool_call(query, cohere_tools)
# TODO: handle multiple tool calls and global message context
tool_name = tool_result[0].name
if tool_name == "make_lots_of_examples":
# sometimes situation is not in the arguments, and sometimes it is.
situation = tool_result[0].parameters.get("situation", "")
test_name = tool_result[0].parameters.get("test_name", "")
return ExampleCreationEvent(test_name=test_name, situation=situation)
else:
tool_args = tool_result[0].parameters
return ToolCallEvent(tool_name=tool_name, arguments=tool_args)
# import the pretty printing functions from hsage_cli
# create dictionary to map tool names to pretty printing functions
tool_name_to_pretty_printer = {
"test_example": pretty_print_example,
"find_test": pretty_print_tests,
}
# Define the workflow
class StatisticsWorkflow(Workflow):
@step
async def router(
self, ctx: Context, ev: StartEvent
) -> ToolCallEvent | ExampleCreationEvent:
# how do I get the LLM to trigger a tool call OR exa
query = ev.query
event = decide_initial_workflow_tool(query)
return event
@step(num_workers=10)
async def tool_calling_step(
self, ctx: Context, ev: ToolCallEvent
) -> StopEvent | HelperEvent:
# TODO: handle multiple tool calls and global message context,
# so we won't stop immediately after the first tool call.
"""
This is sloppy, but is useful because it just allows users
to access a tool quickly and go.
"""
tool_name = ev.tool_name
tool_arguments = ev.arguments
tool_result = await asyncio.to_thread(TOOLS[tool_name], **tool_arguments)
if ev.from_longer_workflow:
return HelperEvent(result=tool_result)
else:
if tool_name == "test_example":
console, table = pretty_print_example(tool_result)
console.print(table)
if tool_name == "find_test":
console = pretty_print_tests(tool_result)
console.print()
return StopEvent(result=tool_result)
@step
async def example_generation_step(
self, ctx: Context, ev: ExampleCreationEvent
) -> ToolCallEvent:
"""
This helps users get multiple generated examples for
how to use a specific test and function.
We want some more complicated analysis logic
here when creating the examples, so we create
a separate event type and step for it.
Ideally, we could create a TON of examples,
and throw away the bad or malformed ones
We can use an LLM that leverages a checklist
of sorts to ensure examples are useful.
We're borrowing ideas from Modal's whitepaper on scaling inference
for accurate results, and
also a checklist paper released by Cohere recently.
checklist paper: https://arxiv.org/pdf/2410.03608
modal whitepaper: https://modal.com/blog/llama-human-eval
"""
for _ in range(5):
ctx.send_event(
ToolCallEvent(
tool_name="test_example",
arguments={"test_name": ev.test_name, "situation": ev.situation},
from_longer_workflow=True,
)
)
return None
@step
async def collect_examples(self, ctx: Context, ev: HelperEvent) -> StopEvent:
# we'll collect the results in a list
collected_events = ctx.collect_events(ev, [HelperEvent] * 5)
# TODO: FILTERING STEP
# we'll use an LLM to filter out the bad examples and keep the good ones
# for now, we'll just return the first three
good_examples = []
if collected_events is None:
return None
good_examples = collected_events[:3]
for example in good_examples:
console, table = pretty_print_example(example.result)
console.print(table)
return StopEvent(result=good_examples)
async def main(query: str):
w = StatisticsWorkflow(timeout=240, verbose=False)
result = await w.run(query=query)
print(result)
if __name__ == "__main__":
# run the workflow
query = "make me a bunch of examples for \
applying statistical tests to Youtube Videos on a given channel"
# query = "how do I test if two categorical variables are independent?"
asyncio.run(main(query=query))