In-memory action definitions synced with Django #403
This is a diff of almost 500 lines already. Would you prefer to merge just this action syncing feature alone (tested, but unused in ingestion), or merge all of action matching at once (which will be a big diff)?
A few thoughts inline. Otherwise, I think it's good to merge this feature chunk by chunk.
src/worker/tasks.ts
Outdated
hello: (server, args) => {
    return `hello ${args}!`
},
Finally you got rid of it 😁
this.pubSub = new PubSub(pluginsServer, {
    'fetch-action': async (message) => await this.actionManager.fetchAction(parseInt(message)),
    'delete-action': (message) => this.actionManager.deleteAction(parseInt(message)),
})
The only problem with this is that it'll add another Redis connection per worker thread, which can add up. Even if we have officially given up on Heroku free Redis tiers, it might be good to reload these inside the workers with a broadcastTask system similar to the one plugin reloads use.
Agreed, used a similar system to reloadPlugins, which avoids increasing the number of Redis connections with per-thread pubsub instances.
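A minimal sketch of the approach agreed on above: one pubsub subscriber in the main thread, whose handlers fan each message out to every worker. The channel and task names come from the diff in this PR; `WorkerPool`, `buildActionChannelHandlers`, and `RecordingPool` are hypothetical names used only for illustration, not the plugin server's actual types.

```typescript
// Hypothetical stand-in for the worker pool; only its broadcast capability matters here.
interface BroadcastTask {
    task: string
    args: Record<string, unknown>
}

interface WorkerPool {
    broadcastTask(task: BroadcastTask): Promise<void>
}

type ChannelHandler = (message: string) => Promise<void>

// Builds the channel -> handler map for ONE subscriber living in the main thread.
// Workers never open their own Redis connection; they only receive broadcast tasks.
function buildActionChannelHandlers(pool: WorkerPool): Record<string, ChannelHandler> {
    return {
        'reload-action': (message) =>
            pool.broadcastTask({ task: 'reloadAction', args: { actionId: parseInt(message, 10) } }),
        'drop-action': (message) =>
            pool.broadcastTask({ task: 'dropAction', args: { actionId: parseInt(message, 10) } }),
    }
}

// Fake pool that records what would have been sent to every worker thread.
class RecordingPool implements WorkerPool {
    sent: BroadcastTask[] = []

    broadcastTask(task: BroadcastTask): Promise<void> {
        this.sent.push(task)
        return Promise.resolve()
    }
}
```

With this shape the number of Redis subscriber connections stays at one per server process, regardless of how many worker threads are running.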
    return this.actionCache[id]
}

public async fetchAction(id: Action['id']): Promise<void> {
The name of this function is misleading. It's more reloadAction than "fetch (and return) from the database".
Yeah, was definitely on the fence about this, went for reloadAction and dropAction.
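To make the naming discussion concrete, here is a rough sketch of an in-memory cache with the reloadAction and dropAction semantics described above. It is not the PR's ActionManager: `ActionSource`, `InMemoryActionSource`, `ActionCache`, and the single-row `fetchAction` lookup are assumptions made for illustration, and the `Action` type is reduced to two fields.

```typescript
interface Action {
    id: number
    name: string
}

// Hypothetical data source; the real one would query Postgres.
interface ActionSource {
    fetchAllActionsMap(): Promise<Record<number, Action>>
    fetchAction(id: number): Promise<Action | null>
}

class ActionCache {
    private cache: Record<number, Action> = {}
    private source: ActionSource

    constructor(source: ActionSource) {
        this.source = source
    }

    // Loads every action once, before any lookups happen.
    async prepare(): Promise<void> {
        this.cache = await this.source.fetchAllActionsMap()
    }

    getAction(id: number): Action | undefined {
        return this.cache[id]
    }

    // "Reload" rather than "fetch": the result goes into the cache and nothing is returned.
    async reloadAction(id: number): Promise<void> {
        const action = await this.source.fetchAction(id)
        if (action) {
            this.cache[id] = action
        } else {
            delete this.cache[id]
        }
    }

    dropAction(id: number): void {
        delete this.cache[id]
    }
}

// In-memory source so the sketch can be exercised without a database.
class InMemoryActionSource implements ActionSource {
    rows: Record<number, Action>

    constructor(rows: Record<number, Action>) {
        this.rows = rows
    }

    fetchAllActionsMap(): Promise<Record<number, Action>> {
        return Promise.resolve({ ...this.rows })
    }

    fetchAction(id: number): Promise<Action | null> {
        return Promise.resolve(this.rows[id] ?? null)
    }
}
```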
)

// This is normally done by Django async in such a situation
await actionManager.fetchAction(67)
Yup, echoing my point from above, the name is misleading here. What happens to the fetched action? Not obvious it actually caches it in a map... :)
Other than calling this here, should we also have a bigger and more E2E test to make sure actions actually reload in the workers? Fine to push that for a later PR.
At this stage it's hard to test this E2E with the pubsub only in the main thread (the whole startPluginsServer is needed instead of just createServer), I don't know about reaching into actionManagers in worker threads, and there's no actual functionality (like onAction) to test here yet. But E2E tests are definitely a goal once more parts are in place.
I'm sure this will work well (once all code paths are tested/covered, see below :P), however as a general observation, I'm afraid of "eventual inconsistency" here. In case we miss some drop/reload signal from the app, we'll never reach consistency.
This dropped signal might seem unlikely, but experience shows that things breaking randomly is more likely than not, given enough time :). Even if not today, then someone could accidentally introduce a 30sec "await" after the actions load, but before the pubsub started, or ... whatever imaginary case.
With reloads, this is less of an issue since reloads happen frequently, and always read in all plugins and diff changes. However here I'd add a "every 15min do a manual sync" just in case.
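The "manual sync just in case" idea can be sketched as a timer that periodically replaces the whole cache with a fresh snapshot, so a missed drop/reload signal is corrected on the next tick. All names here (`startPeriodicResync`, `fetchAll`, `apply`, `onError`) are hypothetical, and keeping the previous snapshot when a resync fails is an assumption of this sketch rather than something decided in the thread.

```typescript
type ActionMap = Record<number, { id: number; name: string }>

// Starts a timer that swaps in a full snapshot every `intervalMs` milliseconds.
function startPeriodicResync(
    fetchAll: () => Promise<ActionMap>,
    apply: (fresh: ActionMap) => void,
    intervalMs: number,
    onError: (error: unknown) => void
): { resyncNow: () => Promise<void>; stop: () => void } {
    const resyncNow = async (): Promise<void> => {
        try {
            apply(await fetchAll())
        } catch (error) {
            // Assumption: keep serving the previous snapshot; the next tick retries.
            onError(error)
        }
    }
    const timer = setInterval(() => void resyncNow(), intervalMs)
    return { resyncNow, stop: () => clearInterval(timer) }
}
```

Because each tick replaces the cache wholesale instead of applying a delta, the cache converges to the database state within one interval even if individual pubsub messages were lost.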
src/main/pluginsServer.ts
Outdated
'reload-action': async (message) =>
    await piscina?.broadcastTask({ task: 'reloadAction', args: { actionId: parseInt(message) } }),
'drop-action': async (message) =>
    await piscina?.broadcastTask({ task: 'dropAction`', args: { actionId: parseInt(message) } }),
There's a stray "`" here after "dropAction". Can you make sure this code path is also tested?
src/worker/tasks.ts
Outdated
reloadAction: async (server, args: { actionId: Action['id'] }) => {
    return await server.eventsProcessor.actionManager.reloadAction(args.actionId)
},
dropAction: (server, args: { actionId: Action['id'] }) => {
    return server.eventsProcessor.actionManager.dropAction(args.actionId)
},
Can you add these to worker.test.ts?
}

public async prepare(): Promise<void> {
    this.actionCache = await this.db.fetchAllActionsMap()
Not sure how exactly action manager will be used yet, so:
If something goes wrong with fetching actions, such that this cache isn't populated, do we want to stop ingestion completely, or continue?
Well, in that case an error will be thrown in this method (e.g. if the Postgres connection fails) and ingestion will indeed be stopped.
Right, I'm asking if that's what we ought to do, or not?
Ah, yes, we do, since otherwise data integrity would be at risk (action matching wouldn't work). Besides, this is not likely to fail for internal reasons, and if Postgres (the external dependency) fails, then we are in catastrophic failure territory anyway.
Done @mariusandra: resyncing everything every 5 minutes, plus added taskRunner tests.
That's a nice bit of code! Let's see what it does 👀
I added a few comments still inline, feel free to fix or defer.
@@ -727,10 +727,16 @@ export class DB {

     public async fetchAllActionsMap(): Promise<Record<Action['id'], Action>> {
         const rawActions: RawAction[] = (
-            await this.postgresQuery(`SELECT * FROM posthog_action`, undefined, 'fetchActions')
+            await this.postgresQuery(`SELECT * FROM posthog_action WHERE deleted = FALSE`, undefined, 'fetchActions')
nice catch!
src/utils/status.ts
Outdated
if (process.env.NODE_ENV?.toLowerCase() === 'test') {
    // TODO: use determineNodeEnv() here
    return () => {} // eslint-disable-line @typescript-eslint/no-empty-function
}
I get the frustration with this, though sometimes when coding locally logging is good to have. It's somewhat unintuitive to find this piece of code to re-enable it.
I think there's one alternative to this: adding LOG_LEVEL=none as a default for test mode.
It might have a bug, in that the existing console patching mechanism will also swallow messages sent by jest that would be good to still see. If so, the solution would be to just add this log level filtering inside status.
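As a rough illustration of "log level filtering inside status", the filter can live in the logger itself so that console output from other sources (such as jest) is left alone. This is only a sketch: `buildLogger`, the `LogLevel` names, and their numeric ordering are assumptions, not the existing `status` implementation.

```typescript
type LogLevel = 'debug' | 'info' | 'warn' | 'error' | 'none'

// Higher number = more severe; 'none' sits above everything so nothing passes it.
const severity: Record<LogLevel, number> = { debug: 10, info: 20, warn: 30, error: 40, none: 50 }

type LogMethod = (message: string) => void

// Builds logging methods that drop messages below `threshold`.
// `sink` is injected so the sketch can be exercised without touching the console.
function buildLogger(
    threshold: LogLevel,
    sink: (line: string) => void
): Record<'debug' | 'info' | 'warn' | 'error', LogMethod> {
    const method =
        (level: 'debug' | 'info' | 'warn' | 'error'): LogMethod =>
        (message) => {
            if (severity[level] >= severity[threshold]) {
                sink(`[${level}] ${message}`)
            }
        }
    return { debug: method('debug'), info: method('info'), warn: method('warn'), error: method('error') }
}
```

A test setup could then pick the threshold from an environment variable, and a developer who wants output locally only has to change that one value.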
Eh, removed this path altogether as it had circular import problems with config, maybe another PR.
src/utils/pubsub.ts
Outdated
throw new Error(
    `Received a pubsub message for unassociated channel ${channel}! Associated channels are: ${Object.keys(
        this.taskMap
    ).join(', ')}`
)
I wonder if this can backfire? E.g. through some upgrade Django starts sending runCommand via pubsub before the plugin servers have restarted to receive it. Failing silently to Sentry would probably be better.
Possibly, yeah, moved to just captureException
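A small sketch of the non-throwing variant: an unknown channel is reported and skipped, so a newer sender cannot crash an older receiver. `dispatch` and the injected `report` callback are hypothetical; in the real code the reporter would be the error tracker's capture call mentioned above.

```typescript
type PubSubTask = (message: string) => void | Promise<void>

// Routes a message to its handler; unknown channels are reported instead of thrown.
function dispatch(
    taskMap: Record<string, PubSubTask>,
    channel: string,
    message: string,
    report: (error: unknown) => void
): void {
    const task = taskMap[channel]
    if (!task) {
        report(
            new Error(
                `Received a pubsub message for unassociated channel ${channel}! ` +
                    `Associated channels are: ${Object.keys(taskMap).join(', ')}`
            )
        )
        return
    }
    try {
        // Handler failures (sync or async) are also reported rather than propagated.
        Promise.resolve(task(message)).catch(report)
    } catch (error) {
        report(error)
    }
}
```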
src/main/pluginsServer.ts
Outdated
if (hub.jobQueueManager) {
    const queueString = hub.jobQueueManager.getJobQueueTypesAsString()
    await hub!.db!.redisSet('@posthog-plugin-server/enabled-job-queues', queueString)
}

// every 5 minutes all ActionManager caches are reloaded for eventual consistency
pingJob = schedule.scheduleJob('*/5 * * * *', async () => {
I think this is overridden by the pingJob below?
Good catch
…Hog/plugin-server#403)
* Add ActionManager
* Refactor ActionManager
* Remove hello
* Adjust ActionManager method names and use single PubSub
* Touch tests up
* Make some adjustments
* Disable `status` stdout logs in test mode
* Fix `status`
* Fix test problems
* Fix dropAction typo
* Reload all ActionManager caches every 5 min
* Fix duplicate RawAction
* Don't stringify JSONB column for `insertRow`
* It's a hub now
* Filter by Action.deleted
* Enhance ActionManager tests
* Add Action-syncing task runner tests
* Use `LOG_LEVEL=warn` in tests
* Don't `throw` error on unassociated channel pubsub
* Don't use defaultConfig in Status.buildMethod due to circular import
* Fix actions reload job var name
Changes
Part of #235.
$pageview)
onAction Piscina task
posthog_action_events table (skipped for ClickHouse)
Checklist