From 6143f83a1dd7e83fc1206952fe643656678732b3 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 8 Apr 2024 08:39:20 -0600 Subject: [PATCH] chore(plugin-server): add plugin skip list for personless (#21378) * chore(plugin-server): add plugin skip list for personless * runDeprecatedPlugins --- plugin-server/src/types.ts | 1 + .../event-pipeline/pluginsProcessEventStep.ts | 5 +++-- .../worker/ingestion/event-pipeline/runner.ts | 9 ++++++++- .../src/worker/plugins/loadPluginsFromDB.ts | 14 ++++++++++++++ plugin-server/src/worker/plugins/run.ts | 15 +++++++++++---- .../__snapshots__/runner.test.ts.snap | 1 + plugin-server/tests/worker/plugins.test.ts | 17 +++++++++++++---- 7 files changed, 51 insertions(+), 11 deletions(-) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b07e4771f3f2c..d9ccd83549de0 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -389,6 +389,7 @@ export interface Plugin { capabilities?: PluginCapabilities metrics?: StoredPluginMetrics is_stateless?: boolean + skipped_for_personless?: boolean public_jobs?: Record log_level?: PluginLogLevel } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts index 05e3a839e12b5..a6069160383af 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts @@ -7,13 +7,14 @@ import { EventPipelineRunner } from './runner' export async function pluginsProcessEventStep( runner: EventPipelineRunner, - event: PluginEvent + event: PluginEvent, + runDeprecatedPlugins: boolean ): Promise { const processedEvent = await runInstrumentedFunction({ timeoutContext: () => ({ event: JSON.stringify(event), }), - func: () => runProcessEvent(runner.hub, event), + func: () => runProcessEvent(runner.hub, event, runDeprecatedPlugins), statsKey: 'kafka_queue.single_event', timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!', teamId: event.team_id, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index b5b4743c32516..6b9ee04cede2b 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -165,7 +165,14 @@ export class EventPipelineRunner { return this.registerLastStep('clientIngestionWarning', [event], [warningAck]) } - const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id) + // Some expensive, deprecated plugins are skipped when `$process_person=false` + const runDeprecatedPlugins = processPerson + const processedEvent = await this.runStep( + pluginsProcessEventStep, + [this, event, runDeprecatedPlugins], + event.team_id + ) + if (processedEvent == null) { // A plugin dropped the event. return this.registerLastStep('pluginsProcessEventStep', [event]) diff --git a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts index 81282e0646794..106f64c0e13a1 100644 --- a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts +++ b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts @@ -25,6 +25,17 @@ const loadPluginsTotalMsSummary = new Summary({ percentiles: [0.5, 0.9, 0.95, 0.99], }) +// These are plugins that do I/O (fetch, cache, storage) that we want to deprecate. For now, we +// want to at least ensure they don't run for "personless" ($process_person=false) events. +const personlessPluginSkipList = [ + 'https://github.com/posthog/currency-normalization-plugin', + 'https://github.com/posthog/event-sequence-timer-plugin', + 'https://github.com/posthog/first-event-today', + 'https://github.com/posthog/first-time-event-tracker', + 'https://github.com/posthog/flatten-properties-plugin', + 'https://github.com/posthog/mailboxlayer-plugin', +] + export async function loadPluginsFromDB( hub: Hub ): Promise> { @@ -33,6 +44,9 @@ export async function loadPluginsFromDB( const plugins = new Map() for (const row of pluginRows) { + if (row.url && personlessPluginSkipList.includes(row.url.toLowerCase())) { + row.skipped_for_personless = true + } plugins.set(row.id, row) } loadPluginsMsSummary.observe(new Date().getTime() - startTimer.getTime()) diff --git a/plugin-server/src/worker/plugins/run.ts b/plugin-server/src/worker/plugins/run.ts index b0226fc57b145..30823adf66326 100644 --- a/plugin-server/src/worker/plugins/run.ts +++ b/plugin-server/src/worker/plugins/run.ts @@ -208,9 +208,13 @@ export async function runComposeWebhook(hub: Hub, event: PostHogEvent): Promise< ) } -export async function runProcessEvent(hub: Hub, event: PluginEvent): Promise { +export async function runProcessEvent( + hub: Hub, + event: PluginEvent, + runDeprecatedPlugins = true +): Promise { const teamId = event.team_id - const pluginMethodsToRun = await getPluginMethodsForTeam(hub, teamId, 'processEvent') + const pluginMethodsToRun = await getPluginMethodsForTeam(hub, teamId, 'processEvent', runDeprecatedPlugins) let returnedEvent: PluginEvent | null = event const pluginsSucceeded: string[] = event.properties?.$plugins_succeeded || [] @@ -354,9 +358,12 @@ export async function runPluginTask( async function getPluginMethodsForTeam( hub: Hub, teamId: number, - method: M + method: M, + runDeprecatedPlugins = true ): Promise<[PluginConfig, VMMethods[M]][]> { - const pluginConfigs = hub.pluginConfigsPerTeam.get(teamId) || [] + const pluginConfigs = (hub.pluginConfigsPerTeam.get(teamId) || []).filter((pluginConfig: PluginConfig) => + runDeprecatedPlugins ? true : !pluginConfig.plugin?.skipped_for_personless + ) if (pluginConfigs.length === 0) { return [] } diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index 9cd0d244500ae..9e2e872a5b484 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -33,6 +33,7 @@ Array [ "timestamp": "2020-02-23T02:15:00.000Z", "uuid": "uuid1", }, + true, ], ], Array [ diff --git a/plugin-server/tests/worker/plugins.test.ts b/plugin-server/tests/worker/plugins.test.ts index 47f0596228a85..9909f01301d7e 100644 --- a/plugin-server/tests/worker/plugins.test.ts +++ b/plugin-server/tests/worker/plugins.test.ts @@ -44,9 +44,11 @@ describe('plugins', () => { }) test('setupPlugins and runProcessEvent', async () => { - getPluginRows.mockReturnValueOnce([{ ...plugin60 }]) + // Use a Plugin URL that is skipped for "personless" ($process_person=false) events. + const plugin = { ...plugin60, url: 'https://github.com/posthog/first-event-today' } + getPluginRows.mockReturnValueOnce([plugin]) getPluginAttachmentRows.mockReturnValueOnce([pluginAttachment1]) - getPluginConfigRows.mockReturnValueOnce([pluginConfig39]) + getPluginConfigRows.mockReturnValueOnce([{ ...pluginConfig39, plugin: plugin }]) await setupPlugins(hub) const { plugins, pluginConfigs } = hub @@ -67,7 +69,7 @@ describe('plugins', () => { expect(pluginConfig.error).toEqual(pluginConfig39.error) expect(pluginConfig.plugin).toEqual({ - ...plugin60, + ...plugin, capabilities: { jobs: [], scheduled_tasks: [], methods: ['processEvent'] }, }) @@ -95,7 +97,7 @@ describe('plugins', () => { [ 60, { - ...plugin60, + ...plugin, capabilities: { jobs: [], scheduled_tasks: [], methods: ['processEvent'] }, }, ], @@ -112,6 +114,13 @@ describe('plugins', () => { const returnedEvent = await runProcessEvent(hub, event) expect(event.properties!['processed']).toEqual(true) expect(returnedEvent!.properties!['processed']).toEqual(true) + + // Personless event skips the plugin + const personlessEvent = { event: '$test', properties: {}, team_id: 2 } as PluginEvent + const runDeprecatedPlugins = false + const returnedPersonlessEvent = await runProcessEvent(hub, personlessEvent, runDeprecatedPlugins) + expect(personlessEvent.properties!['processed']).toEqual(undefined) + expect(returnedPersonlessEvent!.properties!['processed']).toEqual(undefined) }) test('stateless plugins', async () => {