-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(plugin-server): Run ingestion only on worker threads #9738
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work - let's ofc get some tests in as well.
left some initial comments
Not sure what do you mean - what tests do we need here beyond what we already have? Note the intent here (as discussed on tuesday) is 0 behavioral changes besides moving some code from outside worker to inside so we know it's safe (can run it in production). Actual refactoring and behavioral changes will happen once we have verified that it works and are happy with it. |
d82584e
to
d5e0d05
Compare
This takes the function from 13-17s to 4s locally. Not great but not terrible either.
This is now ready for review/merge. @yakkomajuri please take a look and let's merge it sometime today :) I needed to unfortunately include some test refactorings - there were some bugs in the test suite causing pain getting this green. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep a close eye on this and revert if necessary.
console.log("Error creating topics. Probably they're already created.\n", error) | ||
|
||
const existingTopics = await admin.listTopics() | ||
const topicsToCreate = topics.filter((topic) => !existingTopics.includes(topic)).map((topic) => ({ topic })) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -109,5 +112,9 @@ function shouldSendEventToBuffer(hub: Hub, event: PreIngestionEvent, person?: Pe | |||
hub.statsd?.increment('conversion_events_buffer_size', { teamId: event.teamId.toString() }) | |||
} | |||
|
|||
if (!hub.CONVERSION_BUFFER_ENABLED && !hub.conversionBufferEnabledTeams.has(teamId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Follow-up to #9738, silly mistake
Follow-up to #9738, silly mistake
* master: fix: exclusion steps cannot be selected (#9762) feat(lemon-button): Support `status` for `primary` buttons (#9782) fix: healthcheck for kafka on plugin server (#9771) fix(billing): Update billing success message (#9739) fix(plugin-server): Set transpileOnly when importing piscina code in tests (#9777) fix(plugin-server): Remove unused kafka reset test code (#9779) fix: set kafka_skip_broken_messages on dead letter queue table (#9754) fix(plugin-server): remove dead code from worker.test.ts (#9776) refactor(plugin-server): Run ingestion only on worker threads (#9738) fix: Plugin-server tests with kafka need to have consumer stopped (#9774) chore(deps): Update posthog-js to 1.21.1 (#9773) chore(dep): upgrading rr-web (#9772) fix: ouroboros inputs (#9769)
* Refactor method to do everything it's supposed to do No tests though? * Remove unused argument from a function * Remove dead code * Run event pipeline on worker threads * Remove code in way of refactor * Make metrics happy * Silence a yeetcode-related failure * Mock over stopping unneeded services * Yeetcode a test * Reset the right table between tests * Reset healthcheck consumer between tests * Silence logspam around creating topics * Significantly speed up resetting clickhouse tables between tests This takes the function from 13-17s to 4s locally. Not great but not terrible either. * Remove try-catch * Remove a dead test
Follow-up to #9738, silly mistake
Problem
WIP. Creating this to run tests
Our ingestion logic is hard to understand and harder to split up. Doing some work in main thread and some in workers is one part of the problem.
Changes
We now run all of the previous
ingestEvent
in the worker thread.No behavior is dropped - most code is copy-pasta with minor callsite changes.
Once this is live we will have info whether doing ingestion purely in workers causes issues.
👉 Stay up-to-date with PostHog coding conventions for a smoother review.
How did you test this code?
See tests.