Skip to content

Commit

Permalink
chore(plugin-server): add plugin skip list for personless (#21378)
Browse files Browse the repository at this point in the history
* chore(plugin-server): add plugin skip list for personless

* runDeprecatedPlugins
  • Loading branch information
bretthoerner authored Apr 8, 2024
1 parent 03d81a6 commit 6143f83
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 11 deletions.
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ export interface Plugin {
capabilities?: PluginCapabilities
metrics?: StoredPluginMetrics
is_stateless?: boolean
skipped_for_personless?: boolean
public_jobs?: Record<string, JobSpec>
log_level?: PluginLogLevel
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import { EventPipelineRunner } from './runner'

export async function pluginsProcessEventStep(
runner: EventPipelineRunner,
event: PluginEvent
event: PluginEvent,
runDeprecatedPlugins: boolean
): Promise<PluginEvent | null> {
const processedEvent = await runInstrumentedFunction({
timeoutContext: () => ({
event: JSON.stringify(event),
}),
func: () => runProcessEvent(runner.hub, event),
func: () => runProcessEvent(runner.hub, event, runDeprecatedPlugins),
statsKey: 'kafka_queue.single_event',
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
teamId: event.team_id,
Expand Down
9 changes: 8 additions & 1 deletion plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,14 @@ export class EventPipelineRunner {
return this.registerLastStep('clientIngestionWarning', [event], [warningAck])
}

const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id)
// Some expensive, deprecated plugins are skipped when `$process_person=false`
const runDeprecatedPlugins = processPerson
const processedEvent = await this.runStep(
pluginsProcessEventStep,
[this, event, runDeprecatedPlugins],
event.team_id
)

if (processedEvent == null) {
// A plugin dropped the event.
return this.registerLastStep('pluginsProcessEventStep', [event])
Expand Down
14 changes: 14 additions & 0 deletions plugin-server/src/worker/plugins/loadPluginsFromDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ const loadPluginsTotalMsSummary = new Summary({
percentiles: [0.5, 0.9, 0.95, 0.99],
})

// These are plugins that do I/O (fetch, cache, storage) that we want to deprecate. For now, we
// want to at least ensure they don't run for "personless" ($process_person=false) events.
const personlessPluginSkipList = [
'https://github.com/posthog/currency-normalization-plugin',
'https://github.com/posthog/event-sequence-timer-plugin',
'https://github.com/posthog/first-event-today',
'https://github.com/posthog/first-time-event-tracker',
'https://github.com/posthog/flatten-properties-plugin',
'https://github.com/posthog/mailboxlayer-plugin',
]

export async function loadPluginsFromDB(
hub: Hub
): Promise<Pick<Hub, 'plugins' | 'pluginConfigs' | 'pluginConfigsPerTeam'>> {
Expand All @@ -33,6 +44,9 @@ export async function loadPluginsFromDB(
const plugins = new Map<PluginId, Plugin>()

for (const row of pluginRows) {
if (row.url && personlessPluginSkipList.includes(row.url.toLowerCase())) {
row.skipped_for_personless = true
}
plugins.set(row.id, row)
}
loadPluginsMsSummary.observe(new Date().getTime() - startTimer.getTime())
Expand Down
15 changes: 11 additions & 4 deletions plugin-server/src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,13 @@ export async function runComposeWebhook(hub: Hub, event: PostHogEvent): Promise<
)
}

export async function runProcessEvent(hub: Hub, event: PluginEvent): Promise<PluginEvent | null> {
export async function runProcessEvent(
hub: Hub,
event: PluginEvent,
runDeprecatedPlugins = true
): Promise<PluginEvent | null> {
const teamId = event.team_id
const pluginMethodsToRun = await getPluginMethodsForTeam(hub, teamId, 'processEvent')
const pluginMethodsToRun = await getPluginMethodsForTeam(hub, teamId, 'processEvent', runDeprecatedPlugins)
let returnedEvent: PluginEvent | null = event

const pluginsSucceeded: string[] = event.properties?.$plugins_succeeded || []
Expand Down Expand Up @@ -354,9 +358,12 @@ export async function runPluginTask(
async function getPluginMethodsForTeam<M extends keyof VMMethods>(
hub: Hub,
teamId: number,
method: M
method: M,
runDeprecatedPlugins = true
): Promise<[PluginConfig, VMMethods[M]][]> {
const pluginConfigs = hub.pluginConfigsPerTeam.get(teamId) || []
const pluginConfigs = (hub.pluginConfigsPerTeam.get(teamId) || []).filter((pluginConfig: PluginConfig) =>
runDeprecatedPlugins ? true : !pluginConfig.plugin?.skipped_for_personless
)
if (pluginConfigs.length === 0) {
return []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Array [
"timestamp": "2020-02-23T02:15:00.000Z",
"uuid": "uuid1",
},
true,
],
],
Array [
Expand Down
17 changes: 13 additions & 4 deletions plugin-server/tests/worker/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ describe('plugins', () => {
})

test('setupPlugins and runProcessEvent', async () => {
getPluginRows.mockReturnValueOnce([{ ...plugin60 }])
// Use a Plugin URL that is skipped for "personless" ($process_person=false) events.
const plugin = { ...plugin60, url: 'https://github.com/posthog/first-event-today' }
getPluginRows.mockReturnValueOnce([plugin])
getPluginAttachmentRows.mockReturnValueOnce([pluginAttachment1])
getPluginConfigRows.mockReturnValueOnce([pluginConfig39])
getPluginConfigRows.mockReturnValueOnce([{ ...pluginConfig39, plugin: plugin }])

await setupPlugins(hub)
const { plugins, pluginConfigs } = hub
Expand All @@ -67,7 +69,7 @@ describe('plugins', () => {
expect(pluginConfig.error).toEqual(pluginConfig39.error)

expect(pluginConfig.plugin).toEqual({
...plugin60,
...plugin,
capabilities: { jobs: [], scheduled_tasks: [], methods: ['processEvent'] },
})

Expand Down Expand Up @@ -95,7 +97,7 @@ describe('plugins', () => {
[
60,
{
...plugin60,
...plugin,
capabilities: { jobs: [], scheduled_tasks: [], methods: ['processEvent'] },
},
],
Expand All @@ -112,6 +114,13 @@ describe('plugins', () => {
const returnedEvent = await runProcessEvent(hub, event)
expect(event.properties!['processed']).toEqual(true)
expect(returnedEvent!.properties!['processed']).toEqual(true)

// Personless event skips the plugin
const personlessEvent = { event: '$test', properties: {}, team_id: 2 } as PluginEvent
const runDeprecatedPlugins = false
const returnedPersonlessEvent = await runProcessEvent(hub, personlessEvent, runDeprecatedPlugins)
expect(personlessEvent.properties!['processed']).toEqual(undefined)
expect(returnedPersonlessEvent!.properties!['processed']).toEqual(undefined)
})

test('stateless plugins', async () => {
Expand Down

0 comments on commit 6143f83

Please sign in to comment.