Skip to content

Commit 8fbf275

Browse files
authored
fix: stream order (#411)
1 parent 6c434c1 commit 8fbf275

File tree

3 files changed

+101
-24
lines changed

3 files changed

+101
-24
lines changed

apps/tests/aws-runtime/test/test-service.ts

+67-3
Original file line numberDiff line numberDiff line change
@@ -607,14 +607,20 @@ export const counterCollectionOrderByN = counterCollection.index(
607607
const entityEvent = event<{ id: string }>("entityEvent");
608608
const entitySignal = signal("entitySignal");
609609
const entitySignal2 = signal<{ n: number }>("entitySignal2");
610+
const entityOrderSignal = signal<{ n: number }>("entityOrderSignal");
610611

611612
export const counterWatcher = counter.stream(
612613
"counterWatcher",
613-
{ operations: ["remove"], includeOld: true },
614+
{ operations: ["modify", "remove"], includeOld: true },
614615
async (item) => {
615616
console.log(item);
616-
const { n } = item.oldValue!;
617-
await entitySignal2.sendSignal(item.key.id, { n: n + 1 });
617+
if (item.operation === "remove") {
618+
const { n } = item.oldValue!;
619+
await entitySignal2.sendSignal(item.key.id, { n: n + 1 });
620+
} else if (item.newValue.namespace === "default") {
621+
const { n } = item.newValue;
622+
await entityOrderSignal.sendSignal(item.key.id, { n });
623+
}
618624
}
619625
);
620626

@@ -658,12 +664,31 @@ export const entityTask = task(
658664
"entityTask",
659665
async (_, { execution: { id } }) => {
660666
const value = await counter.get(["default", id]);
667+
// send 4 updates to force multiple update events at the same time and test stream ordering
661668
await counter.put({
662669
namespace: "default",
663670
id,
664671
n: (value?.n ?? 0) + 1,
665672
optional: undefined,
666673
});
674+
await counter.put({
675+
namespace: "default",
676+
id,
677+
n: (value?.n ?? 0) + 2,
678+
optional: undefined,
679+
});
680+
await counter.put({
681+
namespace: "default",
682+
id,
683+
n: (value?.n ?? 0) + 3,
684+
optional: undefined,
685+
});
686+
await counter.put({
687+
namespace: "default",
688+
id,
689+
n: (value?.n ?? 0) + 4,
690+
optional: undefined,
691+
});
667692
}
668693
);
669694

@@ -748,15 +773,34 @@ export const entityIndexTask = task(
748773
export const entityWorkflow = workflow(
749774
"entityWorkflow",
750775
async (_, { execution: { id } }) => {
776+
let orderValue = 0;
777+
let orderError: string | undefined;
778+
// this signal handler is intended to test that the counter stream is executed in order.
779+
// it sends a signal each time the "default" name space counter is updated
780+
// and we want to check if the order is always increasing
781+
// and then validate the order value against the final value at the end.
782+
entityOrderSignal.onSignal(({ n }) => {
783+
if (n > orderValue) {
784+
console.log(`Ordered Value: ${orderValue} ${n}`);
785+
orderValue = n;
786+
} else {
787+
orderError = `order value ${n} is less than or equal to previous value ${orderValue}, the stream handler executed out of order!`;
788+
}
789+
});
790+
791+
// default: 1
751792
await counter.put({ namespace: "default", id, n: 1, optional: undefined });
752793
await counter.put({
753794
namespace: "different",
754795
id,
755796
n: 1,
756797
optional: undefined,
757798
});
799+
// default: 2 after insert "different"
758800
await entitySignal.expectSignal();
801+
// default: 6 after entity task (adds 4)
759802
await entityTask();
803+
// default: 7 after event subscription
760804
await Promise.all([entityEvent.emit({ id }), entitySignal.expectSignal()]);
761805
try {
762806
// will fail
@@ -769,6 +813,7 @@ export const entityWorkflow = workflow(
769813
}
770814
const { value: entityValue, version } =
771815
(await counter.getWithMetadata(["default", id])) ?? {};
816+
// default: 8 after get with metadata and successful assertion
772817
await counter.put(
773818
{ namespace: "default", id, n: entityValue!.n + 1, optional: undefined },
774819
{ expectedVersion: version }
@@ -782,6 +827,9 @@ export const entityWorkflow = workflow(
782827
},
783828
]);
784829

830+
// default: 9 after transact write
831+
const finalValue = await counter.get(["default", id]);
832+
785833
const result0 = await entityIndexTask();
786834

787835
// send deletion, to be picked up by the stream
@@ -793,6 +841,22 @@ export const entityWorkflow = workflow(
793841
entitySignal2.expectSignal(),
794842
]);
795843

844+
if (
845+
orderError ||
846+
// wait 30 seconds for the order value to match the final value.
847+
!(await condition(
848+
{ timeout: duration(300, "seconds") },
849+
() => finalValue?.n === orderValue
850+
))
851+
) {
852+
if (orderError) {
853+
throw new Error(orderError);
854+
}
855+
throw new Error(
856+
`Order handler never received the final value! have: ${orderValue} expected: ${finalValue?.n}`
857+
);
858+
}
859+
796860
/**
797861
* Testing sort keys and query
798862
*/

apps/tests/aws-runtime/test/tester.test.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ eventualRuntimeTestHarness(
132132
},
133133
{
134134
namespace: "default",
135-
n: 6,
135+
n: 9,
136136
},
137137
{
138138
namespace: "another2",
@@ -150,7 +150,7 @@ eventualRuntimeTestHarness(
150150
},
151151
{
152152
namespace: "default",
153-
n: 6,
153+
n: 9,
154154
},
155155
{
156156
namespace: "another",
@@ -168,7 +168,7 @@ eventualRuntimeTestHarness(
168168
},
169169
{
170170
namespace: "default",
171-
n: 6,
171+
n: 9,
172172
},
173173
{
174174
namespace: "different",
@@ -184,13 +184,13 @@ eventualRuntimeTestHarness(
184184
betweenN: [
185185
{
186186
namespace: "default",
187-
n: 6,
187+
n: 9,
188188
},
189189
],
190190
greaterThanN: [
191191
{
192192
namespace: "default",
193-
n: 6,
193+
n: 9,
194194
},
195195
{
196196
namespace: "another",
@@ -204,7 +204,7 @@ eventualRuntimeTestHarness(
204204
},
205205
{
206206
namespace: "default",
207-
n: 6,
207+
n: 9,
208208
},
209209
],
210210
sparse: [
@@ -224,7 +224,7 @@ eventualRuntimeTestHarness(
224224
},
225225
],
226226
},
227-
{ n: 7 },
227+
{ n: 10 },
228228
[
229229
{ counterNumber: 1, n: 1 },
230230
{ counterNumber: 2, n: 2 },

packages/@eventual/core-runtime/src/handlers/entity-stream-worker.ts

+27-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { EntityStreamContext, EntityStreamItem } from "@eventual/core";
22
import { ServiceType, getEventualResource } from "@eventual/core/internal";
33
import { serviceTypeScope } from "../service-type.js";
4+
import { normalizeCompositeKey } from "../stores/entity-store.js";
45
import { getLazy, promiseAllSettledPartitioned } from "../utils.js";
56
import { registerWorkerIntrinsics, type WorkerIntrinsicDeps } from "./utils.js";
67

@@ -23,12 +24,10 @@ export function createEntityStreamWorker(
2324

2425
return async (entityName, streamName, items) =>
2526
serviceTypeScope(ServiceType.EntityStreamWorker, async () => {
26-
const streamHandler = getEventualResource(
27-
"Entity",
28-
entityName
29-
)?.streams.find((s) => s.name === streamName);
27+
const entity = getEventualResource("Entity", entityName);
28+
const streamHandler = entity?.streams.find((s) => s.name === streamName);
3029

31-
if (!streamHandler) {
30+
if (!entity || !streamHandler) {
3231
throw new Error(`Stream handler ${streamName} does not exist`);
3332
}
3433

@@ -45,20 +44,34 @@ export function createEntityStreamWorker(
4544

4645
return { failedItemIds: result?.failedItemIds ?? [] };
4746
} else {
47+
const itemsByKey: Record<string, EntityStreamItem<any>[]> = {};
48+
items.forEach((item) => {
49+
const normalizedKey = normalizeCompositeKey(entity, item.key);
50+
const serializedKey = JSON.stringify(normalizedKey);
51+
(itemsByKey[serializedKey] ??= []).push(item);
52+
});
53+
4854
const results = await promiseAllSettledPartitioned(
49-
items,
50-
async (item) => {
51-
return await streamHandler.handler(item, context);
55+
Object.entries(itemsByKey),
56+
async ([, itemGroup]) => {
57+
for (const i in itemGroup) {
58+
const item = itemGroup[i]!;
59+
try {
60+
const result = await streamHandler.handler(item, context);
61+
// if the handler doesn't fail and doesn't return false, continue
62+
if (result !== false) {
63+
continue;
64+
}
65+
} catch {}
66+
// if the handler fails or returns false, return the rest of the items
67+
return itemGroup.slice(Number(i)).map((i) => i.id);
68+
}
69+
return [];
5270
}
5371
);
5472

5573
return {
56-
failedItemIds: [
57-
...results.rejected.map(([e]) => e.id),
58-
...results.fulfilled
59-
.filter(([, r]) => r === false)
60-
.map(([e]) => e.id),
61-
],
74+
failedItemIds: results.fulfilled.flatMap((s) => s[1]),
6275
};
6376
}
6477
});

0 commit comments

Comments
 (0)