From 20dc8b8983b5b9217d0abd3e59a75df354c46adf Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Mon, 16 May 2022 12:29:28 +0300 Subject: [PATCH 01/22] Start refactoring event pipeline --- .../event-pipeline/createEventStep.ts | 11 +++ .../determineShouldBufferStep.ts | 20 +++++ .../event-pipeline/pluginsProcessEventStep.ts | 30 ++++++++ .../event-pipeline/prepareEventStep.ts | 27 +++++++ .../event-pipeline/runAsyncHandlersStep.ts | 47 ++++++++++++ .../worker/ingestion/event-pipeline/runner.ts | 74 +++++++++++++++++++ .../src/worker/ingestion/ingest-event.ts | 2 +- 7 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/runner.ts diff --git a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts new file mode 100644 index 0000000000000..5153c7b474d68 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts @@ -0,0 +1,11 @@ +import { Person, PreIngestionEvent } from '../../../types' +import { EventPipelineRunner, StepResult } from './runner' + +export async function createEventStep( + runner: EventPipelineRunner, + event: PreIngestionEvent, + person: Person | undefined +): Promise { + const [, , elements] = await runner.hub.eventsProcessor.createEvent(event) + return runner.nextStep('runAsyncHandlersStep', event, person, elements) +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts new file mode 100644 index 0000000000000..9b5a792369c06 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts @@ -0,0 +1,20 @@ +import { PreIngestionEvent } from '../../../types' +import { shouldSendEventToBuffer } from '../ingest-event' +import { EventPipelineRunner, StepResult } from './runner' + +export async function determineShouldBufferStep( + runner: EventPipelineRunner, + event: PreIngestionEvent +): Promise { + const person = await runner.hub.db.fetchPerson(event.teamId, event.distinctId) + + // even if the buffer is disabled we want to get metrics on how many events would have gone to it + const sendEventToBuffer = shouldSendEventToBuffer(runner.hub, event, person, event.teamId) + + if (sendEventToBuffer) { + await runner.hub.eventsProcessor.produceEventToBuffer(event) + return null + } else { + return runner.nextStep('createEventStep', event, person) + } +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts new file mode 100644 index 0000000000000..740e1678faa4b --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts @@ -0,0 +1,30 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' + +import { runInstrumentedFunction } from '../../../main/utils' +import { runProcessEvent } from '../../plugins/run' +import { EventPipelineRunner, StepResult } from './runner' + +export async function pluginsProcessEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { + let processedEvent: PluginEvent | null = event + + // run processEvent on all events that are not $snapshot + if (event.event !== '$snapshot') { + processedEvent = await runInstrumentedFunction({ + server: runner.hub, + event, + func: (event) => runProcessEvent(runner.hub, event), + statsKey: 'kafka_queue.single_event', + timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!', + }) + } + + if (processedEvent) { + return runner.nextStep('prepareEventStep', processedEvent) + } else { + // processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it. + runner.hub.statsd?.increment('kafka_queue.dropped_event', { + teamID: String(event.team_id), + }) + return null + } +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts new file mode 100644 index 0000000000000..08930265cf422 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -0,0 +1,27 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { DateTime } from 'luxon' + +import { EventPipelineRunner, StepResult } from './runner' + +export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { + const { ip, site_url, team_id, now, sent_at, uuid } = event + const distinctId = String(event.distinct_id) + const preIngestionEvent = await runner.hub.eventsProcessor.processEvent( + distinctId, + ip, + event, + team_id, + DateTime.fromISO(now), + sent_at ? DateTime.fromISO(sent_at) : null, + uuid!, // it will throw if it's undefined, + site_url + ) + + if (preIngestionEvent && preIngestionEvent.event !== '$snapshot') { + return runner.nextStep('determineShouldBufferStep', preIngestionEvent) + } else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') { + return runner.nextStep('runAsyncHandlersStep', preIngestionEvent, undefined, undefined) + } else { + return null + } +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts new file mode 100644 index 0000000000000..b63ea78bc3111 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts @@ -0,0 +1,47 @@ +import { runInstrumentedFunction } from '../../../main/utils' +import { Action, Element, Person, PreIngestionEvent } from '../../../types' +import { convertToProcessedPluginEvent } from '../../../utils/event' +import { runOnAction, runOnEvent, runOnSnapshot } from '../../plugins/run' +import { EventPipelineRunner, StepResult } from './runner' + +export async function runAsyncHandlersStep( + runner: EventPipelineRunner, + event: PreIngestionEvent, + person: Person | undefined, + elements: Element[] | undefined +): Promise { + const promises = [] + let actionMatches: Action[] = [] + if (event.event !== '$snapshot') { + actionMatches = await runner.hub.actionMatcher.match(event, person, elements) + promises.push(runner.hub.hookCannon.findAndFireHooks(event, person, event.siteUrl, actionMatches)) + } + + const processedPluginEvent = convertToProcessedPluginEvent(event) + const isSnapshot = event.event == 'snapshot' + const method = isSnapshot ? runOnSnapshot : runOnEvent + promises.push( + runInstrumentedFunction({ + server: runner.hub, + event: processedPluginEvent, + func: (event) => method(runner.hub, event), + statsKey: `kafka_queue.single_${isSnapshot ? 'on_snapshot' : 'on_event'}`, + timeoutMessage: `After 30 seconds still running ${isSnapshot ? 'onSnapshot' : 'onEvent'}`, + }) + ) + for (const actionMatch of actionMatches) { + promises.push( + runInstrumentedFunction({ + server: runner.hub, + event: processedPluginEvent, + func: (event) => runOnAction(runner.hub, actionMatch, event), + statsKey: `kafka_queue.on_action`, + timeoutMessage: 'After 30 seconds still running onAction', + }) + ) + } + + await Promise.all(promises) + + return null +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts new file mode 100644 index 0000000000000..efeca02ce4961 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -0,0 +1,74 @@ +import * as Sentry from '@sentry/node' + +import { Hub } from '../../../types' +import { createEventStep } from './createEventStep' +import { determineShouldBufferStep } from './determineShouldBufferStep' +import { pluginsProcessEventStep } from './pluginsProcessEventStep' +import { prepareEventStep } from './prepareEventStep' +import { runAsyncHandlersStep } from './runAsyncHandlersStep' + +type StepParameters any> = T extends ( + runner: EventPipelineRunner, + ...args: infer P +) => any + ? P + : never + +const EVENT_PIPELINE_STEPS = { + pluginsProcessEventStep, + prepareEventStep, + determineShouldBufferStep, + createEventStep, + runAsyncHandlersStep, +} + +type EventPipelineStepsType = typeof EVENT_PIPELINE_STEPS +type StepType = keyof EventPipelineStepsType +type NextStep = [StepType, StepParameters] + +export type StepResult = + | null + | NextStep<'pluginsProcessEventStep'> + | NextStep<'prepareEventStep'> + | NextStep<'determineShouldBufferStep'> + | NextStep<'createEventStep'> + | NextStep<'runAsyncHandlersStep'> + +// :TODO: Timers for every function +// :TODO: DLQ emit for failing on some steps +const EMIT_TO_DLQ_ON_FAILURE: Array = ['prepareEventStep', 'determineShouldBufferStep', 'createEventStep'] + +export class EventPipelineRunner { + hub: Hub + + constructor(hub: Hub) { + this.hub = hub + } + + async runStep>( + name: Step, + ...args: ArgsType + ): Promise { + let currentStepName: StepType = name + let currentArgs: any = args + + while (true) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + const stepResult = await EVENT_PIPELINE_STEPS[currentStepName](this, ...currentArgs) + + if (stepResult) { + ;[currentStepName, currentArgs] = stepResult + } else { + break + } + } + } + + nextStep>( + name: Step, + ...args: ArgsType + ): NextStep { + return [name, args] + } +} diff --git a/plugin-server/src/worker/ingestion/ingest-event.ts b/plugin-server/src/worker/ingestion/ingest-event.ts index f9dbb274f1787..db115455e2ce1 100644 --- a/plugin-server/src/worker/ingestion/ingest-event.ts +++ b/plugin-server/src/worker/ingestion/ingest-event.ts @@ -89,7 +89,7 @@ async function handleActionMatches( // TL;DR: events from a recently created non-anonymous person are sent to a buffer // because their person_id might change. We merge based on the person_id of the anonymous user // so ingestion is delayed for those events to increase our chances of getting person_id correctly -function shouldSendEventToBuffer( +export function shouldSendEventToBuffer( hub: Hub, event: PreIngestionEvent, person: Person | undefined, From 7e26d78c0d00978c517d463dd52ca6fc320ade2e Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Mon, 16 May 2022 13:37:30 +0300 Subject: [PATCH 02/22] Add some initial metrics --- .../src/worker/ingestion/event-pipeline/runner.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index efeca02ce4961..c343443fe2eec 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -1,3 +1,4 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { Hub } from '../../../types' @@ -40,9 +41,11 @@ const EMIT_TO_DLQ_ON_FAILURE: Array = ['prepareEventStep', 'determineS export class EventPipelineRunner { hub: Hub + originalEvent: PluginEvent - constructor(hub: Hub) { + constructor(hub: Hub, originalEvent: PluginEvent) { this.hub = hub + this.originalEvent = originalEvent } async runStep>( @@ -53,13 +56,21 @@ export class EventPipelineRunner { let currentArgs: any = args while (true) { + const timer = new Date() // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore const stepResult = await EVENT_PIPELINE_STEPS[currentStepName](this, ...currentArgs) + this.hub.statsd?.increment('kafka_queue.event_pipeline.step', { step: currentStepName }) + this.hub.statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: currentStepName }) + if (stepResult) { ;[currentStepName, currentArgs] = stepResult } else { + this.hub.statsd?.increment('kafka_queue.event_pipeline.step.dropped', { + step: currentStepName, + team_id: String(this.originalEvent.team_id), + }) break } } From 6988d0cad72bb5517440cad3c41d53e948b5b693 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Mon, 16 May 2022 13:58:59 +0300 Subject: [PATCH 03/22] Handle DLQ error messages in pipeline runner --- .../worker/ingestion/event-pipeline/runner.ts | 64 +++++++++++++------ 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index c343443fe2eec..6b29873c37bb8 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -2,6 +2,8 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { Hub } from '../../../types' +import { status } from '../../../utils/status' +import { generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' import { determineShouldBufferStep } from './determineShouldBufferStep' import { pluginsProcessEventStep } from './pluginsProcessEventStep' @@ -35,15 +37,17 @@ export type StepResult = | NextStep<'createEventStep'> | NextStep<'runAsyncHandlersStep'> -// :TODO: Timers for every function -// :TODO: DLQ emit for failing on some steps -const EMIT_TO_DLQ_ON_FAILURE: Array = ['prepareEventStep', 'determineShouldBufferStep', 'createEventStep'] +const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array = [ + 'prepareEventStep', + 'determineShouldBufferStep', + 'createEventStep', +] export class EventPipelineRunner { hub: Hub - originalEvent: PluginEvent + originalEvent: PluginEvent | undefined - constructor(hub: Hub, originalEvent: PluginEvent) { + constructor(hub: Hub, originalEvent?: PluginEvent) { this.hub = hub this.originalEvent = originalEvent } @@ -57,29 +61,53 @@ export class EventPipelineRunner { while (true) { const timer = new Date() - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - const stepResult = await EVENT_PIPELINE_STEPS[currentStepName](this, ...currentArgs) + try { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + const stepResult = await EVENT_PIPELINE_STEPS[currentStepName](this, ...currentArgs) - this.hub.statsd?.increment('kafka_queue.event_pipeline.step', { step: currentStepName }) - this.hub.statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: currentStepName }) + this.hub.statsd?.increment('kafka_queue.event_pipeline.step', { step: currentStepName }) + this.hub.statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: currentStepName }) - if (stepResult) { - ;[currentStepName, currentArgs] = stepResult - } else { - this.hub.statsd?.increment('kafka_queue.event_pipeline.step.dropped', { - step: currentStepName, - team_id: String(this.originalEvent.team_id), - }) + if (stepResult) { + ;[currentStepName, currentArgs] = stepResult + } else { + this.hub.statsd?.increment('kafka_queue.event_pipeline.step.dropped', { + step: currentStepName, + team_id: String(this.originalEvent?.team_id), + }) + break + } + } catch (err) { + await this.handleError(err, currentStepName, currentArgs) break } } } - nextStep>( + nextStep>( name: Step, ...args: ArgsType ): NextStep { return [name, args] } + + private async handleError(err: any, currentStepName: StepType, currentArgs: any) { + status.info('🔔', err) + Sentry.captureException(err, { extra: { currentStepName, currentArgs, originalEvent: this.originalEvent } }) + this.hub.statsd?.increment('kafka_queue.event_pipeline.step.error', { step: currentStepName }) + + if (this.originalEvent && STEPS_TO_EMIT_TO_DLQ_ON_FAILURE.includes(currentStepName)) { + try { + const message = generateEventDeadLetterQueueMessage(this.originalEvent, err) + await this.hub.db.kafkaProducer!.queueMessage(message) + this.hub.statsd?.increment('events_added_to_dead_letter_queue') + } catch (dlqError) { + status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`) + Sentry.captureException(dlqError, { + extra: { currentStepName, currentArgs, originalEvent: this.originalEvent, err }, + }) + } + } + } } From 9d28061145d83acef98ffea023d7cf8e96a577df Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Mon, 16 May 2022 14:52:28 +0300 Subject: [PATCH 04/22] Add public functions for the pipeline --- .../src/worker/ingestion/event-pipeline/runner.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 6b29873c37bb8..ddcabd0fbb84d 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -1,7 +1,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { Hub } from '../../../types' +import { Hub, PreIngestionEvent } from '../../../types' import { status } from '../../../utils/status' import { generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' @@ -38,6 +38,7 @@ export type StepResult = | NextStep<'runAsyncHandlersStep'> const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array = [ + 'pluginsProcessEventStep', 'prepareEventStep', 'determineShouldBufferStep', 'createEventStep', @@ -52,6 +53,17 @@ export class EventPipelineRunner { this.originalEvent = originalEvent } + async runMainPipeline(event: PluginEvent): Promise { + await this.runStep('prepareEventStep', event) + this.hub.statsd?.increment('kafka_queue.single_event.processed_and_ingested') + } + + async runBufferPipeline(event: PreIngestionEvent): Promise { + const person = await this.hub.db.fetchPerson(event.teamId, event.distinctId) + await this.runStep('createEventStep', event, person) + this.hub.statsd?.increment('kafka_queue.buffer_envent.processed_and_ingested') + } + async runStep>( name: Step, ...args: ArgsType From 8bafa40e80f6912cae67d91c729d0ef9037f39e3 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Mon, 16 May 2022 16:41:32 +0300 Subject: [PATCH 05/22] Tests for runner.ts --- .../worker/ingestion/event-pipeline/runner.ts | 32 +-- .../__snapshots__/runner.test.ts.snap | 122 +++++++++++ .../ingestion/event-pipeline/runner.test.ts | 190 ++++++++++++++++++ 3 files changed, 332 insertions(+), 12 deletions(-) create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index ddcabd0fbb84d..8e6fb1bee6f58 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -10,7 +10,7 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { prepareEventStep } from './prepareEventStep' import { runAsyncHandlersStep } from './runAsyncHandlersStep' -type StepParameters any> = T extends ( +export type StepParameters any> = T extends ( runner: EventPipelineRunner, ...args: infer P ) => any @@ -25,9 +25,9 @@ const EVENT_PIPELINE_STEPS = { runAsyncHandlersStep, } -type EventPipelineStepsType = typeof EVENT_PIPELINE_STEPS -type StepType = keyof EventPipelineStepsType -type NextStep = [StepType, StepParameters] +export type EventPipelineStepsType = typeof EVENT_PIPELINE_STEPS +export type StepType = keyof EventPipelineStepsType +export type NextStep = [StepType, StepParameters] export type StepResult = | null @@ -54,17 +54,17 @@ export class EventPipelineRunner { } async runMainPipeline(event: PluginEvent): Promise { - await this.runStep('prepareEventStep', event) + await this.runPipeline('pluginsProcessEventStep', event) this.hub.statsd?.increment('kafka_queue.single_event.processed_and_ingested') } async runBufferPipeline(event: PreIngestionEvent): Promise { const person = await this.hub.db.fetchPerson(event.teamId, event.distinctId) - await this.runStep('createEventStep', event, person) - this.hub.statsd?.increment('kafka_queue.buffer_envent.processed_and_ingested') + await this.runPipeline('createEventStep', event, person) + this.hub.statsd?.increment('kafka_queue.buffer_event.processed_and_ingested') } - async runStep>( + private async runPipeline>( name: Step, ...args: ArgsType ): Promise { @@ -74,9 +74,7 @@ export class EventPipelineRunner { while (true) { const timer = new Date() try { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - const stepResult = await EVENT_PIPELINE_STEPS[currentStepName](this, ...currentArgs) + const stepResult = await this.runStep(currentStepName, ...currentArgs) this.hub.statsd?.increment('kafka_queue.event_pipeline.step', { step: currentStepName }) this.hub.statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: currentStepName }) @@ -84,7 +82,7 @@ export class EventPipelineRunner { if (stepResult) { ;[currentStepName, currentArgs] = stepResult } else { - this.hub.statsd?.increment('kafka_queue.event_pipeline.step.dropped', { + this.hub.statsd?.increment('kafka_queue.event_pipeline.step.last', { step: currentStepName, team_id: String(this.originalEvent?.team_id), }) @@ -97,6 +95,16 @@ export class EventPipelineRunner { } } + protected runStep>( + name: Step, + ...args: ArgsType + ): Promise { + // :TODO: timeoutGuard per step + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + return EVENT_PIPELINE_STEPS[name](this, ...args) + } + nextStep>( name: Step, ...args: ArgsType diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap new file mode 100644 index 0000000000000..c50ea4fb3cd1b --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -0,0 +1,122 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`EventPipelineRunner runBufferPipeline() runs remaining steps 1`] = ` +Array [ + Array [ + "createEventStep", + Array [ + Object { + "distinctId": "my_id", + "elementsList": Array [], + "event": "$pageview", + "eventUuid": "uuid1", + "ip": "127.0.0.1", + "properties": Object {}, + "siteUrl": "example.com", + "teamId": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + undefined, + ], + ], + Array [ + "runAsyncHandlersStep", + Array [ + Object { + "distinctId": "my_id", + "elementsList": Array [], + "event": "$pageview", + "eventUuid": "uuid1", + "ip": "127.0.0.1", + "properties": Object {}, + "siteUrl": "example.com", + "teamId": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], +] +`; + +exports[`EventPipelineRunner runMainPipeline() runs all steps 1`] = ` +Array [ + Array [ + "pluginsProcessEventStep", + Array [ + Object { + "distinct_id": "my_id", + "event": "default event", + "ip": "127.0.0.1", + "now": "2020-02-23T02:15:00Z", + "properties": Object {}, + "site_url": "http://localhost", + "team_id": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], + Array [ + "prepareEventStep", + Array [ + Object { + "distinct_id": "my_id", + "event": "default event", + "ip": "127.0.0.1", + "now": "2020-02-23T02:15:00Z", + "properties": Object {}, + "site_url": "http://localhost", + "team_id": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], + Array [ + "determineShouldBufferStep", + Array [ + Object { + "distinctId": "my_id", + "elementsList": Array [], + "event": "$pageview", + "eventUuid": "uuid1", + "ip": "127.0.0.1", + "properties": Object {}, + "siteUrl": "example.com", + "teamId": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], + Array [ + "createEventStep", + Array [ + Object { + "distinctId": "my_id", + "elementsList": Array [], + "event": "$pageview", + "eventUuid": "uuid1", + "ip": "127.0.0.1", + "properties": Object {}, + "siteUrl": "example.com", + "teamId": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], + Array [ + "runAsyncHandlersStep", + Array [ + Object { + "distinctId": "my_id", + "elementsList": Array [], + "event": "$pageview", + "eventUuid": "uuid1", + "ip": "127.0.0.1", + "properties": Object {}, + "siteUrl": "example.com", + "teamId": 2, + "timestamp": "2020-02-23T02:15:00Z", + }, + ], + ], +] +`; diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts new file mode 100644 index 0000000000000..2e3cb5bc247e7 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -0,0 +1,190 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { mocked } from 'ts-jest/utils' + +import { PreIngestionEvent } from '../../../../src/types' +import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep' +import { determineShouldBufferStep } from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' +import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep' +import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep' +import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' +import { + EventPipelineRunner, + EventPipelineStepsType, + StepParameters, + StepResult, + StepType, +} from '../../../../src/worker/ingestion/event-pipeline/runner' +import { generateEventDeadLetterQueueMessage } from '../../../../src/worker/ingestion/utils' + +jest.mock('../../../../src/utils/status') +jest.mock('../../../../src/worker/ingestion/event-pipeline/createEventStep') +jest.mock('../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep') +jest.mock('../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep') +jest.mock('../../../../src/worker/ingestion/event-pipeline/prepareEventStep') +jest.mock('../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep') +jest.mock('../../../../src/worker/ingestion/utils') + +class TestEventPipelineRunner extends EventPipelineRunner { + steps: Array = [] + stepsWithArgs: Array<[string, any[]]> = [] + + protected runStep>( + name: Step, + ...args: ArgsType + ): Promise { + this.steps.push(name) + this.stepsWithArgs.push([name, args]) + return super.runStep(name, ...args) + } +} + +const pluginEvent: PluginEvent = { + distinct_id: 'my_id', + ip: '127.0.0.1', + site_url: 'http://localhost', + team_id: 2, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: {}, +} + +const preIngestionEvent: PreIngestionEvent = { + eventUuid: 'uuid1', + distinctId: 'my_id', + ip: '127.0.0.1', + siteUrl: 'example.com', + teamId: 2, + timestamp: '2020-02-23T02:15:00Z', + event: '$pageview', + properties: {}, + elementsList: [], +} + +describe('EventPipelineRunner', () => { + let runner: TestEventPipelineRunner + let hub: any + + beforeEach(() => { + hub = { + db: { + kafkaProducer: { queueMessage: jest.fn() }, + fetchPerson: jest.fn(), + }, + statsd: { + increment: jest.fn(), + timing: jest.fn(), + }, + } + runner = new TestEventPipelineRunner(hub, pluginEvent) + + mocked(pluginsProcessEventStep).mockResolvedValue(['prepareEventStep', [pluginEvent]]) + mocked(prepareEventStep).mockResolvedValue(['determineShouldBufferStep', [preIngestionEvent]]) + mocked(determineShouldBufferStep).mockResolvedValue(['createEventStep', [preIngestionEvent]]) + mocked(createEventStep).mockResolvedValue(['runAsyncHandlersStep', [preIngestionEvent]]) + mocked(runAsyncHandlersStep).mockResolvedValue(null) + }) + + describe('runMainPipeline()', () => { + it('runs all steps', async () => { + await runner.runMainPipeline(pluginEvent) + + expect(runner.steps).toEqual([ + 'pluginsProcessEventStep', + 'prepareEventStep', + 'determineShouldBufferStep', + 'createEventStep', + 'runAsyncHandlersStep', + ]) + expect(runner.stepsWithArgs).toMatchSnapshot() + }) + + it('emits metrics for every step', async () => { + await runner.runMainPipeline(pluginEvent) + + expect(hub.statsd.timing).toHaveBeenCalledTimes(5) + expect(hub.statsd.increment).toBeCalledTimes(7) + + expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { + step: 'createEventStep', + }) + expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last', { + step: 'runAsyncHandlersStep', + team_id: '2', + }) + expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error') + }) + + describe('early exits from pipeline', () => { + beforeEach(() => { + mocked(prepareEventStep).mockResolvedValue(null) + }) + + it('stops processing after step', async () => { + await runner.runMainPipeline(pluginEvent) + + expect(runner.steps).toEqual(['pluginsProcessEventStep', 'prepareEventStep']) + }) + + it('reports metrics and last step correctly', async () => { + await runner.runMainPipeline(pluginEvent) + + expect(hub.statsd.timing).toHaveBeenCalledTimes(2) + expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last', { + step: 'prepareEventStep', + team_id: '2', + }) + expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error') + }) + }) + + describe('errors during processing', () => { + const error = new Error('testError') + + it('runs and increments metrics', async () => { + mocked(prepareEventStep).mockRejectedValue(error) + + await runner.runMainPipeline(pluginEvent) + + expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { + step: 'pluginsProcessEventStep', + }) + expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { + step: 'prepareEventStep', + }) + expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last') + expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error', { + step: 'prepareEventStep', + }) + }) + + it('emits failures to dead letter queue until createEvent', async () => { + mocked(generateEventDeadLetterQueueMessage).mockReturnValue('DLQ event' as any) + mocked(prepareEventStep).mockRejectedValue(error) + + await runner.runMainPipeline(pluginEvent) + + expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledWith('DLQ event' as any) + expect(hub.statsd.increment).toHaveBeenCalledWith('events_added_to_dead_letter_queue') + }) + + it('does not emit to dead letter queue for runAsyncHandlersStep', async () => { + mocked(runAsyncHandlersStep).mockRejectedValue(error) + + await runner.runMainPipeline(pluginEvent) + + expect(hub.db.kafkaProducer.queueMessage).not.toHaveBeenCalled() + expect(hub.statsd.increment).not.toHaveBeenCalledWith('events_added_to_dead_letter_queue') + }) + }) + }) + + describe('runBufferPipeline()', () => { + it('runs remaining steps', async () => { + await runner.runBufferPipeline(preIngestionEvent) + + expect(runner.steps).toEqual(['createEventStep', 'runAsyncHandlersStep']) + expect(runner.stepsWithArgs).toMatchSnapshot() + }) + }) +}) From f5c88cdb16c89893681bf5491fdf16af04b80998 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 10:07:22 +0300 Subject: [PATCH 06/22] Tests for every step in event pipeline --- .../event-pipeline/runAsyncHandlersStep.ts | 2 +- .../event-pipeline/createEventStep.test.ts | 40 +++++++ .../determineShouldBufferStep.test.ts | 55 +++++++++ .../pluginsProcessEventStep.test.ts | 62 ++++++++++ .../event-pipeline/prepareEventStep.test.ts | 111 ++++++++++++++++++ .../runAsyncHandlersStep.test.ts | 95 +++++++++++++++ 6 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/createEventStep.test.ts create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts index b63ea78bc3111..d59e9bce897c9 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts @@ -18,7 +18,7 @@ export async function runAsyncHandlersStep( } const processedPluginEvent = convertToProcessedPluginEvent(event) - const isSnapshot = event.event == 'snapshot' + const isSnapshot = event.event === '$snapshot' const method = isSnapshot ? runOnSnapshot : runOnEvent promises.push( runInstrumentedFunction({ diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/createEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/createEventStep.test.ts new file mode 100644 index 0000000000000..1f34b1e9bdf39 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/createEventStep.test.ts @@ -0,0 +1,40 @@ +import { PreIngestionEvent } from '../../../../src/types' +import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep' + +jest.mock('../../../../src/worker/plugins/run') + +const preIngestionEvent: PreIngestionEvent = { + eventUuid: 'uuid1', + distinctId: 'my_id', + ip: '127.0.0.1', + siteUrl: 'example.com', + teamId: 2, + timestamp: '2020-02-23T02:15:00Z', + event: '$pageview', + properties: {}, + elementsList: [], +} + +const testPerson: any = { id: 'testid' } +const testElements: any = ['element1', 'element2'] + +describe('createEventStep()', () => { + let runner: any + + beforeEach(() => { + runner = { + nextStep: (...args: any[]) => args, + hub: { + eventsProcessor: { + createEvent: () => [null, null, testElements], + }, + }, + } + }) + + it('calls `createEvent` and forwards to `runAsyncHandlersStep`', async () => { + const response = await createEventStep(runner, preIngestionEvent, testPerson) + + expect(response).toEqual(['runAsyncHandlersStep', preIngestionEvent, testPerson, testElements]) + }) +}) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts new file mode 100644 index 0000000000000..c7c2373fadc41 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts @@ -0,0 +1,55 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { mocked } from 'ts-jest/utils' + +import { PreIngestionEvent } from '../../../../src/types' +import { determineShouldBufferStep } from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' +import { shouldSendEventToBuffer } from '../../../../src/worker/ingestion/ingest-event' + +jest.mock('../../../../src/worker/ingestion/ingest-event') + +const preIngestionEvent: PreIngestionEvent = { + eventUuid: 'uuid1', + distinctId: 'my_id', + ip: '127.0.0.1', + siteUrl: 'example.com', + teamId: 2, + timestamp: '2020-02-23T02:15:00Z', + event: '$pageview', + properties: {}, + elementsList: [], +} + +describe('determineShouldBufferStep()', () => { + let runner: any + const testPerson: any = { id: 'testid' } + + beforeEach(() => { + runner = { + nextStep: (...args: any[]) => args, + hub: { + db: { fetchPerson: () => Promise.resolve(testPerson) }, + eventsProcessor: { + produceEventToBuffer: jest.fn(), + }, + }, + } + }) + + it('calls `produceEventToBuffer` if event should be buffered, stops processing', async () => { + mocked(shouldSendEventToBuffer).mockReturnValue(true) + + const response = await determineShouldBufferStep(runner, preIngestionEvent) + + expect(runner.hub.eventsProcessor.produceEventToBuffer).toHaveBeenCalledWith(preIngestionEvent) + expect(response).toEqual(null) + }) + + it('calls `createEventStep` next if not buffering', async () => { + mocked(shouldSendEventToBuffer).mockReturnValue(false) + + const response = await determineShouldBufferStep(runner, preIngestionEvent) + + expect(response).toEqual(['createEventStep', preIngestionEvent, testPerson]) + expect(runner.hub.eventsProcessor.produceEventToBuffer).not.toHaveBeenCalled() + }) +}) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts new file mode 100644 index 0000000000000..2f7acce893656 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts @@ -0,0 +1,62 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { mocked } from 'ts-jest/utils' + +import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep' +import { runProcessEvent } from '../../../../src/worker/plugins/run' + +jest.mock('../../../../src/worker/plugins/run') + +const pluginEvent: PluginEvent = { + distinct_id: 'my_id', + ip: '127.0.0.1', + site_url: 'http://localhost', + team_id: 2, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: {}, + uuid: '017ef865-19da-0000-3b60-1506093bf40f', +} + +describe('pluginsProcessEventStep()', () => { + let runner: any + + beforeEach(() => { + runner = { + nextStep: (...args: any[]) => args, + hub: { + statsd: { + increment: jest.fn(), + timing: jest.fn(), + }, + }, + } + }) + + it('forwards processed plugin event to `prepareEventStep`', async () => { + const processedEvent = { ...pluginEvent, event: 'processed' } + mocked(runProcessEvent).mockResolvedValue(processedEvent) + + const response = await pluginsProcessEventStep(runner, pluginEvent) + + expect(response).toEqual(['prepareEventStep', processedEvent]) + }) + + it('automatically forwards `$snapshot` events', async () => { + const event = { ...pluginEvent, event: '$snapshot' } + + const response = await pluginsProcessEventStep(runner, event) + + expect(runProcessEvent).not.toHaveBeenCalled() + expect(response).toEqual(['prepareEventStep', event]) + }) + + it('does not forward but counts dropped events by plugins', async () => { + mocked(runProcessEvent).mockResolvedValue(null) + + const response = await pluginsProcessEventStep(runner, pluginEvent) + + expect(response).toEqual(null) + expect(runner.hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.dropped_event', { teamID: '2' }) + }) +}) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts new file mode 100644 index 0000000000000..271ba847c5cd6 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts @@ -0,0 +1,111 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { DateTime } from 'luxon' + +import { Hub } from '../../../../src/types' +import { createHub } from '../../../../src/utils/db/hub' +import { UUIDT } from '../../../../src/utils/utils' +import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep' +import { resetTestDatabase } from '../../../helpers/sql' + +jest.mock('../../../../src/utils/status') + +const pluginEvent: PluginEvent = { + distinct_id: 'my_id', + ip: '127.0.0.1', + site_url: 'http://localhost', + team_id: 2, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: {}, + uuid: '017ef865-19da-0000-3b60-1506093bf40f', +} + +describe('prepareEventStep()', () => { + let runner: any + let hub: Hub + let closeHub: () => Promise + + beforeEach(async () => { + await resetTestDatabase() + ;[hub, closeHub] = await createHub() + + // :KLUDGE: We test below whether kafka messages are produced, so make sure the person exists beforehand. + await hub.db.createPerson( + DateTime.utc(), + {}, + {}, + {}, + pluginEvent.team_id, + null, + false, + new UUIDT().toString(), + ['my_id'] + ) + hub.db.kafkaProducer!.queueMessage = jest.fn() + + runner = { + nextStep: (...args: any[]) => args, + hub, + } + }) + + afterEach(async () => { + await closeHub() + }) + + it('goes to `determineShouldBufferStep` for normal events', async () => { + const response = await prepareEventStep(runner, pluginEvent) + + expect(response).toEqual([ + 'determineShouldBufferStep', + { + distinctId: 'my_id', + elementsList: [], + event: 'default event', + eventUuid: '017ef865-19da-0000-3b60-1506093bf40f', + ip: '127.0.0.1', + properties: { + $ip: '127.0.0.1', + }, + siteUrl: 'http://localhost', + teamId: 2, + timestamp: expect.any(DateTime), + }, + ]) + expect(hub.db.kafkaProducer!.queueMessage).not.toHaveBeenCalled() + }) + + it('produces to kafka and to `runAsyncHandlersStep` for $snapshot events', async () => { + const response = await prepareEventStep(runner, { ...pluginEvent, event: '$snapshot' }) + + expect(response).toEqual([ + 'runAsyncHandlersStep', + { + distinctId: 'my_id', + elementsList: [], + event: '$snapshot', + eventUuid: '017ef865-19da-0000-3b60-1506093bf40f', + ip: '127.0.0.1', + properties: { + $ip: '127.0.0.1', + }, + siteUrl: 'http://localhost', + teamId: 2, + timestamp: '2020-02-23 02:15:00.000', + }, + undefined, + undefined, + ]) + expect(hub.db.kafkaProducer!.queueMessage).toHaveBeenCalled() + }) + + it('does not continue if event is ignored', async () => { + await hub.db.postgresQuery('UPDATE posthog_team SET session_recording_opt_in = $1', [false], 'testRecordings') + + const response = await prepareEventStep(runner, { ...pluginEvent, event: '$snapshot' }) + + expect(response).toEqual(null) + expect(hub.db.kafkaProducer!.queueMessage).not.toHaveBeenCalled() + }) +}) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts new file mode 100644 index 0000000000000..e806f934d4ced --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts @@ -0,0 +1,95 @@ +import { PreIngestionEvent } from '../../../../src/types' +import { convertToProcessedPluginEvent } from '../../../../src/utils/event' +import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' +import { runOnAction, runOnEvent, runOnSnapshot } from '../../../../src/worker/plugins/run' + +jest.mock('../../../../src/worker/plugins/run') + +const preIngestionEvent: PreIngestionEvent = { + eventUuid: 'uuid1', + distinctId: 'my_id', + ip: '127.0.0.1', + siteUrl: 'example.com', + teamId: 2, + timestamp: '2020-02-23T02:15:00Z', + event: '$pageview', + properties: {}, + elementsList: [], +} +const snapshotEvent = { + ...preIngestionEvent, + event: '$snapshot', +} + +const testPerson: any = { id: 'testid' } +const testElements: any = ['element1', 'element2'] + +describe('runAsyncHandlersStep()', () => { + let runner: any + + beforeEach(() => { + runner = { + nextStep: (...args: any[]) => args, + hub: { + actionMatcher: { + match: jest.fn().mockResolvedValue(['action1', 'action2']), + }, + hookCannon: { + findAndFireHooks: jest.fn().mockResolvedValue(true), + }, + }, + } + }) + + it('stops processing', async () => { + const response = await runAsyncHandlersStep(runner, preIngestionEvent, testPerson, testElements) + + expect(response).toEqual(null) + }) + + it('does action matching and fires webhooks', async () => { + await runAsyncHandlersStep(runner, preIngestionEvent, testPerson, testElements) + + expect(runner.hub.actionMatcher.match).toHaveBeenCalled() + expect(runner.hub.hookCannon.findAndFireHooks).toHaveBeenCalledWith( + preIngestionEvent, + testPerson, + 'example.com', + ['action1', 'action2'] + ) + }) + + it('calls onEvent and onAction plugin methods', async () => { + await runAsyncHandlersStep(runner, preIngestionEvent, testPerson, testElements) + + expect(runOnEvent).toHaveBeenCalledWith(runner.hub, convertToProcessedPluginEvent(preIngestionEvent)) + expect(runOnAction).toHaveBeenCalledWith( + runner.hub, + 'action1', + convertToProcessedPluginEvent(preIngestionEvent) + ) + expect(runOnAction).toHaveBeenCalledWith( + runner.hub, + 'action2', + convertToProcessedPluginEvent(preIngestionEvent) + ) + expect(runOnSnapshot).not.toHaveBeenCalled() + }) + + describe('$snapshot events', () => { + it('does not do action matching or webhook firing', async () => { + await runAsyncHandlersStep(runner, snapshotEvent, testPerson, testElements) + + expect(runner.hub.actionMatcher.match).not.toHaveBeenCalled() + expect(runner.hub.hookCannon.findAndFireHooks).not.toHaveBeenCalled() + }) + + it('calls only onSnapshot plugin methods', async () => { + await runAsyncHandlersStep(runner, snapshotEvent, testPerson, testElements) + + expect(runOnSnapshot).toHaveBeenCalledWith(runner.hub, convertToProcessedPluginEvent(snapshotEvent)) + expect(runOnEvent).not.toHaveBeenCalled() + expect(runOnAction).not.toHaveBeenCalled() + }) + }) +}) From 9f5f764935e9eef7cf426b4633e744830b3026eb Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 11:06:30 +0300 Subject: [PATCH 07/22] yeet some now-unneeded worker code --- .../src/main/ingestion-queues/queue.ts | 25 ------ plugin-server/src/types.ts | 5 -- .../src/worker/ingestion/event-pipeline.ts | 79 ----------------- .../src/worker/ingestion/ingest-event.ts | 85 +------------------ plugin-server/src/worker/tasks.ts | 27 ++---- 5 files changed, 7 insertions(+), 214 deletions(-) delete mode 100644 plugin-server/src/worker/ingestion/event-pipeline.ts diff --git a/plugin-server/src/main/ingestion-queues/queue.ts b/plugin-server/src/main/ingestion-queues/queue.ts index 1643007051934..13edf3951dd94 100644 --- a/plugin-server/src/main/ingestion-queues/queue.ts +++ b/plugin-server/src/main/ingestion-queues/queue.ts @@ -39,31 +39,6 @@ export async function startQueues( workerMethods: Partial = {} ): Promise { const mergedWorkerMethods = { - onEvent: (event: ProcessedPluginEvent) => { - server.lastActivity = new Date().valueOf() - server.lastActivityType = 'onEvent' - return piscina.run({ task: 'onEvent', args: { event } }) - }, - onAction: (action: Action, event: ProcessedPluginEvent) => { - server.lastActivity = new Date().valueOf() - server.lastActivityType = 'onAction' - return piscina.run({ task: 'onAction', args: { event, action } }) - }, - onSnapshot: (event: ProcessedPluginEvent) => { - server.lastActivity = new Date().valueOf() - server.lastActivityType = 'onSnapshot' - return piscina.run({ task: 'onSnapshot', args: { event } }) - }, - processEvent: (event: PluginEvent) => { - server.lastActivity = new Date().valueOf() - server.lastActivityType = 'processEvent' - return piscina.run({ task: 'processEvent', args: { event } }) - }, - ingestEvent: (event: PluginEvent) => { - server.lastActivity = new Date().valueOf() - server.lastActivityType = 'ingestEvent' - return piscina.run({ task: 'ingestEvent', args: { event } }) - }, ingestBufferEvent: (event: PreIngestionEvent) => { server.lastActivity = new Date().valueOf() server.lastActivityType = 'ingestBufferEvent' diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 55b6857d36f33..56181a43ed4d7 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -375,11 +375,6 @@ export interface PluginTask { } export type WorkerMethods = { - onEvent: (event: ProcessedPluginEvent) => Promise - onAction: (action: Action, event: ProcessedPluginEvent) => Promise - onSnapshot: (event: ProcessedPluginEvent) => Promise - processEvent: (event: PluginEvent) => Promise - ingestEvent: (event: PluginEvent) => Promise ingestBufferEvent: (event: PreIngestionEvent) => Promise runEventPipeline: (event: PluginEvent) => Promise } diff --git a/plugin-server/src/worker/ingestion/event-pipeline.ts b/plugin-server/src/worker/ingestion/event-pipeline.ts deleted file mode 100644 index 7260a479b376f..0000000000000 --- a/plugin-server/src/worker/ingestion/event-pipeline.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' - -import { runInstrumentedFunction } from '../../main/utils' -import { Hub } from '../../types' -import { convertToProcessedPluginEvent } from '../../utils/event' -import { runOnAction, runOnEvent, runOnSnapshot, runProcessEvent } from '../plugins/run' -import { ingestEvent } from './ingest-event' - -export async function runEventPipeline(server: Hub, event: PluginEvent): Promise { - const processedEvent = await processEvent(server, event) - - if (processedEvent) { - const ingestEventResult = await runInstrumentedFunction({ - server, - event: processedEvent, - func: (event) => ingestEvent(server, event), - statsKey: 'kafka_queue.single_ingestion', - timeoutMessage: 'After 30 seconds still ingesting event', - }) - - server.statsd?.increment('kafka_queue_single_event_processed_and_ingested') - - if (ingestEventResult.success && ingestEventResult.preIngestionEvent) { - const processedPluginEvent = convertToProcessedPluginEvent(ingestEventResult.preIngestionEvent) - const promises = [onEvent(server, processedPluginEvent)] - for (const actionMatch of ingestEventResult.actionMatches) { - promises.push( - runInstrumentedFunction({ - server, - event: processedPluginEvent, - func: (event) => runOnAction(server, actionMatch, event), - statsKey: `kafka_queue.on_action`, - timeoutMessage: 'After 30 seconds still running onAction', - }) - ) - } - - await Promise.all(promises) - } - } else { - // processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it. - server.statsd?.increment('kafka_queue.dropped_event', { - teamID: String(event.team_id), - }) - } -} - -export async function processEvent(server: Hub, event: PluginEvent): Promise { - const isSnapshot = event.event === '$snapshot' - - let processedEvent: PluginEvent | null = event - - // run processEvent on all events that are not $snapshot - if (!isSnapshot) { - processedEvent = await runInstrumentedFunction({ - server, - event, - func: (event) => runProcessEvent(server, event), - statsKey: 'kafka_queue.single_event', - timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!', - }) - } - - return processedEvent -} - -export async function onEvent(server: Hub, event: ProcessedPluginEvent): Promise { - const isSnapshot = event.event === '$snapshot' - - const method = isSnapshot ? runOnSnapshot : runOnEvent - - await runInstrumentedFunction({ - server, - event: event, - func: (event) => method(server, event), - statsKey: `kafka_queue.single_${isSnapshot ? 'on_snapshot' : 'on_event'}`, - timeoutMessage: `After 30 seconds still running ${isSnapshot ? 'onSnapshot' : 'onEvent'}`, - }) -} diff --git a/plugin-server/src/worker/ingestion/ingest-event.ts b/plugin-server/src/worker/ingestion/ingest-event.ts index db115455e2ce1..ef965fc90ca6d 100644 --- a/plugin-server/src/worker/ingestion/ingest-event.ts +++ b/plugin-server/src/worker/ingestion/ingest-event.ts @@ -1,89 +1,6 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' -import * as Sentry from '@sentry/node' import { DateTime } from 'luxon' -import { CachedPersonData } from 'utils/db/db' -import { Element, Hub, IngestEventResponse, Person, PreIngestionEvent, TeamId } from '../../types' -import { timeoutGuard } from '../../utils/db/utils' -import { status } from '../../utils/status' -import { Action } from './../../types' -import { generateEventDeadLetterQueueMessage } from './utils' - -export async function ingestEvent(hub: Hub, event: PluginEvent): Promise { - const timeout = timeoutGuard('Still ingesting event inside worker. Timeout warning after 30 sec!', { - event: JSON.stringify(event), - }) - try { - const { ip, site_url, team_id, now, sent_at, uuid } = event - const distinctId = String(event.distinct_id) - const preIngestionEvent = await hub.eventsProcessor.processEvent( - distinctId, - ip, - event, - team_id, - DateTime.fromISO(now), - sent_at ? DateTime.fromISO(sent_at) : null, - uuid!, // it will throw if it's undefined, - site_url - ) - - let actionMatches: Action[] = [] - - if (preIngestionEvent && preIngestionEvent.event !== '$snapshot') { - const person = await hub.db.fetchPerson(team_id, distinctId) - - // even if the buffer is disabled we want to get metrics on how many events would have gone to it - const sendEventToBuffer = shouldSendEventToBuffer(hub, preIngestionEvent, person, team_id) - - if (sendEventToBuffer) { - await hub.eventsProcessor.produceEventToBuffer(preIngestionEvent) - } else { - const [, , elements] = await hub.eventsProcessor.createEvent(preIngestionEvent) - actionMatches = await handleActionMatches(hub, preIngestionEvent, elements, person) - } - } - - return { actionMatches, preIngestionEvent, success: true } - } catch (e) { - status.info('🔔', e) - Sentry.captureException(e, { extra: { event } }) - - if (hub.db.kafkaProducer) { - try { - const message = generateEventDeadLetterQueueMessage(event, e) - await hub.db.kafkaProducer.queueMessage(message) - hub.statsd?.increment('events_added_to_dead_letter_queue') - } catch (dlqError) { - status.info('🔔', `Errored trying to add event ${event.event} to dead letter queue. Error: ${dlqError}`) - Sentry.captureException(e, { extra: { event } }) - } - } - return { success: false, error: e.message } - } finally { - clearTimeout(timeout) - } -} - -export async function ingestBufferEvent(hub: Hub, event: PreIngestionEvent): Promise { - const person = await hub.db.getPersonData(event.teamId, event.distinctId) - const [, , elements] = await hub.eventsProcessor.createEvent(event) - const actionMatches = await handleActionMatches(hub, event, elements, person ?? undefined) - return { success: true, actionMatches, preIngestionEvent: event } -} - -async function handleActionMatches( - hub: Hub, - event: PreIngestionEvent, - elements?: Element[], - person?: CachedPersonData | Person -): Promise { - let actionMatches: Action[] = [] - - actionMatches = await hub.actionMatcher.match(event, person, elements) - await hub.hookCannon.findAndFireHooks(event, person, event.siteUrl, actionMatches) - - return actionMatches -} +import { Hub, Person, PreIngestionEvent, TeamId } from '../../types' // context: https://github.com/PostHog/posthog/issues/9182 // TL;DR: events from a recently created non-anonymous person are sent to a buffer diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 2ff99c9c830b2..237e0dada5ef0 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,28 +1,14 @@ -import { ProcessedPluginEvent } from '@posthog/plugin-scaffold' import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import { EventPipelineRunner } from 'worker/ingestion/event-pipeline/runner' import { Action, Alert, EnqueuedJob, Hub, PluginTaskType, PreIngestionEvent, Team } from '../types' -import { runEventPipeline } from './ingestion/event-pipeline' -import { ingestBufferEvent, ingestEvent } from './ingestion/ingest-event' -import { runHandleAlert, runOnAction, runOnEvent, runOnSnapshot, runPluginTask, runProcessEvent } from './plugins/run' +import { runHandleAlert, runPluginTask } from './plugins/run' import { loadSchedule, setupPlugins } from './plugins/setup' import { teardownPlugins } from './plugins/teardown' type TaskRunner = (hub: Hub, args: any) => Promise | any export const workerTasks: Record = { - onEvent: (hub, args: { event: ProcessedPluginEvent }) => { - return runOnEvent(hub, args.event) - }, - onAction: (hub, args: { event: ProcessedPluginEvent; action: Action }) => { - return runOnAction(hub, args.action, args.event) - }, - onSnapshot: (hub, args: { event: ProcessedPluginEvent }) => { - return runOnSnapshot(hub, args.event) - }, - processEvent: (hub, args: { event: PluginEvent }) => { - return runProcessEvent(hub, args.event) - }, handleAlert: async (hub, args: { alert: Alert }) => { return runHandleAlert(hub, args.alert) }, @@ -42,13 +28,12 @@ export const workerTasks: Record = { return hub.pluginSchedule }, runEventPipeline: async (hub, args: { event: PluginEvent }) => { - return await runEventPipeline(hub, args.event) - }, - ingestEvent: async (hub, args: { event: PluginEvent }) => { - return await ingestEvent(hub, args.event) + const runner = new EventPipelineRunner(hub, args.event) + await runner.runMainPipeline(args.event) }, ingestBufferEvent: async (hub, args: { event: PreIngestionEvent }) => { - return await ingestBufferEvent(hub, args.event) + const runner = new EventPipelineRunner(hub) + await runner.runBufferPipeline(args.event) }, reloadPlugins: async (hub) => { await setupPlugins(hub) From 05901a1e5b775072eddec7cee2a5efd269003507 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 11:46:13 +0300 Subject: [PATCH 08/22] Add timeoutGuard --- .../worker/ingestion/event-pipeline/runner.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 8e6fb1bee6f58..684467bc517d6 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -2,6 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { Hub, PreIngestionEvent } from '../../../types' +import { timeoutGuard } from '../../../utils/db/utils' import { status } from '../../../utils/status' import { generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' @@ -99,10 +100,17 @@ export class EventPipelineRunner { name: Step, ...args: ArgsType ): Promise { - // :TODO: timeoutGuard per step - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - return EVENT_PIPELINE_STEPS[name](this, ...args) + const timeout = timeoutGuard('Event pipeline step stalled. Timeout warning after 30 sec!', { + step: name, + event: JSON.stringify(this.originalEvent), + }) + try { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + return EVENT_PIPELINE_STEPS[name](this, ...args) + } finally { + clearTimeout(timeout) + } } nextStep>( From 14c6a7a827f64a681287282c5ec9b5f58a4d1c19 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 11:58:05 +0300 Subject: [PATCH 09/22] Emit to DLQ from buffer --- .../src/worker/ingestion/event-pipeline/runner.ts | 8 ++++---- plugin-server/src/worker/ingestion/utils.ts | 9 +++++---- plugin-server/src/worker/tasks.ts | 3 ++- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 684467bc517d6..e66f610163a42 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -1,4 +1,4 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' +import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { Hub, PreIngestionEvent } from '../../../types' @@ -47,9 +47,9 @@ const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array = [ export class EventPipelineRunner { hub: Hub - originalEvent: PluginEvent | undefined + originalEvent: PluginEvent | ProcessedPluginEvent - constructor(hub: Hub, originalEvent?: PluginEvent) { + constructor(hub: Hub, originalEvent: PluginEvent | ProcessedPluginEvent) { this.hub = hub this.originalEvent = originalEvent } @@ -125,7 +125,7 @@ export class EventPipelineRunner { Sentry.captureException(err, { extra: { currentStepName, currentArgs, originalEvent: this.originalEvent } }) this.hub.statsd?.increment('kafka_queue.event_pipeline.step.error', { step: currentStepName }) - if (this.originalEvent && STEPS_TO_EMIT_TO_DLQ_ON_FAILURE.includes(currentStepName)) { + if (STEPS_TO_EMIT_TO_DLQ_ON_FAILURE.includes(currentStepName)) { try { const message = generateEventDeadLetterQueueMessage(this.originalEvent, err) await this.hub.db.kafkaProducer!.queueMessage(message) diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index 9ac901540f318..0906bd8194dc3 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -1,4 +1,4 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' +import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' @@ -14,7 +14,7 @@ function getClickhouseTimestampOrNull(isoTimestamp?: string): string | null { } export function generateEventDeadLetterQueueMessage( - event: PluginEvent, + event: PluginEvent | ProcessedPluginEvent, error: unknown, errorLocation = 'plugin_server_ingest_event' ): ProducerRecord { @@ -22,7 +22,8 @@ export function generateEventDeadLetterQueueMessage( if (error instanceof Error) { errorMessage += `Error: ${error.message}` } - const { now, sent_at, timestamp, ...usefulEvent } = event + const pluginEvent: PluginEvent = { now: event.timestamp, sent_at: event.timestamp, ...event } as any as PluginEvent + const { now, sent_at, timestamp, ...usefulEvent } = pluginEvent const currentTimestamp = getClickhouseTimestampOrNull(new Date().toISOString()) const eventNow = getClickhouseTimestampOrNull(now) @@ -30,7 +31,7 @@ export function generateEventDeadLetterQueueMessage( ...usefulEvent, event: safeClickhouseString(usefulEvent.event), distinct_id: safeClickhouseString(usefulEvent.distinct_id), - site_url: safeClickhouseString(usefulEvent.site_url), + site_url: safeClickhouseString(usefulEvent.site_url || ''), ip: safeClickhouseString(usefulEvent.ip || ''), id: new UUIDT().toString(), event_uuid: event.uuid, diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 237e0dada5ef0..dac2de6d1157c 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,4 +1,5 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import { convertToProcessedPluginEvent } from 'utils/event' import { EventPipelineRunner } from 'worker/ingestion/event-pipeline/runner' import { Action, Alert, EnqueuedJob, Hub, PluginTaskType, PreIngestionEvent, Team } from '../types' @@ -32,7 +33,7 @@ export const workerTasks: Record = { await runner.runMainPipeline(args.event) }, ingestBufferEvent: async (hub, args: { event: PreIngestionEvent }) => { - const runner = new EventPipelineRunner(hub) + const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event)) await runner.runBufferPipeline(args.event) }, reloadPlugins: async (hub) => { From d897eafb3f775b2b26b048e607c88a9726c3a718 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 12:24:46 +0300 Subject: [PATCH 10/22] Move some tests to a separate file --- .../event-pipeline-integration.test.ts | 126 ++++++++++++++++++ .../worker/ingestion/ingest-event.test.ts | 97 -------------- 2 files changed, 126 insertions(+), 97 deletions(-) create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts new file mode 100644 index 0000000000000..43aa3ceb01b17 --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -0,0 +1,126 @@ +import { PluginEvent } from '@posthog/plugin-scaffold/src/types' +import fetch from 'node-fetch' +import { MockedFunction } from 'ts-jest/dist/utils/testing' + +import { Hook, Hub } from '../../../../src/types' +import { createHub } from '../../../../src/utils/db/hub' +import { UUIDT } from '../../../../src/utils/utils' +import { ActionMatcher } from '../../../../src/worker/ingestion/action-matcher' +import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' +import { commonUserId } from '../../../helpers/plugins' +import { insertRow, resetTestDatabase } from '../../../helpers/sql' + +describe('Event Pipeline integration test', () => { + let hub: Hub + let closeServer: () => Promise + let actionMatcher: ActionMatcher + + const ingestEvent = (event: PluginEvent) => new EventPipelineRunner(hub, event).runMainPipeline(event) + + beforeEach(async () => { + await resetTestDatabase() + ;[hub, closeServer] = await createHub() + actionMatcher = hub.actionMatcher + }) + + afterEach(async () => { + await closeServer() + }) + + it('fires a webhook', async () => { + await hub.db.postgresQuery( + `UPDATE posthog_team SET slack_incoming_webhook = 'https://webhook.example.com/'`, + [], + 'testTag' + ) + + const event: PluginEvent = { + event: 'xyz', + properties: { foo: 'bar' }, + timestamp: new Date().toISOString(), + now: new Date().toISOString(), + team_id: 2, + distinct_id: 'abc', + ip: null, + site_url: 'https://example.com', + uuid: new UUIDT().toString(), + } + + await ingestEvent(event) + + const expectedPayload = { + text: '[Test Action](https://example.com/action/69) was triggered by [abc](https://example.com/person/abc)', + } + + expect(fetch).toHaveBeenCalledWith('https://webhook.example.com/', { + body: JSON.stringify(expectedPayload, undefined, 4), + headers: { 'Content-Type': 'application/json' }, + method: 'POST', + }) + }) + + it('fires a REST hook', async () => { + await hub.db.postgresQuery(`UPDATE posthog_organization SET available_features = '{"zapier"}'`, [], 'testTag') + await insertRow(hub.db.postgres, 'ee_hook', { + id: 'abc', + team_id: 2, + user_id: commonUserId, + resource_id: 69, + event: 'action_performed', + target: 'https://rest-hooks.example.com/', + created: new Date().toISOString(), + updated: new Date().toISOString(), + } as Hook) + + const event: PluginEvent = { + event: 'xyz', + properties: { foo: 'bar' }, + timestamp: new Date().toISOString(), + now: new Date().toISOString(), + team_id: 2, + distinct_id: 'abc', + ip: null, + site_url: 'https://example.com', + uuid: new UUIDT().toString(), + } + + await ingestEvent(event) + + const expectedPayload = { + hook: { + id: 'abc', + event: 'action_performed', + target: 'https://rest-hooks.example.com/', + }, + data: { + event: 'xyz', + properties: { + foo: 'bar', + }, + eventUuid: expect.any(String), + timestamp: expect.any(String), + teamId: 2, + distinctId: 'abc', + ip: null, + siteUrl: 'https://example.com', + elementsList: [], + person: { + id: expect.any(Number), + created_at: expect.any(String), + team_id: 2, + properties: {}, + uuid: expect.any(String), + }, + }, + } + + // Using a more verbose way instead of toHaveBeenCalledWith because we need to parse request body + // and use expect.any for a few payload properties, which wouldn't be possible in a simpler way + expect((fetch as MockedFunction).mock.calls[0][0]).toBe('https://rest-hooks.example.com/') + const secondArg = (fetch as MockedFunction).mock.calls[0][1] + expect(JSON.parse(secondArg!.body as unknown as string)).toStrictEqual(expectedPayload) + expect(JSON.parse(secondArg!.body as unknown as string)).toStrictEqual(expectedPayload) + expect(secondArg!.headers).toStrictEqual({ 'Content-Type': 'application/json' }) + expect(secondArg!.method).toBe('POST') + }) +}) diff --git a/plugin-server/tests/worker/ingestion/ingest-event.test.ts b/plugin-server/tests/worker/ingestion/ingest-event.test.ts index 1426168c818a6..5ff298e0b6b61 100644 --- a/plugin-server/tests/worker/ingestion/ingest-event.test.ts +++ b/plugin-server/tests/worker/ingestion/ingest-event.test.ts @@ -27,103 +27,6 @@ describe('ingestEvent', () => { await closeServer() }) - it('fires a webhook', async () => { - await hub.db.postgresQuery( - `UPDATE posthog_team SET slack_incoming_webhook = 'https://webhook.example.com/'`, - [], - 'testTag' - ) - - const event: PluginEvent = { - event: 'xyz', - properties: { foo: 'bar' }, - timestamp: new Date().toISOString(), - now: new Date().toISOString(), - team_id: 2, - distinct_id: 'abc', - ip: null, - site_url: 'https://example.com', - uuid: new UUIDT().toString(), - } - - await ingestEvent(hub, event) - - const expectedPayload = { - text: '[Test Action](https://example.com/action/69) was triggered by [abc](https://example.com/person/abc)', - } - - expect(fetch).toHaveBeenCalledWith('https://webhook.example.com/', { - body: JSON.stringify(expectedPayload, undefined, 4), - headers: { 'Content-Type': 'application/json' }, - method: 'POST', - }) - }) - - it('fires a REST hook', async () => { - await hub.db.postgresQuery(`UPDATE posthog_organization SET available_features = '{"zapier"}'`, [], 'testTag') - await insertRow(hub.db.postgres, 'ee_hook', { - id: 'abc', - team_id: 2, - user_id: commonUserId, - resource_id: 69, - event: 'action_performed', - target: 'https://rest-hooks.example.com/', - created: new Date().toISOString(), - updated: new Date().toISOString(), - } as Hook) - - const event: PluginEvent = { - event: 'xyz', - properties: { foo: 'bar' }, - timestamp: new Date().toISOString(), - now: new Date().toISOString(), - team_id: 2, - distinct_id: 'abc', - ip: null, - site_url: 'https://example.com', - uuid: new UUIDT().toString(), - } - - await ingestEvent(hub, event) - - const expectedPayload = { - hook: { - id: 'abc', - event: 'action_performed', - target: 'https://rest-hooks.example.com/', - }, - data: { - event: 'xyz', - properties: { - foo: 'bar', - }, - eventUuid: expect.any(String), - timestamp: expect.any(String), - teamId: 2, - distinctId: 'abc', - ip: null, - siteUrl: 'https://example.com', - elementsList: [], - person: { - id: expect.any(Number), - created_at: expect.any(String), - team_id: 2, - properties: {}, - uuid: expect.any(String), - }, - }, - } - - // Using a more verbose way instead of toHaveBeenCalledWith because we need to parse request body - // and use expect.any for a few payload properties, which wouldn't be possible in a simpler way - expect((fetch as MockedFunction).mock.calls[0][0]).toBe('https://rest-hooks.example.com/') - const secondArg = (fetch as MockedFunction).mock.calls[0][1] - expect(JSON.parse(secondArg!.body as unknown as string)).toStrictEqual(expectedPayload) - expect(JSON.parse(secondArg!.body as unknown as string)).toStrictEqual(expectedPayload) - expect(secondArg!.headers).toStrictEqual({ 'Content-Type': 'application/json' }) - expect(secondArg!.method).toBe('POST') - }) - describe('conversion buffer', () => { beforeEach(() => { hub.CONVERSION_BUFFER_ENABLED = true From 40a0aa79eeacd58f4a7bfffcb3d95217d2241240 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 12:43:05 +0300 Subject: [PATCH 11/22] fix internal metrics --- plugin-server/src/utils/internal-metrics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/utils/internal-metrics.ts b/plugin-server/src/utils/internal-metrics.ts index 90c3fab490d63..950f165db5c50 100644 --- a/plugin-server/src/utils/internal-metrics.ts +++ b/plugin-server/src/utils/internal-metrics.ts @@ -43,7 +43,7 @@ export class InternalMetrics { uuid: new UUIDT().toString(), } - promises.push(piscina.run({ task: 'ingestEvent', args: { event } })) + promises.push(piscina.run({ task: 'runEventPipeline', args: { event } })) } await Promise.all(promises) From 2cac16cc0c447f82a7dd18db304d17f6b68af1f9 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 12:43:55 +0300 Subject: [PATCH 12/22] Refactor method location, WIP --- .../determineShouldBufferStep.ts | 45 +++++- .../src/worker/ingestion/ingest-event.ts | 30 ---- .../determineShouldBufferStep.test.ts | 151 +++++++++++++++--- 3 files changed, 163 insertions(+), 63 deletions(-) delete mode 100644 plugin-server/src/worker/ingestion/ingest-event.ts diff --git a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts index 9b5a792369c06..6e1729a474eb6 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts @@ -1,20 +1,51 @@ -import { PreIngestionEvent } from '../../../types' -import { shouldSendEventToBuffer } from '../ingest-event' +import { DateTime } from 'luxon' + +import { Hub, Person, PreIngestionEvent, TeamId } from '../../../types' import { EventPipelineRunner, StepResult } from './runner' export async function determineShouldBufferStep( runner: EventPipelineRunner, - event: PreIngestionEvent + event: PreIngestionEvent, + shouldBuffer: ( + hub: Hub, + event: PreIngestionEvent, + person: Person | undefined, + teamId: TeamId + ) => boolean = shouldSendEventToBuffer ): Promise { const person = await runner.hub.db.fetchPerson(event.teamId, event.distinctId) - // even if the buffer is disabled we want to get metrics on how many events would have gone to it - const sendEventToBuffer = shouldSendEventToBuffer(runner.hub, event, person, event.teamId) - - if (sendEventToBuffer) { + if (shouldBuffer(runner.hub, event, person, event.teamId)) { await runner.hub.eventsProcessor.produceEventToBuffer(event) return null } else { return runner.nextStep('createEventStep', event, person) } } + +// context: https://github.com/PostHog/posthog/issues/9182 +// TL;DR: events from a recently created non-anonymous person are sent to a buffer +// because their person_id might change. We merge based on the person_id of the anonymous user +// so ingestion is delayed for those events to increase our chances of getting person_id correctly +export function shouldSendEventToBuffer( + hub: Hub, + event: PreIngestionEvent, + person: Person | undefined, + teamId: TeamId +): boolean { + const isAnonymousEvent = + event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id'] + const isRecentPerson = !person || DateTime.now().diff(person.created_at).seconds < hub.BUFFER_CONVERSION_SECONDS + const ingestEventDirectly = isAnonymousEvent || event.event === '$identify' || !isRecentPerson + const sendToBuffer = !ingestEventDirectly + + if (sendToBuffer) { + hub.statsd?.increment('conversion_events_buffer_size', { teamId: event.teamId.toString() }) + } + + if (!hub.CONVERSION_BUFFER_ENABLED && !hub.conversionBufferEnabledTeams.has(teamId)) { + return false + } + + return sendToBuffer +} diff --git a/plugin-server/src/worker/ingestion/ingest-event.ts b/plugin-server/src/worker/ingestion/ingest-event.ts deleted file mode 100644 index ef965fc90ca6d..0000000000000 --- a/plugin-server/src/worker/ingestion/ingest-event.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { DateTime } from 'luxon' - -import { Hub, Person, PreIngestionEvent, TeamId } from '../../types' - -// context: https://github.com/PostHog/posthog/issues/9182 -// TL;DR: events from a recently created non-anonymous person are sent to a buffer -// because their person_id might change. We merge based on the person_id of the anonymous user -// so ingestion is delayed for those events to increase our chances of getting person_id correctly -export function shouldSendEventToBuffer( - hub: Hub, - event: PreIngestionEvent, - person: Person | undefined, - teamId: TeamId -): boolean { - const isAnonymousEvent = - event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id'] - const isRecentPerson = !person || DateTime.now().diff(person.created_at).seconds < hub.BUFFER_CONVERSION_SECONDS - const ingestEventDirectly = isAnonymousEvent || event.event === '$identify' || !isRecentPerson - const sendToBuffer = !ingestEventDirectly - - if (sendToBuffer) { - hub.statsd?.increment('conversion_events_buffer_size', { teamId: event.teamId.toString() }) - } - - if (!hub.CONVERSION_BUFFER_ENABLED && !hub.conversionBufferEnabledTeams.has(teamId)) { - return false - } - - return sendToBuffer -} diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts index c7c2373fadc41..2af2fa8fba674 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts @@ -1,11 +1,12 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' -import { mocked } from 'ts-jest/utils' +import { DateTime } from 'luxon' -import { PreIngestionEvent } from '../../../../src/types' -import { determineShouldBufferStep } from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' -import { shouldSendEventToBuffer } from '../../../../src/worker/ingestion/ingest-event' +import { Person, PreIngestionEvent } from '../../../../src/types' +import { + determineShouldBufferStep, + shouldSendEventToBuffer, +} from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' -jest.mock('../../../../src/worker/ingestion/ingest-event') +const now = DateTime.fromISO('2020-01-01T12:00:05.200Z') const preIngestionEvent: PreIngestionEvent = { eventUuid: 'uuid1', @@ -19,37 +20,135 @@ const preIngestionEvent: PreIngestionEvent = { elementsList: [], } -describe('determineShouldBufferStep()', () => { - let runner: any - const testPerson: any = { id: 'testid' } +const existingPerson: Person = { + id: 123, + team_id: 2, + properties: {}, + is_user_id: 0, + is_identified: true, + uuid: 'uuid', + properties_last_updated_at: {}, + properties_last_operation: {}, + created_at: now.minus({ days: 1 }), + version: 0, +} - beforeEach(() => { - runner = { - nextStep: (...args: any[]) => args, - hub: { - db: { fetchPerson: () => Promise.resolve(testPerson) }, - eventsProcessor: { - produceEventToBuffer: jest.fn(), - }, +let runner: any + +beforeEach(() => { + runner = { + nextStep: (...args: any[]) => args, + hub: { + CONVERSION_BUFFER_ENABLED: true, + BUFFER_CONVERSION_SECONDS: 60, + db: { fetchPerson: () => Promise.resolve(existingPerson) }, + eventsProcessor: { + produceEventToBuffer: jest.fn(), }, - } - }) + }, + } +}) +describe('determineShouldBufferStep()', () => { it('calls `produceEventToBuffer` if event should be buffered, stops processing', async () => { - mocked(shouldSendEventToBuffer).mockReturnValue(true) - - const response = await determineShouldBufferStep(runner, preIngestionEvent) + const response = await determineShouldBufferStep(runner, preIngestionEvent, () => true) expect(runner.hub.eventsProcessor.produceEventToBuffer).toHaveBeenCalledWith(preIngestionEvent) expect(response).toEqual(null) }) it('calls `createEventStep` next if not buffering', async () => { - mocked(shouldSendEventToBuffer).mockReturnValue(false) - - const response = await determineShouldBufferStep(runner, preIngestionEvent) + const response = await determineShouldBufferStep(runner, preIngestionEvent, () => false) - expect(response).toEqual(['createEventStep', preIngestionEvent, testPerson]) + expect(response).toEqual(['createEventStep', preIngestionEvent, existingPerson]) expect(runner.hub.eventsProcessor.produceEventToBuffer).not.toHaveBeenCalled() }) }) + +describe('shouldSendEventToBuffer()', () => { + beforeEach(() => { + jest.spyOn(DateTime, 'now').mockReturnValue(now) + }) + + it('returns false for an existing non-anonymous person', () => { + const result = shouldSendEventToBuffer(runner.hub, preIngestionEvent, existingPerson, 2) + expect(result).toEqual(false) + }) + + it('returns true for recently created person', () => { + const person = { + ...existingPerson, + created_at: now.minus({ seconds: 5 }), + } + + const result = shouldSendEventToBuffer(runner.hub, preIngestionEvent, person, 2) + expect(result).toEqual(true) + }) + + it('returns false for anonymous person', () => { + const anonEvent = { + ...preIngestionEvent, + distinctId: '$some_device_id', + properties: { $device_id: '$some_device_id' }, + } + + const result = shouldSendEventToBuffer(runner.hub, anonEvent, existingPerson, 2) + expect(result).toEqual(false) + }) + + it('returns false for recently created anonymous person', () => { + const anonEvent = { + ...preIngestionEvent, + distinctId: '$some_device_id', + properties: { $device_id: '$some_device_id' }, + } + + const person = { + ...existingPerson, + created_at: now.minus({ seconds: 5 }), + } + + const result = shouldSendEventToBuffer(runner.hub, anonEvent, person, 2) + expect(result).toEqual(false) + }) + + it('returns true for non-existing person', () => { + const result = shouldSendEventToBuffer(runner.hub, preIngestionEvent, undefined, 2) + expect(result).toEqual(true) + }) + + it('returns false for $identify events for non-existing users', () => { + const event = { + ...preIngestionEvent, + event: '$identify', + } + + const result = shouldSendEventToBuffer(runner.hub, event, undefined, 2) + expect(result).toEqual(false) + }) + + it('returns false for $identify events for new users', () => { + const event = { + ...preIngestionEvent, + event: '$identify', + } + const person = { + ...existingPerson, + created_at: now.minus({ seconds: 5 }), + } + + const result = shouldSendEventToBuffer(runner.hub, event, person, 2) + expect(result).toEqual(false) + }) + + it('handles CONVERSION_BUFFER_ENABLED and conversionBufferEnabledTeams', () => { + runner.hub.CONVERSION_BUFFER_ENABLED = false + runner.hub.conversionBufferEnabledTeams = new Set([2]) + + expect(shouldSendEventToBuffer(runner.hub, preIngestionEvent, undefined, 2)).toEqual(true) + expect(shouldSendEventToBuffer(runner.hub, preIngestionEvent, undefined, 3)).toEqual(false) + + runner.hub.CONVERSION_BUFFER_ENABLED = true + expect(shouldSendEventToBuffer(runner.hub, preIngestionEvent, undefined, 3)).toEqual(true) + }) +}) From 6766a5db0ebe760e8144316136b6b2992d535407 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Tue, 17 May 2022 14:17:14 +0300 Subject: [PATCH 13/22] Fix code determining if user is a recent person or not --- .../ingestion/event-pipeline/determineShouldBufferStep.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts index 6e1729a474eb6..49281eac765f3 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts @@ -35,7 +35,8 @@ export function shouldSendEventToBuffer( ): boolean { const isAnonymousEvent = event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id'] - const isRecentPerson = !person || DateTime.now().diff(person.created_at).seconds < hub.BUFFER_CONVERSION_SECONDS + const isRecentPerson = + !person || DateTime.now().diff(person.created_at).as('seconds') < hub.BUFFER_CONVERSION_SECONDS const ingestEventDirectly = isAnonymousEvent || event.event === '$identify' || !isRecentPerson const sendToBuffer = !ingestEventDirectly From bfbbc84a9ab48f7b8fb194aa9b098453ea43390c Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 10:32:25 +0300 Subject: [PATCH 14/22] Update tests to deal with new pipeline --- .../benchmarks/postgres/helpers/piscina.ts | 2 +- .../benchmarks/vm/worker.benchmark.ts | 2 +- .../worker/ingestion/event-pipeline/runner.ts | 34 +++++--- plugin-server/src/worker/tasks.ts | 14 ++-- plugin-server/tests/postgres/teardown.test.ts | 17 +++- plugin-server/tests/postgres/worker.test.ts | 77 ++++++++++++------- plugin-server/tests/schedule.test.ts | 12 ++- 7 files changed, 106 insertions(+), 52 deletions(-) diff --git a/plugin-server/benchmarks/postgres/helpers/piscina.ts b/plugin-server/benchmarks/postgres/helpers/piscina.ts index 5b3dd73fabaad..746aff5ea0bf8 100644 --- a/plugin-server/benchmarks/postgres/helpers/piscina.ts +++ b/plugin-server/benchmarks/postgres/helpers/piscina.ts @@ -35,7 +35,7 @@ export function ingestOneEvent( export async function ingestCountEvents(piscina: ReturnType, count: number): Promise { const maxPromises = 500 const promises = Array(maxPromises) - const ingestEvent = (event: PluginEvent) => piscina.run({ task: 'ingestEvent', args: { event } }) + const ingestEvent = (event: PluginEvent) => piscina.run({ task: 'runEventPipeline', args: { event } }) const groups = Math.ceil(count / maxPromises) for (let j = 0; j < groups; j++) { diff --git a/plugin-server/benchmarks/vm/worker.benchmark.ts b/plugin-server/benchmarks/vm/worker.benchmark.ts index 01b9dc19ba554..76c30406950bd 100644 --- a/plugin-server/benchmarks/vm/worker.benchmark.ts +++ b/plugin-server/benchmarks/vm/worker.benchmark.ts @@ -30,7 +30,7 @@ function processOneEvent( async function processCountEvents(piscina: ReturnType, count: number, batchSize = 1) { const maxPromises = 1000 const promises = Array(maxPromises) - const processEvent = (event: PluginEvent) => piscina.run({ task: 'processEvent', args: { event } }) + const processEvent = (event: PluginEvent) => piscina.run({ task: '_testsRunProcessEvent', args: { event } }) const groups = Math.ceil((count * batchSize) / maxPromises) for (let j = 0; j < groups; j++) { diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index e66f610163a42..2fb0f47ef4616 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -38,6 +38,13 @@ export type StepResult = | NextStep<'createEventStep'> | NextStep<'runAsyncHandlersStep'> +// Only used in tests +export type EventPipelineResult = { + lastStep: StepType + args: any[] + error?: string +} + const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array = [ 'pluginsProcessEventStep', 'prepareEventStep', @@ -54,21 +61,23 @@ export class EventPipelineRunner { this.originalEvent = originalEvent } - async runMainPipeline(event: PluginEvent): Promise { - await this.runPipeline('pluginsProcessEventStep', event) + async runMainPipeline(event: PluginEvent): Promise { + const result = await this.runPipeline('pluginsProcessEventStep', event) this.hub.statsd?.increment('kafka_queue.single_event.processed_and_ingested') + return result } - async runBufferPipeline(event: PreIngestionEvent): Promise { + async runBufferPipeline(event: PreIngestionEvent): Promise { const person = await this.hub.db.fetchPerson(event.teamId, event.distinctId) - await this.runPipeline('createEventStep', event, person) + const result = await this.runPipeline('createEventStep', event, person) this.hub.statsd?.increment('kafka_queue.buffer_event.processed_and_ingested') + return result } private async runPipeline>( name: Step, ...args: ArgsType - ): Promise { + ): Promise { let currentStepName: StepType = name let currentArgs: any = args @@ -87,11 +96,18 @@ export class EventPipelineRunner { step: currentStepName, team_id: String(this.originalEvent?.team_id), }) - break + return { + lastStep: currentStepName, + args: currentArgs, + } + } + } catch (error) { + await this.handleError(error, currentStepName, currentArgs) + return { + lastStep: currentStepName, + args: currentArgs, + error: error.message, } - } catch (err) { - await this.handleError(err, currentStepName, currentArgs) - break } } } diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index dac2de6d1157c..95dbe8845ad14 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -1,9 +1,9 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { convertToProcessedPluginEvent } from 'utils/event' -import { EventPipelineRunner } from 'worker/ingestion/event-pipeline/runner' import { Action, Alert, EnqueuedJob, Hub, PluginTaskType, PreIngestionEvent, Team } from '../types' -import { runHandleAlert, runPluginTask } from './plugins/run' +import { convertToProcessedPluginEvent } from '../utils/event' +import { EventPipelineRunner } from './ingestion/event-pipeline/runner' +import { runHandleAlert, runPluginTask, runProcessEvent } from './plugins/run' import { loadSchedule, setupPlugins } from './plugins/setup' import { teardownPlugins } from './plugins/teardown' @@ -30,11 +30,11 @@ export const workerTasks: Record = { }, runEventPipeline: async (hub, args: { event: PluginEvent }) => { const runner = new EventPipelineRunner(hub, args.event) - await runner.runMainPipeline(args.event) + return await runner.runMainPipeline(args.event) }, ingestBufferEvent: async (hub, args: { event: PreIngestionEvent }) => { const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event)) - await runner.runBufferPipeline(args.event) + return await runner.runBufferPipeline(args.event) }, reloadPlugins: async (hub) => { await setupPlugins(hub) @@ -60,4 +60,8 @@ export const workerTasks: Record = { enqueueJob: async (hub, { job }: { job: EnqueuedJob }) => { await hub.jobQueueManager.enqueue(job) }, + // Exported only for tests + _testsRunProcessEvent: async (hub, args: { event: PluginEvent }) => { + return runProcessEvent(hub, args.event) + }, } diff --git a/plugin-server/tests/postgres/teardown.test.ts b/plugin-server/tests/postgres/teardown.test.ts index 67d45bf4ad1ea..42442f24bb6c0 100644 --- a/plugin-server/tests/postgres/teardown.test.ts +++ b/plugin-server/tests/postgres/teardown.test.ts @@ -1,3 +1,5 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' + import { startPluginsServer } from '../../src/main/pluginsServer' import { LogLevel } from '../../src/types' import { delay } from '../../src/utils/utils' @@ -5,6 +7,7 @@ import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' import { getErrorForPluginConfig, resetTestDatabase } from '../helpers/sql' +jest.mock('@graphile/logger') jest.mock('../../src/utils/status') jest.setTimeout(60000) // 60 sec timeout @@ -19,6 +22,12 @@ const defaultEvent = { } describe('teardown', () => { + const processEvent = async (piscina: any, event: PluginEvent) => { + const result = await piscina.run({ task: 'runEventPipeline', args: { event } }) + const resultEvent = result.args[0] + return resultEvent + } + test('teardown code runs when stopping', async () => { await resetTestDatabase(` async function processEvent (event) { @@ -42,7 +51,7 @@ describe('teardown', () => { const error1 = await getErrorForPluginConfig(pluginConfig39.id) expect(error1).toBe(null) - await piscina!.run({ task: 'processEvent', args: { event: { ...defaultEvent } } }) + await processEvent(piscina, defaultEvent) await stop() @@ -111,13 +120,15 @@ describe('teardown', () => { [pluginConfig39.id], 'testTag' ) - const event1 = await piscina!.run({ task: 'processEvent', args: { event: { ...defaultEvent } } }) + const event1 = await processEvent(piscina, defaultEvent) + // const event1 = await piscina!.run({ task: 'runEventPipeline', args: { event: { ...defaultEvent } } }) expect(event1.properties.storage).toBe('nope') await piscina!.broadcastTask({ task: 'reloadPlugins' }) await delay(10000) - const event2 = await piscina!.run({ task: 'processEvent', args: { event: { ...defaultEvent } } }) + // const event2 = await piscina!.run({ task: 'runEventPipeline', args: { event: { ...defaultEvent } } }) + const event2 = await processEvent(piscina, defaultEvent) expect(event2.properties.storage).toBe('tore down') await stop() diff --git a/plugin-server/tests/postgres/worker.test.ts b/plugin-server/tests/postgres/worker.test.ts index 89565b660c788..3477d72533839 100644 --- a/plugin-server/tests/postgres/worker.test.ts +++ b/plugin-server/tests/postgres/worker.test.ts @@ -2,14 +2,14 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { mocked } from 'ts-jest/utils' import { loadPluginSchedule } from '../../src/main/services/schedule' -import { Hub } from '../../src/types' +import { Hub, PreIngestionEvent } from '../../src/types' import { createHub } from '../../src/utils/db/hub' import { KafkaProducerWrapper } from '../../src/utils/db/kafka-producer-wrapper' import { delay, UUIDT } from '../../src/utils/utils' import { ActionManager } from '../../src/worker/ingestion/action-manager' import { ActionMatcher } from '../../src/worker/ingestion/action-matcher' -import { ingestEvent } from '../../src/worker/ingestion/ingest-event' -import { runPluginTask, runProcessEvent } from '../../src/worker/plugins/run' +import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' +import { runPluginTask } from '../../src/worker/plugins/run' import { loadSchedule, setupPlugins } from '../../src/worker/plugins/setup' import { teardownPlugins } from '../../src/worker/plugins/teardown' import { createTaskRunner } from '../../src/worker/worker' @@ -20,7 +20,6 @@ jest.mock('../../src/worker/ingestion/action-manager') jest.mock('../../src/worker/ingestion/action-matcher') jest.mock('../../src/utils/db/sql') jest.mock('../../src/utils/status') -jest.mock('../../src/worker/ingestion/ingest-event') jest.mock('../../src/worker/plugins/run') jest.mock('../../src/worker/plugins/setup') jest.mock('../../src/worker/plugins/teardown') @@ -35,6 +34,7 @@ function createEvent(index = 0): PluginEvent { now: new Date().toISOString(), event: 'default event', properties: { key: 'value', index }, + uuid: new UUIDT().toString(), } } @@ -61,24 +61,31 @@ describe('worker', () => { await resetTestDatabase(testCode) const piscina = setupPiscina(workerThreads, 10) - const processEvent = (event: PluginEvent) => piscina.run({ task: 'processEvent', args: { event } }) const runEveryDay = (pluginConfigId: number) => piscina.run({ task: 'runEveryDay', args: { pluginConfigId } }) - const ingestEvent = (event: PluginEvent) => piscina.run({ task: 'ingestEvent', args: { event } }) + const ingestEvent = async (event: PluginEvent) => { + const result = await piscina.run({ task: 'runEventPipeline', args: { event } }) + const resultEvent = result.args[0] + return { ...result, event: resultEvent } + } const pluginSchedule = await loadPluginSchedule(piscina) expect(pluginSchedule).toEqual({ runEveryDay: [39], runEveryHour: [], runEveryMinute: [] }) - const event = await processEvent(createEvent()) - expect(event.properties['somewhere']).toBe('over the rainbow') + const ingestResponse1 = await ingestEvent(createEvent()) + expect(ingestResponse1.event.properties['somewhere']).toBe('over the rainbow') const everyDayReturn = await runEveryDay(39) expect(everyDayReturn).toBe(4) - const ingestResponse1 = await ingestEvent(createEvent()) - expect(ingestResponse1).toEqual({ success: false, error: 'Not a valid UUID: "undefined"' }) + const ingestResponse2 = await ingestEvent(createEvent()) + expect(ingestResponse2).toEqual({ + lastStep: 'runAsyncHandlersStep', + args: expect.anything(), + event: expect.anything(), + }) - const ingestResponse2 = await ingestEvent({ ...createEvent(), uuid: new UUIDT().toString() }) - expect(ingestResponse2).toEqual({ success: true, actionMatches: [], preIngestionEvent: expect.anything() }) + const ingestResponse3 = await ingestEvent({ ...createEvent(), uuid: undefined }) + expect(ingestResponse3.error).toEqual('Not a valid UUID: "undefined"') await delay(2000) await piscina.destroy() @@ -95,7 +102,7 @@ describe('worker', () => { ` await resetTestDatabase(testCode) const piscina = setupPiscina(workerThreads, tasksPerWorker) - const processEvent = (event: PluginEvent) => piscina.run({ task: 'processEvent', args: { event } }) + const processEvent = (event: PluginEvent) => piscina.run({ task: '_testsRunProcessEvent', args: { event } }) const promises: Array> = [] // warmup 2x @@ -133,30 +140,42 @@ describe('worker', () => { await closeHub() }) - it('handles `processEvent` task', async () => { - mocked(runProcessEvent).mockReturnValue('runProcessEvent response' as any) - - expect(await taskRunner({ task: 'processEvent', args: { event: 'someEvent' } })).toEqual( - 'runProcessEvent response' - ) - - expect(runProcessEvent).toHaveBeenCalledWith(hub, 'someEvent') - }) - it('handles `getPluginSchedule` task', async () => { hub.pluginSchedule = { runEveryDay: [66] } expect(await taskRunner({ task: 'getPluginSchedule' })).toEqual(hub.pluginSchedule) }) - it('handles `ingestEvent` task', async () => { - mocked(ingestEvent).mockReturnValue('ingestEvent response' as any) + it('handles `runEventPipeline` tasks', async () => { + const spy = jest + .spyOn(EventPipelineRunner.prototype, 'runMainPipeline') + .mockResolvedValue('runMainPipeline result' as any) + const event = createEvent() - expect(await taskRunner({ task: 'ingestEvent', args: { event: 'someEvent' } })).toEqual( - 'ingestEvent response' - ) + expect(await taskRunner({ task: 'runEventPipeline', args: { event } })).toEqual('runMainPipeline result') + + expect(spy).toHaveBeenCalledWith(event) + }) - expect(ingestEvent).toHaveBeenCalledWith(hub, 'someEvent') + it('handles `ingestBufferEvent` tasks', async () => { + const spy = jest + .spyOn(EventPipelineRunner.prototype, 'runBufferPipeline') + .mockResolvedValue('runBufferPipeline result' as any) + const event: PreIngestionEvent = { + eventUuid: 'uuid1', + distinctId: 'my_id', + ip: '127.0.0.1', + siteUrl: 'example.com', + teamId: 2, + timestamp: '2020-02-23T02:15:00Z', + event: '$pageview', + properties: {}, + elementsList: [], + } + + expect(await taskRunner({ task: 'ingestBufferEvent', args: { event } })).toEqual('runBufferPipeline result') + + expect(spy).toHaveBeenCalledWith(event) }) it('handles `runEvery` tasks', async () => { diff --git a/plugin-server/tests/schedule.test.ts b/plugin-server/tests/schedule.test.ts index c8bc5c0e18db8..4053dcbfd1b9b 100644 --- a/plugin-server/tests/schedule.test.ts +++ b/plugin-server/tests/schedule.test.ts @@ -57,13 +57,17 @@ describe('schedule', () => { ` await resetTestDatabase(testCode) const piscina = setupPiscina(workerThreads, 10) - const processEvent = (event: PluginEvent) => piscina.run({ task: 'processEvent', args: { event } }) + const ingestEvent = async (event: PluginEvent) => { + const result = await piscina.run({ task: 'runEventPipeline', args: { event } }) + const resultEvent = result.args[0] + return resultEvent + } const [hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) hub.pluginSchedule = await loadPluginSchedule(piscina) expect(hub.pluginSchedule).toEqual({ runEveryDay: [], runEveryHour: [], runEveryMinute: [39] }) - const event1 = await processEvent(createEvent()) + const event1 = await ingestEvent(createEvent()) expect(event1.properties['counter']).toBe(0) runScheduleDebounced(hub, piscina, 'runEveryMinute') @@ -71,12 +75,12 @@ describe('schedule', () => { runScheduleDebounced(hub, piscina, 'runEveryMinute') await delay(100) - const event2 = await processEvent(createEvent()) + const event2 = await ingestEvent(createEvent()) expect(event2.properties['counter']).toBe(0) await delay(500) - const event3 = await processEvent(createEvent()) + const event3 = await ingestEvent(createEvent()) expect(event3.properties['counter']).toBe(1) await waitForTasksToFinish(hub) From c8a15371e819773fa256329b29f0853e74d9b7b6 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 10:33:47 +0300 Subject: [PATCH 15/22] Rename methods for consistency --- .../src/main/ingestion-queues/kafka-queue.ts | 4 ++-- .../src/main/ingestion-queues/queue.ts | 6 +++--- plugin-server/src/types.ts | 2 +- .../worker/ingestion/event-pipeline/runner.ts | 4 ++-- plugin-server/src/worker/tasks.ts | 6 +++--- plugin-server/tests/postgres/worker.test.ts | 16 ++++++++------- .../__snapshots__/runner.test.ts.snap | 4 ++-- .../event-pipeline-integration.test.ts | 2 +- .../ingestion/event-pipeline/runner.test.ts | 20 +++++++++---------- 9 files changed, 33 insertions(+), 31 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 16b6a9928be7f..272c366d231d4 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -60,9 +60,9 @@ export class KafkaQueue implements Queue { await runInstrumentedFunction({ server: this.pluginsServer, event: bufferEvent, - func: (_) => this.workerMethods.ingestBufferEvent(bufferEvent), + func: (_) => this.workerMethods.runBufferEventPipeline(bufferEvent), statsKey: `kafka_queue.ingest_buffer_event`, - timeoutMessage: 'After 30 seconds still running ingestBufferEvent', + timeoutMessage: 'After 30 seconds still running runBufferEventPipeline', }) resolveOffset(message.offset) } diff --git a/plugin-server/src/main/ingestion-queues/queue.ts b/plugin-server/src/main/ingestion-queues/queue.ts index 13edf3951dd94..fc3a95fa78614 100644 --- a/plugin-server/src/main/ingestion-queues/queue.ts +++ b/plugin-server/src/main/ingestion-queues/queue.ts @@ -39,10 +39,10 @@ export async function startQueues( workerMethods: Partial = {} ): Promise { const mergedWorkerMethods = { - ingestBufferEvent: (event: PreIngestionEvent) => { + runBufferEventPipeline: (event: PreIngestionEvent) => { server.lastActivity = new Date().valueOf() - server.lastActivityType = 'ingestBufferEvent' - return piscina.run({ task: 'ingestBufferEvent', args: { event } }) + server.lastActivityType = 'runBufferEventPipeline' + return piscina.run({ task: 'runBufferEventPipeline', args: { event } }) }, runEventPipeline: (event: PluginEvent) => { server.lastActivity = new Date().valueOf() diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 56181a43ed4d7..92e27516bf2ca 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -375,7 +375,7 @@ export interface PluginTask { } export type WorkerMethods = { - ingestBufferEvent: (event: PreIngestionEvent) => Promise + runBufferEventPipeline: (event: PreIngestionEvent) => Promise runEventPipeline: (event: PluginEvent) => Promise } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 2fb0f47ef4616..6f9cd96066391 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -61,13 +61,13 @@ export class EventPipelineRunner { this.originalEvent = originalEvent } - async runMainPipeline(event: PluginEvent): Promise { + async runEventPipeline(event: PluginEvent): Promise { const result = await this.runPipeline('pluginsProcessEventStep', event) this.hub.statsd?.increment('kafka_queue.single_event.processed_and_ingested') return result } - async runBufferPipeline(event: PreIngestionEvent): Promise { + async runBufferEventPipeline(event: PreIngestionEvent): Promise { const person = await this.hub.db.fetchPerson(event.teamId, event.distinctId) const result = await this.runPipeline('createEventStep', event, person) this.hub.statsd?.increment('kafka_queue.buffer_event.processed_and_ingested') diff --git a/plugin-server/src/worker/tasks.ts b/plugin-server/src/worker/tasks.ts index 95dbe8845ad14..3bcfa9ea60637 100644 --- a/plugin-server/src/worker/tasks.ts +++ b/plugin-server/src/worker/tasks.ts @@ -30,11 +30,11 @@ export const workerTasks: Record = { }, runEventPipeline: async (hub, args: { event: PluginEvent }) => { const runner = new EventPipelineRunner(hub, args.event) - return await runner.runMainPipeline(args.event) + return await runner.runEventPipeline(args.event) }, - ingestBufferEvent: async (hub, args: { event: PreIngestionEvent }) => { + runBufferEventPipeline: async (hub, args: { event: PreIngestionEvent }) => { const runner = new EventPipelineRunner(hub, convertToProcessedPluginEvent(args.event)) - return await runner.runBufferPipeline(args.event) + return await runner.runBufferEventPipeline(args.event) }, reloadPlugins: async (hub) => { await setupPlugins(hub) diff --git a/plugin-server/tests/postgres/worker.test.ts b/plugin-server/tests/postgres/worker.test.ts index 3477d72533839..ee8919c97a2e6 100644 --- a/plugin-server/tests/postgres/worker.test.ts +++ b/plugin-server/tests/postgres/worker.test.ts @@ -148,19 +148,19 @@ describe('worker', () => { it('handles `runEventPipeline` tasks', async () => { const spy = jest - .spyOn(EventPipelineRunner.prototype, 'runMainPipeline') - .mockResolvedValue('runMainPipeline result' as any) + .spyOn(EventPipelineRunner.prototype, 'runEventPipeline') + .mockResolvedValue('runEventPipeline result' as any) const event = createEvent() - expect(await taskRunner({ task: 'runEventPipeline', args: { event } })).toEqual('runMainPipeline result') + expect(await taskRunner({ task: 'runEventPipeline', args: { event } })).toEqual('runEventPipeline result') expect(spy).toHaveBeenCalledWith(event) }) - it('handles `ingestBufferEvent` tasks', async () => { + it('handles `runBufferEventPipeline` tasks', async () => { const spy = jest - .spyOn(EventPipelineRunner.prototype, 'runBufferPipeline') - .mockResolvedValue('runBufferPipeline result' as any) + .spyOn(EventPipelineRunner.prototype, 'runBufferEventPipeline') + .mockResolvedValue('runBufferEventPipeline result' as any) const event: PreIngestionEvent = { eventUuid: 'uuid1', distinctId: 'my_id', @@ -173,7 +173,9 @@ describe('worker', () => { elementsList: [], } - expect(await taskRunner({ task: 'ingestBufferEvent', args: { event } })).toEqual('runBufferPipeline result') + expect(await taskRunner({ task: 'runBufferEventPipeline', args: { event } })).toEqual( + 'runBufferEventPipeline result' + ) expect(spy).toHaveBeenCalledWith(event) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index c50ea4fb3cd1b..d9d39f3fbf012 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -1,6 +1,6 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP -exports[`EventPipelineRunner runBufferPipeline() runs remaining steps 1`] = ` +exports[`EventPipelineRunner runBufferEventPipeline() runs remaining steps 1`] = ` Array [ Array [ "createEventStep", @@ -38,7 +38,7 @@ Array [ ] `; -exports[`EventPipelineRunner runMainPipeline() runs all steps 1`] = ` +exports[`EventPipelineRunner runEventPipeline() runs all steps 1`] = ` Array [ Array [ "pluginsProcessEventStep", diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index 43aa3ceb01b17..93ef14a7f7906 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -15,7 +15,7 @@ describe('Event Pipeline integration test', () => { let closeServer: () => Promise let actionMatcher: ActionMatcher - const ingestEvent = (event: PluginEvent) => new EventPipelineRunner(hub, event).runMainPipeline(event) + const ingestEvent = (event: PluginEvent) => new EventPipelineRunner(hub, event).runEventPipeline(event) beforeEach(async () => { await resetTestDatabase() diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 2e3cb5bc247e7..ccdcdfcb6e1ad 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -85,9 +85,9 @@ describe('EventPipelineRunner', () => { mocked(runAsyncHandlersStep).mockResolvedValue(null) }) - describe('runMainPipeline()', () => { + describe('runEventPipeline()', () => { it('runs all steps', async () => { - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(runner.steps).toEqual([ 'pluginsProcessEventStep', @@ -100,7 +100,7 @@ describe('EventPipelineRunner', () => { }) it('emits metrics for every step', async () => { - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(hub.statsd.timing).toHaveBeenCalledTimes(5) expect(hub.statsd.increment).toBeCalledTimes(7) @@ -121,13 +121,13 @@ describe('EventPipelineRunner', () => { }) it('stops processing after step', async () => { - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(runner.steps).toEqual(['pluginsProcessEventStep', 'prepareEventStep']) }) it('reports metrics and last step correctly', async () => { - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(hub.statsd.timing).toHaveBeenCalledTimes(2) expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last', { @@ -144,7 +144,7 @@ describe('EventPipelineRunner', () => { it('runs and increments metrics', async () => { mocked(prepareEventStep).mockRejectedValue(error) - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { step: 'pluginsProcessEventStep', @@ -162,7 +162,7 @@ describe('EventPipelineRunner', () => { mocked(generateEventDeadLetterQueueMessage).mockReturnValue('DLQ event' as any) mocked(prepareEventStep).mockRejectedValue(error) - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledWith('DLQ event' as any) expect(hub.statsd.increment).toHaveBeenCalledWith('events_added_to_dead_letter_queue') @@ -171,7 +171,7 @@ describe('EventPipelineRunner', () => { it('does not emit to dead letter queue for runAsyncHandlersStep', async () => { mocked(runAsyncHandlersStep).mockRejectedValue(error) - await runner.runMainPipeline(pluginEvent) + await runner.runEventPipeline(pluginEvent) expect(hub.db.kafkaProducer.queueMessage).not.toHaveBeenCalled() expect(hub.statsd.increment).not.toHaveBeenCalledWith('events_added_to_dead_letter_queue') @@ -179,9 +179,9 @@ describe('EventPipelineRunner', () => { }) }) - describe('runBufferPipeline()', () => { + describe('runBufferEventPipeline()', () => { it('runs remaining steps', async () => { - await runner.runBufferPipeline(preIngestionEvent) + await runner.runBufferEventPipeline(preIngestionEvent) expect(runner.steps).toEqual(['createEventStep', 'runAsyncHandlersStep']) expect(runner.stepsWithArgs).toMatchSnapshot() From f3b81e9a707ef449020790c056ab9052f083c5c1 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 10:43:04 +0300 Subject: [PATCH 16/22] Remove now-dead test --- .../worker/ingestion/ingest-event.test.ts | 104 ------------------ 1 file changed, 104 deletions(-) delete mode 100644 plugin-server/tests/worker/ingestion/ingest-event.test.ts diff --git a/plugin-server/tests/worker/ingestion/ingest-event.test.ts b/plugin-server/tests/worker/ingestion/ingest-event.test.ts deleted file mode 100644 index 5ff298e0b6b61..0000000000000 --- a/plugin-server/tests/worker/ingestion/ingest-event.test.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import fetch from 'node-fetch' -import { MockedFunction } from 'ts-jest/dist/utils/testing' - -import { Hook, Hub } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' -import { UUIDT } from '../../../src/utils/utils' -import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher' -import { ingestEvent } from '../../../src/worker/ingestion/ingest-event' -import { commonUserId } from '../../helpers/plugins' -import { insertRow, resetTestDatabase } from '../../helpers/sql' - -describe('ingestEvent', () => { - let hub: Hub - let closeServer: () => Promise - let actionMatcher: ActionMatcher - let actionCounter: number - - beforeEach(async () => { - await resetTestDatabase() - ;[hub, closeServer] = await createHub() - actionMatcher = hub.actionMatcher - actionCounter = 0 - }) - - afterEach(async () => { - await closeServer() - }) - - describe('conversion buffer', () => { - beforeEach(() => { - hub.CONVERSION_BUFFER_ENABLED = true - }) - - afterEach(() => { - hub.CONVERSION_BUFFER_ENABLED = false - }) - - afterAll(() => { - jest.clearAllMocks() - }) - - it('events from recently created persons are sent to the buffer', async () => { - hub.eventsProcessor.produceEventToBuffer = jest.fn() - - // will create a new person - const event: PluginEvent = { - event: 'xyz', - properties: { foo: 'bar' }, - timestamp: new Date().toISOString(), - now: new Date().toISOString(), - team_id: 2, - distinct_id: 'abc', - ip: null, - site_url: 'https://example.com', - uuid: new UUIDT().toString(), - } - - await ingestEvent(hub, event) - - expect(hub.eventsProcessor.produceEventToBuffer).toHaveBeenCalled() - }) - - it('anonymous events are not sent to the buffer', async () => { - hub.eventsProcessor.produceEventToBuffer = jest.fn() - - const event: PluginEvent = { - event: 'xyz', - properties: { foo: 'bar', $device_id: 'anonymous' }, - timestamp: new Date().toISOString(), - now: new Date().toISOString(), - team_id: 2, - distinct_id: 'anonymous', - ip: null, - site_url: 'https://example.com', - uuid: new UUIDT().toString(), - } - - await ingestEvent(hub, event) - - expect(hub.eventsProcessor.produceEventToBuffer).not.toHaveBeenCalled() - }) - }) - - it('$identify events are not sent to the buffer', async () => { - hub.eventsProcessor.produceEventToBuffer = jest.fn() - - const event: PluginEvent = { - event: '$identify', - properties: { foo: 'bar' }, - timestamp: new Date().toISOString(), - now: new Date().toISOString(), - team_id: 2, - distinct_id: 'foo', - ip: null, - site_url: 'https://example.com', - uuid: new UUIDT().toString(), - } - - await ingestEvent(hub, event) - - expect(hub.eventsProcessor.produceEventToBuffer).not.toHaveBeenCalled() - }) -}) From 1bc00f5827cb56d771645768290874136bce8bc9 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 10:54:19 +0300 Subject: [PATCH 17/22] Update process-event.test.ts --- plugin-server/tests/shared/process-event.test.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/plugin-server/tests/shared/process-event.test.ts b/plugin-server/tests/shared/process-event.test.ts index 760c2ecab6421..41ef44ab1401d 100644 --- a/plugin-server/tests/shared/process-event.test.ts +++ b/plugin-server/tests/shared/process-event.test.ts @@ -20,7 +20,7 @@ import { createHub } from '../../src/utils/db/hub' import { hashElements } from '../../src/utils/db/utils' import { posthog } from '../../src/utils/posthog' import { UUIDT } from '../../src/utils/utils' -import { ingestEvent } from '../../src/worker/ingestion/ingest-event' +import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' import { EventsProcessor } from '../../src/worker/ingestion/process-event' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetKafka } from '../helpers/kafka' @@ -177,7 +177,7 @@ afterEach(async () => { }) const capture = async (hub: Hub, eventName: string, properties: any = {}) => { - await ingestEvent(hub, { + const event = { event: eventName, distinct_id: properties.distinct_id ?? state.currentDistinctId, properties: properties, @@ -187,7 +187,9 @@ const capture = async (hub: Hub, eventName: string, properties: any = {}) => { site_url: 'https://posthog.com', team_id: team.id, uuid: new UUIDT().toString(), - }) + } + const runner = new EventPipelineRunner(hub, event) + await runner.runEventPipeline(event) await delayUntilEventIngested(() => hub.db.fetchEvents(), ++mockClientEventCounter) } From a29ab5fd42b41edd327dd08693529cca9f5daab2 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 11:39:16 +0300 Subject: [PATCH 18/22] Update DLQ test --- plugin-server/tests/clickhouse/dead-letter-queue.test.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/plugin-server/tests/clickhouse/dead-letter-queue.test.ts b/plugin-server/tests/clickhouse/dead-letter-queue.test.ts index 3522c3d3c9ad2..3dc66719ddc30 100644 --- a/plugin-server/tests/clickhouse/dead-letter-queue.test.ts +++ b/plugin-server/tests/clickhouse/dead-letter-queue.test.ts @@ -64,8 +64,12 @@ describe('events dead letter queue', () => { }) test('events get sent to dead letter queue on error', async () => { - const ingestResponse1 = await workerTasks.ingestEvent(hub, { event: createEvent() }) - expect(ingestResponse1).toEqual({ success: false, error: 'database unavailable' }) + const ingestResponse1 = await workerTasks.runEventPipeline(hub, { event: createEvent() }) + expect(ingestResponse1).toEqual({ + lastStep: 'prepareEventStep', + error: 'database unavailable', + args: expect.anything(), + }) expect(generateEventDeadLetterQueueMessage).toHaveBeenCalled() await delayUntilEventIngested(() => hub.db.fetchDeadLetterQueueEvents(), 1) From 1752b6f883b2ab12df8619951a09c179bcb9583c Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Wed, 18 May 2022 12:01:54 +0300 Subject: [PATCH 19/22] Ignore test under yeet --- plugin-server/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 96af4a91dce69..a5517d925e1ba 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -6,7 +6,7 @@ "main": "dist/index.js", "scripts": { "test": "jest --runInBand --forceExit tests/**/*.test.ts", - "test:postgres:1": "jest --runInBand --forceExit tests/ --testPathIgnorePatterns tests/clickhouse/ --testPathIgnorePatterns tests/postgres/ --testPathIgnorePatterns tests/shared/process-event.test.ts", + "test:postgres:1": "jest --runInBand --forceExit tests/ --testPathIgnorePatterns tests/clickhouse/ --testPathIgnorePatterns tests/postgres/ --testPathIgnorePatterns tests/shared/process-event.test.ts --testPathIgnorePatterns tests/worker/ingestion/event-pipeline", "test:postgres:2": "jest --runInBand --forceExit tests/postgres/", "test:clickhouse:1": "jest --runInBand --forceExit tests/clickhouse/", "test:clickhouse:2": "jest --runInBand --forceExit tests/shared/process-event.test.ts", From 464f17b74715c90d37d684c004627324dfb8d178 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Fri, 20 May 2022 10:19:41 +0300 Subject: [PATCH 20/22] Remove mocked --- .../pluginsProcessEventStep.test.ts | 5 ++--- .../ingestion/event-pipeline/runner.test.ts | 19 +++++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts index 2f7acce893656..2750ff0b6a460 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts @@ -1,5 +1,4 @@ import { PluginEvent } from '@posthog/plugin-scaffold' -import { mocked } from 'ts-jest/utils' import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep' import { runProcessEvent } from '../../../../src/worker/plugins/run' @@ -35,7 +34,7 @@ describe('pluginsProcessEventStep()', () => { it('forwards processed plugin event to `prepareEventStep`', async () => { const processedEvent = { ...pluginEvent, event: 'processed' } - mocked(runProcessEvent).mockResolvedValue(processedEvent) + jest.mocked(runProcessEvent).mockResolvedValue(processedEvent) const response = await pluginsProcessEventStep(runner, pluginEvent) @@ -52,7 +51,7 @@ describe('pluginsProcessEventStep()', () => { }) it('does not forward but counts dropped events by plugins', async () => { - mocked(runProcessEvent).mockResolvedValue(null) + jest.mocked(runProcessEvent).mockResolvedValue(null) const response = await pluginsProcessEventStep(runner, pluginEvent) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index ccdcdfcb6e1ad..5845072158057 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -1,5 +1,4 @@ import { PluginEvent } from '@posthog/plugin-scaffold' -import { mocked } from 'ts-jest/utils' import { PreIngestionEvent } from '../../../../src/types' import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep' @@ -78,11 +77,11 @@ describe('EventPipelineRunner', () => { } runner = new TestEventPipelineRunner(hub, pluginEvent) - mocked(pluginsProcessEventStep).mockResolvedValue(['prepareEventStep', [pluginEvent]]) - mocked(prepareEventStep).mockResolvedValue(['determineShouldBufferStep', [preIngestionEvent]]) - mocked(determineShouldBufferStep).mockResolvedValue(['createEventStep', [preIngestionEvent]]) - mocked(createEventStep).mockResolvedValue(['runAsyncHandlersStep', [preIngestionEvent]]) - mocked(runAsyncHandlersStep).mockResolvedValue(null) + jest.mocked(pluginsProcessEventStep).mockResolvedValue(['prepareEventStep', [pluginEvent]]) + jest.mocked(prepareEventStep).mockResolvedValue(['determineShouldBufferStep', [preIngestionEvent]]) + jest.mocked(determineShouldBufferStep).mockResolvedValue(['createEventStep', [preIngestionEvent]]) + jest.mocked(createEventStep).mockResolvedValue(['runAsyncHandlersStep', [preIngestionEvent]]) + jest.mocked(runAsyncHandlersStep).mockResolvedValue(null) }) describe('runEventPipeline()', () => { @@ -142,7 +141,7 @@ describe('EventPipelineRunner', () => { const error = new Error('testError') it('runs and increments metrics', async () => { - mocked(prepareEventStep).mockRejectedValue(error) + jest.mocked(prepareEventStep).mockRejectedValue(error) await runner.runEventPipeline(pluginEvent) @@ -159,8 +158,8 @@ describe('EventPipelineRunner', () => { }) it('emits failures to dead letter queue until createEvent', async () => { - mocked(generateEventDeadLetterQueueMessage).mockReturnValue('DLQ event' as any) - mocked(prepareEventStep).mockRejectedValue(error) + jest.mocked(generateEventDeadLetterQueueMessage).mockReturnValue('DLQ event' as any) + jest.mocked(prepareEventStep).mockRejectedValue(error) await runner.runEventPipeline(pluginEvent) @@ -169,7 +168,7 @@ describe('EventPipelineRunner', () => { }) it('does not emit to dead letter queue for runAsyncHandlersStep', async () => { - mocked(runAsyncHandlersStep).mockRejectedValue(error) + jest.mocked(runAsyncHandlersStep).mockRejectedValue(error) await runner.runEventPipeline(pluginEvent) From e679d72c7ad02f7898e8cfd053dfeff0640c7cf1 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Fri, 20 May 2022 10:22:38 +0300 Subject: [PATCH 21/22] Remove dead code --- plugin-server/tests/postgres/teardown.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin-server/tests/postgres/teardown.test.ts b/plugin-server/tests/postgres/teardown.test.ts index 42442f24bb6c0..ce8f28a258f06 100644 --- a/plugin-server/tests/postgres/teardown.test.ts +++ b/plugin-server/tests/postgres/teardown.test.ts @@ -121,7 +121,6 @@ describe('teardown', () => { 'testTag' ) const event1 = await processEvent(piscina, defaultEvent) - // const event1 = await piscina!.run({ task: 'runEventPipeline', args: { event: { ...defaultEvent } } }) expect(event1.properties.storage).toBe('nope') await piscina!.broadcastTask({ task: 'reloadPlugins' }) From 5b298422a9e810d6ad0f1543d9737667ddd63212 Mon Sep 17 00:00:00 2001 From: Karl-Aksel Puulmann Date: Fri, 20 May 2022 10:25:46 +0300 Subject: [PATCH 22/22] Update naming --- ...etermineShouldBufferStep.ts => emitToBufferStep.ts} | 2 +- .../ingestion/event-pipeline/prepareEventStep.ts | 2 +- .../src/worker/ingestion/event-pipeline/runner.ts | 8 ++++---- .../event-pipeline/__snapshots__/runner.test.ts.snap | 2 +- ...ouldBufferStep.test.ts => emitToBufferStep.test.ts} | 10 +++++----- .../ingestion/event-pipeline/prepareEventStep.test.ts | 4 ++-- .../worker/ingestion/event-pipeline/runner.test.ts | 10 +++++----- 7 files changed, 19 insertions(+), 19 deletions(-) rename plugin-server/src/worker/ingestion/event-pipeline/{determineShouldBufferStep.ts => emitToBufferStep.ts} (97%) rename plugin-server/tests/worker/ingestion/event-pipeline/{determineShouldBufferStep.test.ts => emitToBufferStep.test.ts} (92%) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/emitToBufferStep.ts similarity index 97% rename from plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts rename to plugin-server/src/worker/ingestion/event-pipeline/emitToBufferStep.ts index 49281eac765f3..0f2f9f721a164 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/determineShouldBufferStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/emitToBufferStep.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon' import { Hub, Person, PreIngestionEvent, TeamId } from '../../../types' import { EventPipelineRunner, StepResult } from './runner' -export async function determineShouldBufferStep( +export async function emitToBufferStep( runner: EventPipelineRunner, event: PreIngestionEvent, shouldBuffer: ( diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 08930265cf422..03c5fe05f7266 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -18,7 +18,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi ) if (preIngestionEvent && preIngestionEvent.event !== '$snapshot') { - return runner.nextStep('determineShouldBufferStep', preIngestionEvent) + return runner.nextStep('emitToBufferStep', preIngestionEvent) } else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') { return runner.nextStep('runAsyncHandlersStep', preIngestionEvent, undefined, undefined) } else { diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 6f9cd96066391..c7ee01520c809 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -6,7 +6,7 @@ import { timeoutGuard } from '../../../utils/db/utils' import { status } from '../../../utils/status' import { generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' -import { determineShouldBufferStep } from './determineShouldBufferStep' +import { emitToBufferStep } from './emitToBufferStep' import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { prepareEventStep } from './prepareEventStep' import { runAsyncHandlersStep } from './runAsyncHandlersStep' @@ -21,7 +21,7 @@ export type StepParameters any> = T extends ( const EVENT_PIPELINE_STEPS = { pluginsProcessEventStep, prepareEventStep, - determineShouldBufferStep, + emitToBufferStep, createEventStep, runAsyncHandlersStep, } @@ -34,7 +34,7 @@ export type StepResult = | null | NextStep<'pluginsProcessEventStep'> | NextStep<'prepareEventStep'> - | NextStep<'determineShouldBufferStep'> + | NextStep<'emitToBufferStep'> | NextStep<'createEventStep'> | NextStep<'runAsyncHandlersStep'> @@ -48,7 +48,7 @@ export type EventPipelineResult = { const STEPS_TO_EMIT_TO_DLQ_ON_FAILURE: Array = [ 'pluginsProcessEventStep', 'prepareEventStep', - 'determineShouldBufferStep', + 'emitToBufferStep', 'createEventStep', ] diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index d9d39f3fbf012..cfb473bc29cc0 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -71,7 +71,7 @@ Array [ ], ], Array [ - "determineShouldBufferStep", + "emitToBufferStep", Array [ Object { "distinctId": "my_id", diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/emitToBufferStep.test.ts similarity index 92% rename from plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts rename to plugin-server/tests/worker/ingestion/event-pipeline/emitToBufferStep.test.ts index 2af2fa8fba674..81d69c2f207cc 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/determineShouldBufferStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/emitToBufferStep.test.ts @@ -2,9 +2,9 @@ import { DateTime } from 'luxon' import { Person, PreIngestionEvent } from '../../../../src/types' import { - determineShouldBufferStep, + emitToBufferStep, shouldSendEventToBuffer, -} from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' +} from '../../../../src/worker/ingestion/event-pipeline/emitToBufferStep' const now = DateTime.fromISO('2020-01-01T12:00:05.200Z') @@ -49,16 +49,16 @@ beforeEach(() => { } }) -describe('determineShouldBufferStep()', () => { +describe('emitToBufferStep()', () => { it('calls `produceEventToBuffer` if event should be buffered, stops processing', async () => { - const response = await determineShouldBufferStep(runner, preIngestionEvent, () => true) + const response = await emitToBufferStep(runner, preIngestionEvent, () => true) expect(runner.hub.eventsProcessor.produceEventToBuffer).toHaveBeenCalledWith(preIngestionEvent) expect(response).toEqual(null) }) it('calls `createEventStep` next if not buffering', async () => { - const response = await determineShouldBufferStep(runner, preIngestionEvent, () => false) + const response = await emitToBufferStep(runner, preIngestionEvent, () => false) expect(response).toEqual(['createEventStep', preIngestionEvent, existingPerson]) expect(runner.hub.eventsProcessor.produceEventToBuffer).not.toHaveBeenCalled() diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts index 271ba847c5cd6..17ee5c1cccf64 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts @@ -54,11 +54,11 @@ describe('prepareEventStep()', () => { await closeHub() }) - it('goes to `determineShouldBufferStep` for normal events', async () => { + it('goes to `emitToBufferStep` for normal events', async () => { const response = await prepareEventStep(runner, pluginEvent) expect(response).toEqual([ - 'determineShouldBufferStep', + 'emitToBufferStep', { distinctId: 'my_id', elementsList: [], diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 5845072158057..fa2115ca21a76 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { PreIngestionEvent } from '../../../../src/types' import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep' -import { determineShouldBufferStep } from '../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep' +import { emitToBufferStep } from '../../../../src/worker/ingestion/event-pipeline/emitToBufferStep' import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep' import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep' import { runAsyncHandlersStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' @@ -17,7 +17,7 @@ import { generateEventDeadLetterQueueMessage } from '../../../../src/worker/inge jest.mock('../../../../src/utils/status') jest.mock('../../../../src/worker/ingestion/event-pipeline/createEventStep') -jest.mock('../../../../src/worker/ingestion/event-pipeline/determineShouldBufferStep') +jest.mock('../../../../src/worker/ingestion/event-pipeline/emitToBufferStep') jest.mock('../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep') jest.mock('../../../../src/worker/ingestion/event-pipeline/prepareEventStep') jest.mock('../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep') @@ -78,8 +78,8 @@ describe('EventPipelineRunner', () => { runner = new TestEventPipelineRunner(hub, pluginEvent) jest.mocked(pluginsProcessEventStep).mockResolvedValue(['prepareEventStep', [pluginEvent]]) - jest.mocked(prepareEventStep).mockResolvedValue(['determineShouldBufferStep', [preIngestionEvent]]) - jest.mocked(determineShouldBufferStep).mockResolvedValue(['createEventStep', [preIngestionEvent]]) + jest.mocked(prepareEventStep).mockResolvedValue(['emitToBufferStep', [preIngestionEvent]]) + jest.mocked(emitToBufferStep).mockResolvedValue(['createEventStep', [preIngestionEvent]]) jest.mocked(createEventStep).mockResolvedValue(['runAsyncHandlersStep', [preIngestionEvent]]) jest.mocked(runAsyncHandlersStep).mockResolvedValue(null) }) @@ -91,7 +91,7 @@ describe('EventPipelineRunner', () => { expect(runner.steps).toEqual([ 'pluginsProcessEventStep', 'prepareEventStep', - 'determineShouldBufferStep', + 'emitToBufferStep', 'createEventStep', 'runAsyncHandlersStep', ])