-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core-util] Abstract the abortable promise pattern #24821
[core-util] Abstract the abortable promise pattern #24821
Conversation
7626f46
to
4dac6ce
Compare
API change check API changes are not detected in this pull request. |
077c1c8
to
f78fe3e
Compare
f78fe3e
to
832c0c3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job! I imagine we can utilize this in messaing maybe?
sdk/core/core-util/src/delay.ts
Outdated
abortSignal?.removeEventListener("abort", onAbort); | ||
} | ||
function onAbort(): void { | ||
cleanupBeforeAbort?.(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it's a valid scenario: do we ever need to await some async cleanup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically you don't want to block on cleanup and it is an anti-pattern to await inside the promise executer because error handling becomes harder. @xirzec do you have thoughts on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Yes, I've been bitten by blocked calls in Service Bus closing code...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I don't like "event" style callbacks to be awaited, it creates hard to reason about systems. We could always rename this to be more obvious like onAbortCalled
or something
buildPromise({ | ||
resolve: (x) => { | ||
resolve(x); | ||
removeListeners(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does the order matter? the old delay
code removes listeners before resolving.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted the ordering but I don't think it matters.
@jeremymeng That is absolutely my intention here 😁 I am working on rewriting the partition receiver in EH in #24731 and I am going to utilize this there but keeping it internal for now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for abstracting this!
I've apparently spent too many hours working with promises in my career since I have a lot of minor stylistic opinions. 😅
sdk/core/core-util/src/delay.ts
Outdated
* @internal | ||
*/ | ||
export function createAbortablePromise<T>(inputs: { | ||
buildPromise: (inputs: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typically we'd have the required argument be first and optional arguments be on a separate options object. I don't think it should be mandatory, but I'm curious why we took an object-first approach here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbh I like the object-first approach because it forces me to name everything so the client code becomes more readable but I agree with you we should stick with our convention if we're going to export it eventually. Addressed in 64ea6c2.
sdk/core/core-util/src/delay.ts
Outdated
}) => void; | ||
cleanupBeforeAbort?: () => void; | ||
}): (options?: DelayOptions) => Promise<T> { | ||
const { buildPromise, cleanupBeforeAbort } = inputs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you like you can destructure in the method declaration to avoid having to have the awkward name inputs
:
export function createAbortablePromise<T> ({ buildPromise, cleanupBeforeAbort }: OptionType): ReturnType {
sdk/core/core-util/src/delay.ts
Outdated
return ({ abortSignal, abortErrorMsg } = {}) => | ||
new Promise((resolve, reject) => { | ||
function rejectOnAbort(): void { | ||
reject(new AbortError(abortErrorMsg ?? "The operation was aborted.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re-use the StandardAbortMessage constant here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated that constant earlier to be unique to the delay function, so now it reads "The delay was aborted". It wouldn't work for this general-purpose function.
sdk/core/core-util/src/delay.ts
Outdated
abortSignal?.removeEventListener("abort", onAbort); | ||
} | ||
function onAbort(): void { | ||
cleanupBeforeAbort?.(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I don't like "event" style callbacks to be awaited, it creates hard to reason about systems. We could always rename this to be more obvious like onAbortCalled
or something
sdk/core/core-util/src/delay.ts
Outdated
if (abortSignal?.aborted) { | ||
return rejectOnAbort(); | ||
} | ||
buildPromise({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One subtlety here is the promise constructor catches exceptions in the callback, so a promise created like this:
const promise = new Promise(() => { throw new Error("oh no"));
will correctly return a rejected promise with the captured error instead of the constructor itself throwing.
To keep this contract consistent, can we put a try/catch around the call to buildPromise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, we shouldn't assume that reject
will be called appropriately inside.
abortErrorMsg, | ||
}); | ||
aborter.abort(); | ||
await assert.isRejected(promise, new RegExp(abortErrorMsg)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the new RegExp
isn't necessary here as isRejected
can take a string directly
|
||
it("should reject when aborted", async function () { | ||
const aborter = new AbortController(); | ||
const abortErrorMsg = "The operation was aborted."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could make this a unique message to test the message actually gets used instead of the standard one?
sdk/core/core-util/src/delay.ts
Outdated
cleanupBeforeAbort?: () => void; | ||
}): (options?: DelayOptions) => Promise<T> { | ||
const { buildPromise, cleanupBeforeAbort } = inputs; | ||
return ({ abortSignal, abortErrorMsg } = {}) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about having the abortSignal
and the abortErrorMsg
be part of the original options bag instead of returning a function here? I feel like it's a little nicer to be able to implement delay like this:
return createAbortablePromise<void>({
buildPromise: ({ resolve }) => {
token = setTimeout(resolve, timeInMs);
},
cleanupBeforeAbort: () => clearTimeout(token),
abortSignal,
abortErrorMsg: abortErrorMsg ?? StandardAbortMessage,
});
or if we use my other suggestions:
return createAbortablePromise<void>(
(resolve) => {
token = setTimeout(resolve, timeInMs);
}, {
onAbortCalled: () => clearTimeout(token),
abortSignal,
abortErrorMsg
}
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, the customer can create the function themselves if they need to. Addressed in 64ea6c2
sdk/core/core-util/src/delay.ts
Outdated
*/ | ||
export function createAbortablePromise<T>(inputs: { | ||
buildPromise: (inputs: { | ||
resolve: (value: T | PromiseLike<T>) => void; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about typing the buildPromise more like the original Promise constructor? something like:
buildPromise: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, addressed in 64ea6c2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
Co-authored-by: Jeff Fisher <xirzec@xirzec.com>
The CI failure is a known unrelated issue so I am going to override and merge. |
/check-enforcer override |
# Re-implementing the Event Receiver This PR re-implements the event receiver using promises and a single queue to fix an ordering issue and to correct waiting behavior. ## Problem Statement [Issue #23993] A customer reported that the list of events passed into the `processEvents` callback is not always ordered by `sequenceNumber`. This leads to processing the events in a wrong order. The customer provided a sample that prints an out of order message when the `sequenceNumber` of received messages is not in order and I confirm that I see the message printed sometimes. ## Analysis The customer-provided callback, `processEvents`, gets called every time a batch of events is received from the service. This batch is coming from a single partition. Events are ordered within a partition by their `sequenceNumber`, and events received by `processEvents` should be in the same order. However currently, the list of events the `processEvents` callback gets called on is not always in-order. Upon further investigation, it was found that the library implements a complex logic to read events from the service. It maintains two queues for reading events, one for building a batch of events that will be sent to the next call of the `processEvents` callback, and another for when errors occur or there are no active listeners. The coordination to read events from the two queues is subtle and is the source of the ordering bug. ## Re-design The most straightforward way to simplify this design and to ensure ordering is to use a single queue and add incoming events to it in the order they're received. Reading from this queue is as simple as the following: - If the queue contains any events, check if their count is already the `maxMessageCount` or more: - If yes, remove `maxMessageCount` events and return them immediately - If no, wait for a few milliseconds and then remove up to `maxMessageCount` and return them - If the queue doesn't contain any events, wait until the `maxWaitTimeInSeconds` and then return an empty list, or until one or more event arrive and then return those ### Abstraction The idea is concisely captured by `waitForEvents`, a newly introduced function that races a list of promises, one for each of the scenarios listed above: https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L733-L739 The first promise resolves right away and is returned if the queue already has `maxMessageCount` events or more. It corresponds to the first scenario listed above. The second promise is created by the `checkOnInterval` function. The promise is resolved only if the queue has any events in it. Otherwise, it keeps checking every number of milliseconds. Note that chained to it is a timer promise that waits another number of milliseconds to give the service a chance to send more events. This corresponds to the second scenario listed above. The third promise is a simple timer promise that is resolved after the `maxWaitTime` has elapsed. This promise corresponds to the third scenario. ### Rewrite In addition to some other minor improvements, the `receiveBatch` method is concisely rewritten using that abstraction as follows: https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L578-L628 Notice that the chain of promises makes the algorithm simple to read: a link is established first, credits are added to it as needed, and then the waiting starts. Also, notice that at this point, no actual events were read from the queue yet, all what this does is waiting until one of the promises resolve. The actual reading from the queue is thened to that chain so that it happens only after everything else is said and done. For example, if an error occurred, it should be handled and we don't want to prematurely mutate the queue. The reading from the queue is as simple as the following: https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L630 ## Other changes ### Exporting `core-util`'s `createAbortablePromise` This function was added in #24821 and proved to be useful in this re-write so I am exporting it. I am planning on using it in core-lro too. ### Updating tests There are two tests updated, one for authentication and one for returning events in the presence of retryable and non-retryable errors. In the former, the receiver is expected to receive events after the auth token has been invalidated but not yet refreshed. However, I am observing that a disconnected event has been received at that moment and the receiver has been deleted. The old receiver's behavior is to continue receiving despite the deletion but the new one's behavior correctly cleans up the receiver. I deleted this expectation for now. In the latter, the test forces an error on the receiver after 50 milliseconds but the receiver already finishes around 40 milliseconds, so I updated the forced error to happen sooner, at 10 milliseconds: https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/test/internal/receiveBatch.spec.ts#L107 Finally, a couple test suites were added for `waitForEvents` and `checkOnInterval` functions. ## Updates in action Live tests succeed [[here](https://dev.azure.com/azure-sdk/internal/_build/results?buildId=2201768&view=results)]. Please ignore the timeout in the deployed resources script in canary, it is an unrelated service issue, see [[here](https://dev.azure.com/azure-sdk/internal/_build/results?buildId=2198994&view=results)]. A log for how the updated receiver behaves when used by the customer sample can be found in [log2.txt](https://github.com/Azure/azure-sdk-for-js/files/10775378/log2.txt). Notice that the out of order message was never printed. ## Reviewing tips The changes in `eventHubReceiver.ts` are too many and the diff is not easily readable. I highly suggest to review 1082692 instead because it is on top of a deleting commit so there is no diff to wrestle with. The main changes are in `receiveBatch` but please feel free to review the rest of the module too.
An abortable promise is a common pattern in places where the customer have the ability to abort async work, e.g. LROs. I abstracted this pattern into an internal factory function in core-util for now and refactored the
delay
function to use it.