Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plugin-server): refactor the event pipeline #9829

Merged
merged 23 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin-server/benchmarks/postgres/helpers/piscina.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function ingestOneEvent(
export async function ingestCountEvents(piscina: ReturnType<typeof makePiscina>, count: number): Promise<void> {
const maxPromises = 500
const promises = Array(maxPromises)
const ingestEvent = (event: PluginEvent) => piscina.run({ task: 'ingestEvent', args: { event } })
const ingestEvent = (event: PluginEvent) => piscina.run({ task: 'runEventPipeline', args: { event } })

const groups = Math.ceil(count / maxPromises)
for (let j = 0; j < groups; j++) {
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/benchmarks/vm/worker.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function processOneEvent(
async function processCountEvents(piscina: ReturnType<typeof makePiscina>, count: number, batchSize = 1) {
const maxPromises = 1000
const promises = Array(maxPromises)
const processEvent = (event: PluginEvent) => piscina.run({ task: 'processEvent', args: { event } })
const processEvent = (event: PluginEvent) => piscina.run({ task: '_testsRunProcessEvent', args: { event } })

const groups = Math.ceil((count * batchSize) / maxPromises)
for (let j = 0; j < groups; j++) {
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"main": "dist/index.js",
"scripts": {
"test": "jest --runInBand --forceExit tests/**/*.test.ts",
"test:postgres:1": "jest --runInBand --forceExit tests/ --testPathIgnorePatterns tests/clickhouse/ --testPathIgnorePatterns tests/postgres/ --testPathIgnorePatterns tests/shared/process-event.test.ts",
"test:postgres:1": "jest --runInBand --forceExit tests/ --testPathIgnorePatterns tests/clickhouse/ --testPathIgnorePatterns tests/postgres/ --testPathIgnorePatterns tests/shared/process-event.test.ts --testPathIgnorePatterns tests/worker/ingestion/event-pipeline",
"test:postgres:2": "jest --runInBand --forceExit tests/postgres/",
"test:clickhouse:1": "jest --runInBand --forceExit tests/clickhouse/",
"test:clickhouse:2": "jest --runInBand --forceExit tests/shared/process-event.test.ts",
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ export class KafkaQueue implements Queue {
await runInstrumentedFunction({
server: this.pluginsServer,
event: bufferEvent,
func: (_) => this.workerMethods.ingestBufferEvent(bufferEvent),
func: (_) => this.workerMethods.runBufferEventPipeline(bufferEvent),
statsKey: `kafka_queue.ingest_buffer_event`,
timeoutMessage: 'After 30 seconds still running ingestBufferEvent',
timeoutMessage: 'After 30 seconds still running runBufferEventPipeline',
})
resolveOffset(message.offset)
}
Expand Down
31 changes: 3 additions & 28 deletions plugin-server/src/main/ingestion-queues/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,10 @@ export async function startQueues(
workerMethods: Partial<WorkerMethods> = {}
): Promise<Queues> {
const mergedWorkerMethods = {
onEvent: (event: ProcessedPluginEvent) => {
runBufferEventPipeline: (event: PreIngestionEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'onEvent'
return piscina.run({ task: 'onEvent', args: { event } })
},
onAction: (action: Action, event: ProcessedPluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'onAction'
return piscina.run({ task: 'onAction', args: { event, action } })
},
onSnapshot: (event: ProcessedPluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'onSnapshot'
return piscina.run({ task: 'onSnapshot', args: { event } })
},
processEvent: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'processEvent'
return piscina.run({ task: 'processEvent', args: { event } })
},
ingestEvent: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'ingestEvent'
return piscina.run({ task: 'ingestEvent', args: { event } })
},
ingestBufferEvent: (event: PreIngestionEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'ingestBufferEvent'
return piscina.run({ task: 'ingestBufferEvent', args: { event } })
server.lastActivityType = 'runBufferEventPipeline'
return piscina.run({ task: 'runBufferEventPipeline', args: { event } })
},
runEventPipeline: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,7 @@ export interface PluginTask {
}

export type WorkerMethods = {
onEvent: (event: ProcessedPluginEvent) => Promise<void>
onAction: (action: Action, event: ProcessedPluginEvent) => Promise<void>
onSnapshot: (event: ProcessedPluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent | null>
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
ingestBufferEvent: (event: PreIngestionEvent) => Promise<IngestEventResponse>
runBufferEventPipeline: (event: PreIngestionEvent) => Promise<IngestEventResponse>
runEventPipeline: (event: PluginEvent) => Promise<void>
}

Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/utils/internal-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class InternalMetrics {
uuid: new UUIDT().toString(),
}

promises.push(piscina.run({ task: 'ingestEvent', args: { event } }))
promises.push(piscina.run({ task: 'runEventPipeline', args: { event } }))
}

await Promise.all(promises)
Expand Down
79 changes: 0 additions & 79 deletions plugin-server/src/worker/ingestion/event-pipeline.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Person, PreIngestionEvent } from '../../../types'
import { EventPipelineRunner, StepResult } from './runner'

export async function createEventStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person | undefined
): Promise<StepResult> {
const [, , elements] = await runner.hub.eventsProcessor.createEvent(event)
return runner.nextStep('runAsyncHandlersStep', event, person, elements)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { DateTime } from 'luxon'

import { Hub, Person, PreIngestionEvent, TeamId } from '../../../types'
import { EventPipelineRunner, StepResult } from './runner'

export async function emitToBufferStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
shouldBuffer: (
hub: Hub,
event: PreIngestionEvent,
person: Person | undefined,
teamId: TeamId
) => boolean = shouldSendEventToBuffer
): Promise<StepResult> {
const person = await runner.hub.db.fetchPerson(event.teamId, event.distinctId)

if (shouldBuffer(runner.hub, event, person, event.teamId)) {
await runner.hub.eventsProcessor.produceEventToBuffer(event)
return null
} else {
return runner.nextStep('createEventStep', event, person)
}
}

// context: https://github.com/PostHog/posthog/issues/9182
// TL;DR: events from a recently created non-anonymous person are sent to a buffer
// because their person_id might change. We merge based on the person_id of the anonymous user
// so ingestion is delayed for those events to increase our chances of getting person_id correctly
export function shouldSendEventToBuffer(
hub: Hub,
event: PreIngestionEvent,
person: Person | undefined,
teamId: TeamId
): boolean {
const isAnonymousEvent =
event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id']
const isRecentPerson =
!person || DateTime.now().diff(person.created_at).as('seconds') < hub.BUFFER_CONVERSION_SECONDS
const ingestEventDirectly = isAnonymousEvent || event.event === '$identify' || !isRecentPerson
const sendToBuffer = !ingestEventDirectly

if (sendToBuffer) {
hub.statsd?.increment('conversion_events_buffer_size', { teamId: event.teamId.toString() })
}

if (!hub.CONVERSION_BUFFER_ENABLED && !hub.conversionBufferEnabledTeams.has(teamId)) {
return false
}

return sendToBuffer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { PluginEvent } from '@posthog/plugin-scaffold'

import { runInstrumentedFunction } from '../../../main/utils'
import { runProcessEvent } from '../../plugins/run'
import { EventPipelineRunner, StepResult } from './runner'

export async function pluginsProcessEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<StepResult> {
let processedEvent: PluginEvent | null = event

// run processEvent on all events that are not $snapshot
if (event.event !== '$snapshot') {
processedEvent = await runInstrumentedFunction({
server: runner.hub,
event,
func: (event) => runProcessEvent(runner.hub, event),
statsKey: 'kafka_queue.single_event',
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
})
}

if (processedEvent) {
return runner.nextStep('prepareEventStep', processedEvent)
} else {
// processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it.
runner.hub.statsd?.increment('kafka_queue.dropped_event', {
teamID: String(event.team_id),
})
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { EventPipelineRunner, StepResult } from './runner'

export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<StepResult> {
const { ip, site_url, team_id, now, sent_at, uuid } = event
const distinctId = String(event.distinct_id)
const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
distinctId,
ip,
event,
team_id,
DateTime.fromISO(now),
sent_at ? DateTime.fromISO(sent_at) : null,
uuid!, // it will throw if it's undefined,
site_url
)

if (preIngestionEvent && preIngestionEvent.event !== '$snapshot') {
return runner.nextStep('emitToBufferStep', preIngestionEvent)
} else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if (preIngestionEvent && preIngestionEvent.event === '$snapshot') {
} else if (preIngestionEvent) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional - I was trying to make it clearer to the reader it's dealing with a snapshot here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair

return runner.nextStep('runAsyncHandlersStep', preIngestionEvent, undefined, undefined)
} else {
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { runInstrumentedFunction } from '../../../main/utils'
import { Action, Element, Person, PreIngestionEvent } from '../../../types'
import { convertToProcessedPluginEvent } from '../../../utils/event'
import { runOnAction, runOnEvent, runOnSnapshot } from '../../plugins/run'
import { EventPipelineRunner, StepResult } from './runner'

export async function runAsyncHandlersStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person | undefined,
elements: Element[] | undefined
): Promise<StepResult> {
const promises = []
let actionMatches: Action[] = []
if (event.event !== '$snapshot') {
actionMatches = await runner.hub.actionMatcher.match(event, person, elements)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we actually run the onEvent / onSnapshot flow earlier? I'd like to make sure that runs if the action path is broken. Also diff is probably minimal but I'd like to trigger exporting an event before action webhooks etc.

will anyway group code together that's used together

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean. Do you want to ignore errors from action matching when deciding whether to run onAction? That's a new requirement if so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - let me submit a suggestion as to what I want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mistyped my question - onEvent calling should not be affected by action-related errors? If so, I think re-ordering is too implicit about that and we should make that obvious in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's resolve this in a follow-up PR - I'm in merge conflict hell until this is in.

promises.push(runner.hub.hookCannon.findAndFireHooks(event, person, event.siteUrl, actionMatches))
}

const processedPluginEvent = convertToProcessedPluginEvent(event)
const isSnapshot = event.event === '$snapshot'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could move this up given we have a isSnapshot test on line 15

const method = isSnapshot ? runOnSnapshot : runOnEvent
promises.push(
runInstrumentedFunction({
server: runner.hub,
event: processedPluginEvent,
func: (event) => method(runner.hub, event),
statsKey: `kafka_queue.single_${isSnapshot ? 'on_snapshot' : 'on_event'}`,
timeoutMessage: `After 30 seconds still running ${isSnapshot ? 'onSnapshot' : 'onEvent'}`,
})
)
for (const actionMatch of actionMatches) {
promises.push(
runInstrumentedFunction({
server: runner.hub,
event: processedPluginEvent,
func: (event) => runOnAction(runner.hub, actionMatch, event),
statsKey: `kafka_queue.on_action`,
timeoutMessage: 'After 30 seconds still running onAction',
})
)
}

await Promise.all(promises)

return null
}
Loading