Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plugin-server): Run ingestion only on worker threads #9738

Merged
merged 15 commits into from
May 13, 2022
Merged
49 changes: 1 addition & 48 deletions plugin-server/src/main/ingestion-queues/ingest-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold'

import { Hub, WorkerMethods } from '../../types'
import { status } from '../../utils/status'
import { onEvent } from '../runner/on-event'
import { runInstrumentedFunction } from '../utils'
import { Action } from './../../types'
import { processEvent } from './process-event'

export async function ingestEvent(
server: Hub,
Expand All @@ -14,55 +10,12 @@ export async function ingestEvent(
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<void> {
const eachEventStartTimer = new Date()
const isSnapshot = event.event === '$snapshot'

checkAndPause?.()

server.statsd?.increment('kafka_queue_ingest_event_hit')

const processedEvent = await processEvent(server, workerMethods, event)

checkAndPause?.()

if (processedEvent) {
let actionMatches: Action[] = []
await Promise.all([
runInstrumentedFunction({
server,
event: processedEvent,
func: async (event) => {
const result = await workerMethods.ingestEvent(event)
actionMatches = result.actionMatches || []
},
statsKey: 'kafka_queue.single_ingestion',
timeoutMessage: 'After 30 seconds still ingesting event',
}),
onEvent(server, workerMethods, processedEvent),
])

server.statsd?.increment('kafka_queue_single_event_processed_and_ingested')

if (actionMatches.length > 0) {
const promises = []
for (const actionMatch of actionMatches) {
promises.push(
runInstrumentedFunction({
server,
event: processedEvent,
func: (event) => workerMethods.onAction(actionMatch, event),
statsKey: `kafka_queue.on_action`,
timeoutMessage: 'After 30 seconds still running onAction',
})
)
}
await Promise.all(promises)
}
} else {
// processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it.
server.statsd?.increment('kafka_queue.dropped_event', {
teamID: String(event.team_id),
})
}
await workerMethods.runEventPipeline(event)

server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)
server.internalMetrics?.incr('$$plugin_server_events_processed')
Expand Down
25 changes: 8 additions & 17 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import { PluginServerMode } from '../../types'
import { Hub, Queue, WorkerMethods } from '../../types'
import { status } from '../../utils/status'
import { groupIntoBatches, killGracefully, sanitizeEvent } from '../../utils/utils'
import { onEvent } from '../runner/on-event'
import { runInstrumentedFunction } from '../utils'
import { KAFKA_BUFFER } from './../../config/kafka-topics'
import { ingestEvent } from './ingest-event'

class DelayProcessing extends Error {}
Expand Down Expand Up @@ -44,21 +42,14 @@ export class KafkaQueue implements Queue {
}

private async eachMessageIngestion(message: KafkaMessage): Promise<void> {
// Currently the else part is never triggered. The plugin server can only be
// in "ingestion" mode at the moment, and onEvent is triggered in ingestEvent
if (this.pluginServerMode === PluginServerMode.Ingestion) {
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent = { ...rawEvent, ...JSON.parse(dataStr) }
const event: PluginEvent = sanitizeEvent({
...combinedEvent,
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,
})
await ingestEvent(this.pluginsServer, this.workerMethods, event)
} else {
const event = JSON.parse(message.value!.toString())
await onEvent(this.pluginsServer, this.workerMethods, event)
}
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent = { ...rawEvent, ...JSON.parse(dataStr) }
const event: PluginEvent = sanitizeEvent({
...combinedEvent,
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,
})
await ingestEvent(this.pluginsServer, this.workerMethods, event)
}

private async eachMessageBuffer(
Expand Down
30 changes: 0 additions & 30 deletions plugin-server/src/main/ingestion-queues/process-event.ts

This file was deleted.

5 changes: 5 additions & 0 deletions plugin-server/src/main/ingestion-queues/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ export async function startQueues(
server.lastActivityType = 'ingestBufferEvent'
return piscina.run({ task: 'ingestBufferEvent', args: { event } })
},
runEventPipeline: (event: PluginEvent) => {
server.lastActivity = new Date().valueOf()
server.lastActivityType = 'runEventPipeline'
return piscina.run({ task: 'runEventPipeline', args: { event } })
},
...workerMethods,
}

Expand Down
9 changes: 6 additions & 3 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export async function startPluginsServer(
let piscina: Piscina | undefined
let queue: Queue | undefined | null // ingestion queue
let redisQueueForPluginJobs: Queue | undefined | null
let healthCheckConsumer: Consumer | undefined
let jobQueueConsumer: JobQueueConsumerControl | undefined
let closeHub: () => Promise<void> | undefined
let pluginScheduleControl: PluginScheduleControl | undefined
Expand Down Expand Up @@ -90,6 +91,7 @@ export async function startPluginsServer(
await pubSub?.stop()
await jobQueueConsumer?.stop()
await pluginScheduleControl?.stopSchedule()
await healthCheckConsumer?.stop()
await new Promise<void>((resolve, reject) =>
!mmdbServer
? resolve()
Expand Down Expand Up @@ -264,12 +266,13 @@ export async function startPluginsServer(
serverInstance.stop = closeJobs

if (hub.kafka) {
serverInstance.kafkaHealthcheckConsumer = await setupKafkaHealthcheckConsumer(hub.kafka)
healthCheckConsumer = await setupKafkaHealthcheckConsumer(hub.kafka)
serverInstance.kafkaHealthcheckConsumer = healthCheckConsumer

await serverInstance.kafkaHealthcheckConsumer.connect()
await healthCheckConsumer.connect()

try {
serverInstance.kafkaHealthcheckConsumer.pause([{ topic: KAFKA_HEALTHCHECK }])
healthCheckConsumer.pause([{ topic: KAFKA_HEALTHCHECK }])
} catch (err) {
// It's fine to do nothing for now - Kafka issues will be caught by the periodic healthcheck
status.error('🔴', 'Failed to pause Kafka healthcheck consumer on connect!')
Expand Down
23 changes: 0 additions & 23 deletions plugin-server/src/main/runner/on-event.ts

This file was deleted.

1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ export type WorkerMethods = {
processEvent: (event: PluginEvent) => Promise<PluginEvent | null>
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
ingestBufferEvent: (event: PreIngestionEvent) => Promise<IngestEventResponse>
runEventPipeline: (event: PluginEvent) => Promise<void>
}

export type VMMethods = {
Expand Down
84 changes: 84 additions & 0 deletions plugin-server/src/worker/ingestion/event-pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { PluginEvent } from '@posthog/plugin-scaffold'

import { runInstrumentedFunction } from '../../main/utils'
import { Hub } from '../../types'
import { runOnAction, runOnEvent, runOnSnapshot, runProcessEvent } from '../plugins/run'
import { Action } from './../../types'
import { ingestEvent } from './ingest-event'

export async function runEventPipeline(server: Hub, event: PluginEvent): Promise<void> {
const processedEvent = await processEvent(server, event)

if (processedEvent) {
let actionMatches: Action[] = []
await Promise.all([
runInstrumentedFunction({
server,
event: processedEvent,
func: async (event) => {
const result = await ingestEvent(server, event)
actionMatches = result.actionMatches || []
},
statsKey: 'kafka_queue.single_ingestion',
timeoutMessage: 'After 30 seconds still ingesting event',
}),
onEvent(server, processedEvent),
])

server.statsd?.increment('kafka_queue_single_event_processed_and_ingested')

if (actionMatches.length > 0) {
const promises = []
for (const actionMatch of actionMatches) {
promises.push(
runInstrumentedFunction({
server,
event: processedEvent,
func: (event) => runOnAction(server, actionMatch, event),
statsKey: `kafka_queue.on_action`,
timeoutMessage: 'After 30 seconds still running onAction',
})
)
}
await Promise.all(promises)
}
} else {
// processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it.
server.statsd?.increment('kafka_queue.dropped_event', {
teamID: String(event.team_id),
})
}
}

export async function processEvent(server: Hub, event: PluginEvent): Promise<PluginEvent | null> {
const isSnapshot = event.event === '$snapshot'

let processedEvent: PluginEvent | null = event

// run processEvent on all events that are not $snapshot
if (!isSnapshot) {
processedEvent = await runInstrumentedFunction({
server,
event,
func: (event) => runProcessEvent(server, event),
statsKey: 'kafka_queue.single_event',
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
})
}

return processedEvent
}

/**
 * Dispatches the event to plugins' onSnapshot (for $snapshot events) or onEvent handlers,
 * instrumented with a 30 second timeout warning.
 */
export async function onEvent(server: Hub, event: PluginEvent): Promise<void> {
    const snapshot = event.event === '$snapshot'
    const handlerName = snapshot ? 'onSnapshot' : 'onEvent'

    await runInstrumentedFunction({
        server,
        event: event,
        func: (event) => (snapshot ? runOnSnapshot : runOnEvent)(server, event),
        statsKey: `kafka_queue.single_${snapshot ? 'on_snapshot' : 'on_event'}`,
        timeoutMessage: `After 30 seconds still running ${handlerName}`,
    })
}
17 changes: 12 additions & 5 deletions plugin-server/src/worker/ingestion/ingest-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/node'
import { DateTime } from 'luxon'
import { CachedPersonData } from 'utils/db/db'

import { Element, Hub, IngestEventResponse, Person, PreIngestionEvent } from '../../types'
import { Element, Hub, IngestEventResponse, Person, PreIngestionEvent, TeamId } from '../../types'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { Action } from './../../types'
Expand Down Expand Up @@ -32,9 +32,7 @@ export async function ingestEvent(hub: Hub, event: PluginEvent): Promise<IngestE
const person = await hub.db.fetchPerson(team_id, distinctId)

// even if the buffer is disabled we want to get metrics on how many events would have gone to it
const sendEventToBuffer =
shouldSendEventToBuffer(hub, result, person) &&
(hub.CONVERSION_BUFFER_ENABLED || hub.conversionBufferEnabledTeams.has(team_id))
const sendEventToBuffer = shouldSendEventToBuffer(hub, result, person, team_id)

if (sendEventToBuffer) {
await hub.eventsProcessor.produceEventToBuffer(result)
Expand Down Expand Up @@ -98,7 +96,12 @@ async function handleActionMatches(
// TL;DR: events from a recently created non-anonymous person are sent to a buffer
// because their person_id might change. We merge based on the person_id of the anonymous user
// so ingestion is delayed for those events to increase our chances of getting person_id correctly
function shouldSendEventToBuffer(hub: Hub, event: PreIngestionEvent, person?: Person) {
function shouldSendEventToBuffer(
hub: Hub,
event: PreIngestionEvent,
person: Person | undefined,
teamId: TeamId
): boolean {
const isAnonymousEvent =
event.properties && event.properties['$device_id'] && event.distinctId === event.properties['$device_id']
const isRecentPerson = !person || DateTime.now().diff(person.created_at).seconds < hub.BUFFER_CONVERSION_SECONDS
Expand All @@ -109,5 +112,9 @@ function shouldSendEventToBuffer(hub: Hub, event: PreIngestionEvent, person?: Pe
hub.statsd?.increment('conversion_events_buffer_size', { teamId: event.teamId.toString() })
}

if (!hub.CONVERSION_BUFFER_ENABLED && !hub.conversionBufferEnabledTeams.has(teamId)) {
[Review comment (Contributor): 👍]
return false
}

return sendToBuffer
}
4 changes: 4 additions & 0 deletions plugin-server/src/worker/tasks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { Action, Alert, EnqueuedJob, Hub, PluginTaskType, PreIngestionEvent, Team } from '../types'
import { runEventPipeline } from './ingestion/event-pipeline'
import { ingestBufferEvent, ingestEvent } from './ingestion/ingest-event'
import { runHandleAlert, runOnAction, runOnEvent, runOnSnapshot, runPluginTask, runProcessEvent } from './plugins/run'
import { loadSchedule, setupPlugins } from './plugins/setup'
Expand Down Expand Up @@ -39,6 +40,9 @@ export const workerTasks: Record<string, TaskRunner> = {
getPluginSchedule: (hub) => {
return hub.pluginSchedule
},
runEventPipeline: async (hub, args: { event: PluginEvent }) => {
return await runEventPipeline(hub, args.event)
},
ingestEvent: async (hub, args: { event: PluginEvent }) => {
return await ingestEvent(hub, args.event)
},
Expand Down
25 changes: 11 additions & 14 deletions plugin-server/tests/helpers/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@ export async function resetTestDatabaseClickhouse(extraServerConfig: Partial<Plu
output_format_json_quote_64bit_integers: false,
},
})
await clickhouse.querying('TRUNCATE sharded_events')
await clickhouse.querying('TRUNCATE events_mv')
await clickhouse.querying('TRUNCATE person')
await clickhouse.querying('TRUNCATE person_distinct_id')
await clickhouse.querying('TRUNCATE person_distinct_id2')
await clickhouse.querying('TRUNCATE person_mv')
await clickhouse.querying('TRUNCATE person_static_cohort')
await clickhouse.querying('TRUNCATE session_recording_events')
await clickhouse.querying('TRUNCATE session_recording_events_mv')
await clickhouse.querying('TRUNCATE plugin_log_entries')
await clickhouse.querying('TRUNCATE events_dead_letter_queue')
await clickhouse.querying('TRUNCATE events_dead_letter_queue_mv')
await clickhouse.querying('TRUNCATE groups')
await clickhouse.querying('TRUNCATE groups_mv')
await Promise.all([
clickhouse.querying('TRUNCATE sharded_events'),
clickhouse.querying('TRUNCATE person'),
clickhouse.querying('TRUNCATE person_distinct_id'),
clickhouse.querying('TRUNCATE person_distinct_id2'),
clickhouse.querying('TRUNCATE person_static_cohort'),
clickhouse.querying('TRUNCATE sharded_session_recording_events'),
clickhouse.querying('TRUNCATE plugin_log_entries'),
clickhouse.querying('TRUNCATE events_dead_letter_queue'),
clickhouse.querying('TRUNCATE groups'),
])
}
Loading