-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(plugin-server): refactor the event pipeline #9829
Conversation
const isAnonymousEvent = | ||
event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id'] | ||
const isRecentPerson = | ||
!person || DateTime.now().diff(person.created_at).as('seconds') < hub.BUFFER_CONVERSION_SECONDS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This was previously bugged, would return 0 if e.g. a user was created 1 day ago. Only caught this thanks to unit tests - very hard to catch this sort of bug otherwise.
Let's write more unit tests!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! This monster was not as bad as it seemed given all the tests.
Very close to approving ✅
Left some comments but didn't find any major gaps, love the test coverage. Let's maybe get this in on Monday though and I'll make sure to also keep an eye out
error?: string | ||
} | ||
|
||
const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array<StepType> = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could be a set
plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts
Outdated
Show resolved
Hide resolved
|
||
if (preIngestionEvent && preIngestionEvent.event !== '$snapshot') { | ||
return runner.nextStep('determineShouldBufferStep', preIngestionEvent) | ||
} else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') { | |
} else if (preIngestionEvent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intentional - I was trying to make it clearer to the reader it's dealing with a snapshot here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair
} | ||
|
||
const processedPluginEvent = convertToProcessedPluginEvent(event) | ||
const isSnapshot = event.event === '$snapshot' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could move this up given we have a isSnapshot
test on line 15
const promises = [] | ||
let actionMatches: Action[] = [] | ||
if (event.event !== '$snapshot') { | ||
actionMatches = await runner.hub.actionMatcher.match(event, person, elements) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we actually run the onEvent
/ onSnapshot
flow earlier? I'd like to make sure that runs if the action path is broken. Also diff is probably minimal but I'd like to trigger exporting an event before action webhooks etc.
will anyway group code together that's used together
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean. Do you want to ignore errors from action matching when deciding whether to run onAction? That's a new requirement if so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - let me submit a suggestion as to what I want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mistyped my question - onEvent calling should not be affected by action-related errors? If so, I think re-ordering is too implicit about that and we should make that obvious in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's resolve this in a follow-up PR - I'm in merge conflict hell until this is in.
plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts
Show resolved
Hide resolved
const promises = [] | ||
let actionMatches: Action[] = [] | ||
if (event.event !== '$snapshot') { | ||
actionMatches = await runner.hub.actionMatcher.match(event, person, elements) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - let me submit a suggestion as to what I want
* master: chore: start stack once in cloud tests (#9879) feat(apps): frontend apps (#9831) chore: Fix snapshots on master (#9885) chore(apps): rename plugins to apps (#9755) refactor: Remove constance library dependency, use json-encoded model (#9852) chore(clickhouse): avoid creating kafka_events, events_mv (#9863) fix(insights): Fix timezone date issues (#9678) refactor(plugin-server): refactor the event pipeline (#9829) feat(object storage): add unused object storage (#9846) fix: make kafka health check timeout test reliable (#9857) fix: query elements from start of day (#9827)
* Start refactoring event pipeline * Add some initial metrics * Handle DLQ error messages in pipeline runner * Add public functions for the pipeline * Tests for runner.ts * Tests for every step in event pipeline * yeet some now-unneeded worker code * Add timeoutGuard * Emit to DLQ from buffer * Move some tests to a separate file * fix internal metrics * Refactor method location, WIP * Fix code determining if user is a recent person or not * Update tests to deal with new pipeline * Rename methods for consistency * Remove now-dead test * Update process-event.test.ts * Update DLQ test * Ignore test under yeet * Remove mocked * Remove dead code * Update naming
Problem
This PR refactors how the event pipeline works. It's a follow-up to #9738
The problem being solved by these PRs is that the event processing pipeline was really hard to follow due to confusing terminology, worker/main thread split, return value dependencies and more.
This in turn is needed to split plugin-server processing in a nice way.
Changes
Event pipeline is now a pipeline of steps where every step can call a subsequent one or stop processing.
Note that the problem isn't fully solved -
process-event.ts
probably does too much right now, but we're in a better position than before.Also note that this way of structuring things also lent itself really well for unit testing and caught some bugs that previously slipped through.