Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
Babel Loop Timeouts (#155)
Browse files Browse the repository at this point in the history
* transpile code via babel-standalone

* move vm.ts into vm/

* add TASK_TIMEOUT config

* add babel transform to timeout while loops

* update refactored rename

* vm while timeout test

* add more unimplemented timeout tests

* add transforms for while/for/do-while loops

* fix loop timeout ms

* remove extra ;

* fix import

* fix more imports

* fix even more imports

* fix error messages

* simplify errors

* small refactor

* fix import

* add async guard and protect against long promises

* async guard around setupPlugin()

* add column

* slightly safer variables

* less noise in vm bench

* can not override async guard for the main functions (e.g. processEvent, etc)

* reduce how much we do in a benchmark

* add types to loop timeout

* types for promise timeout

* fix line/column numbers

* explain some decisions

* verify that the process event timeout applies e2e.

* managed to get equal, so changing

* update message that it's just a warning

* add e2e kafka test for bad delay

* shorter test in github action

* increase test timeout to see if that makes a difference (locally it takes 3min for both tests)

* skip the "slow on GH action" test for now

* process kafka events in parallel by default

* add more metrics to kafka queue

* some debug to help fix the test

* add debug log in the delayUntilEventIngested function

* add "single_event_batch" timing to the event processing steps

* fix timer

* Improve timeoutGuard default timeout

* revert benchmark to the last one that worked

* skip bad delay

* add back a 1:1 copy of the e2e.kafka test, but with the timeout code. see if GH actions run

* remove broken tests, improve logging of working tests

* Clean up vm.ts

* Improve clarity of loopTimeout

* Refactor transforms slightly

* Refactor transform call to secureCode func

* Add secureCode tests

Co-authored-by: Michael Matloka <dev@twixes.com>
  • Loading branch information
mariusandra and Twixes authored Feb 18, 2021
1 parent e91d131 commit b235de3
Show file tree
Hide file tree
Showing 33 changed files with 995 additions and 183 deletions.
4 changes: 2 additions & 2 deletions benchmarks/clickhouse/e2e.kafka.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse'
import { resetKafka } from '../../tests/helpers/kafka'
Expand Down Expand Up @@ -84,7 +84,7 @@ describe('e2e kafka & clickhouse benchmark', () => {

const n = (n: number) => `${Math.round(n * 100) / 100}`
console.log(
`[Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
`ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
1000 / (timeMs / count)
)} events/sec, ${n(timeMs / count)}ms per event)`
)
Expand Down
97 changes: 97 additions & 0 deletions benchmarks/clickhouse/e2e.timeout.benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { performance } from 'perf_hooks'

import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../src/ingestion/topics'
import { startPluginsServer } from '../../src/server'
import { ClickHouseEvent, LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { resetTestDatabaseClickhouse } from '../../tests/helpers/clickhouse'
import { resetKafka } from '../../tests/helpers/kafka'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
import { delayUntilEventIngested } from '../../tests/shared/process-event'

// Allow the whole benchmark up to 10 minutes before jest aborts it.
jest.setTimeout(10 * 60 * 1000)

/** Server config overrides for this benchmark: Kafka ingestion with a 5 second task timeout. */
const extraServerConfig: Partial<PluginsServerConfig> = {
    // Kafka connection and consumption
    KAFKA_ENABLED: true,
    KAFKA_HOSTS: process.env.KAFKA_HOSTS || 'kafka:9092',
    KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
    KAFKA_BATCH_PARALELL_PROCESSING: true,
    PLUGIN_SERVER_INGESTION: true,

    // Worker pool; TASK_TIMEOUT is expressed in seconds
    WORKER_CONCURRENCY: 4,
    TASK_TIMEOUT: 5,

    LOG_LEVEL: LogLevel.Log,
}

/**
 * Benchmark: pushes events through Kafka into a plugin whose `processEvent` sleeps for a
 * random 0-15 seconds, then reports ingestion throughput and how many events carry the
 * `timeout` property that the plugin sets once its sleep has finished.
 */
describe('e2e kafka processing timeout benchmark', () => {
    let queue: Queue
    let server: PluginsServer
    // Stays undefined if `beforeEach` fails before the server has been started.
    let stopServer: (() => Promise<void>) | undefined
    let posthog: DummyPostHog

    beforeEach(async () => {
        // The plugin only tags the event after its random delay has elapsed.
        await resetTestDatabase(`
            async function processEvent (event) {
                await new Promise(resolve => __jestSetTimeout(() => resolve(), 15000 * Math.random()))
                event.properties.timeout = 'no timeout'
                return event
            }
        `)
        await resetKafka(extraServerConfig)
        await resetTestDatabaseClickhouse(extraServerConfig)

        const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
        server = startResponse.server
        stopServer = startResponse.stop
        queue = startResponse.queue

        posthog = createPosthog(server, pluginConfig39)
    })

    afterEach(async () => {
        // Guarded so that a failed setup surfaces its own error instead of
        // "stopServer is not a function", and so the same server is never stopped twice.
        await stopServer?.()
        stopServer = undefined
    })

    test('measure performance', async () => {
        // Silence debug output for the duration of the benchmark.
        console.debug = () => null

        const count = 3000

        // fill in the queue
        function createEvent() {
            const uuid = new UUIDT().toString()
            posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' })
        }
        await queue.pause()
        for (let i = 0; i < count; i++) {
            createEvent()
        }

        // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited)
        await delay(5000)
        await queue.resume()

        console.log('Starting timer')
        const startTime = performance.now()
        await delayUntilEventIngested(() => server.db.fetchEvents(), count, 500, count)
        const timeMs = performance.now() - startTime
        console.log('Finished!')

        const n = (n: number) => `${Math.round(n * 100) / 100}`
        console.log(
            `ℹ️️ [Kafka & ClickHouse] Ingested ${count} events in ${n(timeMs / 1000)}s (${n(
                1000 / (timeMs / count)
            )} events/sec, ${n(timeMs / count)}ms per event)`
        )

        // Events carrying the `timeout` property are the ones the plugin finished processing.
        const events = (await server.db.fetchEvents()) as ClickHouseEvent[]
        const passedEvents = events.filter((e) => e.properties.timeout).length
        // Derive the totals from `count` so the summary stays correct if the event count changes.
        console.log(
            `ℹ️ Out of ${count} events: ${passedEvents} took under 5sec, ${
                count - passedEvents
            } timed out. This should be a 1:2 ratio.`
        )
    })
})
2 changes: 1 addition & 1 deletion benchmarks/postgres/e2e.celery.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { performance } from 'perf_hooks'

import { createPosthog, DummyPostHog } from '../../src/extensions/posthog'
import { startPluginsServer } from '../../src/server'
import { LogLevel, PluginsServerConfig, Queue } from '../../src/types'
import { PluginsServer } from '../../src/types'
import { delay, UUIDT } from '../../src/utils'
import { createPosthog, DummyPostHog } from '../../src/vm/extensions/posthog'
import { makePiscina } from '../../src/worker/piscina'
import { pluginConfig39 } from '../../tests/helpers/plugins'
import { resetTestDatabase } from '../../tests/helpers/sql'
Expand Down
37 changes: 21 additions & 16 deletions benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { createServer } from '../../src/server'
import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types'
import { createPluginConfigVM } from '../../src/vm'
import { createPluginConfigVM } from '../../src/vm/vm'
import { commonOrganizationId } from '../../tests/helpers/plugins'

jest.mock('../../src/sql')
Expand Down Expand Up @@ -47,6 +47,7 @@ const mockConfig: PluginConfig = {
}

test('test vm memory usage', async () => {
const debug = false
const numVMs = 1000
const numEventsPerVM = 100

Expand Down Expand Up @@ -76,27 +77,31 @@ test('test vm memory usage', async () => {
const vm = await createPluginConfigVM(server, mockConfig, indexJs)
vms.push(vm)

const nowUsed = getUsed()
console.log(
`Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${
i + 1
} used since the start)`
)
used = nowUsed
if (debug || i === numVMs - 1) {
const nowUsed = getUsed()
console.log(
`Used: ${nowUsed} MB, diff ${nowUsed - used} (${(nowUsed - usedAtStart) / (i + 1)} * ${
i + 1
} used since the start)`
)
used = nowUsed
}
}

for (let i = 0; i < numEventsPerVM; i++) {
for (let j = 0; j < numVMs; j++) {
await vms[j].methods.processEvent(createEvent(i + j))
}
global.gc()
const nowUsed = getUsed()
console.log(
`Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${nowUsed - usedAtStart} used since the start, ${
(nowUsed - usedAtStart) / numVMs
} per vm)`
)
used = nowUsed
if (debug || i === numEventsPerVM - 1) {
global?.gc?.()
const nowUsed = getUsed()
console.log(
`Run ${i}. Used: ${nowUsed} MB, diff ${nowUsed - used} (${
nowUsed - usedAtStart
} used since the start, ${(nowUsed - usedAtStart) / numVMs} per vm)`
)
used = nowUsed
}
}

await closeServer()
Expand Down
7 changes: 5 additions & 2 deletions benchmarks/vm/worker.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ test('piscina worker benchmark', async () => {
`,
},
{
testName: 'for200k',
// Used to be 'for200k', but since we now inject Date.now() checks into
// for/while/do-while loops (to throw if they run too long), running
// those comparisons 200k * 10k * runs * threads times is a bit too much.
testName: 'for2k',
events: 10000,
testCode: `
function processEvent (event, meta) {
let j = 0; for(let i = 0; i < 200000; i++) { j = i };
let j = 0; for(let i = 0; i < 2000; i++) { j = i };
event.properties = { "somewhere": "over the rainbow" };
return event
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"repository": "https://github.com/PostHog/posthog-plugin-server",
"license": "MIT",
"dependencies": {
"@babel/standalone": "^7.12.16",
"@google-cloud/bigquery": "^5.5.0",
"@posthog/clickhouse": "^1.7.0",
"@sentry/node": "^5.29.0",
Expand Down Expand Up @@ -72,6 +73,7 @@
"@babel/preset-typescript": "^7.8.3",
"@posthog/plugin-scaffold": "0.2.8",
"@types/adm-zip": "^0.4.33",
"@types/babel__standalone": "^7.1.3",
"@types/ioredis": "^4.17.7",
"@types/jest": "^26.0.15",
"@types/luxon": "^1.25.0",
Expand All @@ -86,7 +88,6 @@
"@types/yargs": "^15.0.9",
"@typescript-eslint/eslint-plugin": "^4.14.0",
"@typescript-eslint/parser": "^4.14.0",
"babel-core": "^7.0.0-bridge.0",
"babel-eslint": "^10.1.0",
"eslint": "^7.18.0",
"eslint-config-prettier": "^7.2.0",
Expand Down
4 changes: 3 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_CLIENT_CERT_KEY_B64: null,
KAFKA_TRUSTED_CERT_B64: null,
KAFKA_CONSUMPTION_TOPIC: null,
KAFKA_BATCH_PARALELL_PROCESSING: false,
KAFKA_BATCH_PARALELL_PROCESSING: true,
PLUGIN_SERVER_INGESTION: false,
PLUGINS_CELERY_QUEUE: 'posthog-plugins',
REDIS_URL: 'redis://127.0.0.1',
Expand All @@ -34,6 +34,7 @@ export function getDefaultConfig(): PluginsServerConfig {
WEB_PORT: 3008,
WEB_HOSTNAME: '0.0.0.0',
WORKER_CONCURRENCY: coreCount,
TASK_TIMEOUT: 30,
TASKS_PER_WORKER: 10,
LOG_LEVEL: LogLevel.Info,
SENTRY_DSN: null,
Expand Down Expand Up @@ -63,6 +64,7 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
WEB_PORT: 'port for web server to listen on',
WEB_HOSTNAME: 'hostname for web server to listen on',
WORKER_CONCURRENCY: 'number of concurrent worker threads',
TASK_TIMEOUT: 'How many seconds until tasks are timed out',
TASKS_PER_WORKER: 'number of parallel tasks per worker thread',
LOG_LEVEL: 'minimum log level',
KAFKA_ENABLED: 'use Kafka instead of Celery to ingest events',
Expand Down
23 changes: 18 additions & 5 deletions src/ingestion/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class KafkaQueue implements Queue {
isRunning,
isStale,
}: EachBatchPayload): Promise<void> {
const batchProcessingTimer = new Date()
const batchStartTimer = new Date()

const uuidOrder = new Map<string, number>()
const uuidOffset = new Map<string, string>()
Expand All @@ -63,21 +63,33 @@ export class KafkaQueue implements Queue {
)

const processingTimeout = timeoutGuard(
`Took too long to run plugins on ${pluginEvents.length} events! Timeout after 30 sec!`
`Still running plugins on ${pluginEvents.length} events. Timeout warning after 30 sec!`
)
const batches = groupIntoBatches(pluginEvents, maxBatchSize)
const processedEvents = (await Promise.all(batches.map(this.processEventBatch))).flat()
const processedEvents = (
await Promise.all(
batches.map(async (batch) => {
const timer = new Date()
const processedBatch = this.processEventBatch(batch)
this.pluginsServer.statsd?.timing('kafka_queue.single_event_batch', timer)
return processedBatch
})
)
).flat()

clearTimeout(processingTimeout)

this.pluginsServer.statsd?.timing('kafka_queue.each_batch.process_events', batchStartTimer)
const batchIngestionTimer = new Date()

// Sort in the original order that the events came in, putting any randomly added events to the end.
// This is so we would resolve the correct kafka offsets in order.
processedEvents.sort(
(a, b) => (uuidOrder.get(a.uuid!) || pluginEvents.length) - (uuidOrder.get(b.uuid!) || pluginEvents.length)
)

const ingestionTimeout = timeoutGuard(
`Took too long to ingest ${processedEvents.length} events! Timeout after 30 sec!`
`Still ingesting ${processedEvents.length} events. Timeout warning after 30 sec!`
)

// TODO: add chunking into groups of 500 or so. Might start too many promises at once now
Expand Down Expand Up @@ -126,7 +138,8 @@ export class KafkaQueue implements Queue {

clearTimeout(ingestionTimeout)

this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchProcessingTimer)
this.pluginsServer.statsd?.timing('kafka_queue.each_batch.ingest_events', batchIngestionTimer)
this.pluginsServer.statsd?.timing('kafka_queue.each_batch', batchStartTimer)
resolveOffset(batch.lastOffset())
await heartbeat()
await commitOffsetsIfNecessary()
Expand Down
5 changes: 3 additions & 2 deletions src/ingestion/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Sentry from '@sentry/node'
import crypto from 'crypto'

import { defaultConfig } from '../config'
import { BasePerson, Element, Person, RawPerson } from '../types'

export function unparsePersonPartial(person: Partial<Person>): Partial<RawPerson> {
Expand Down Expand Up @@ -156,9 +157,9 @@ export function chainToElements(chain: string): Element[] {
return elements
}

export function timeoutGuard(message: string): NodeJS.Timeout {
export function timeoutGuard(message: string, timeout = defaultConfig.TASK_TIMEOUT * 1000): NodeJS.Timeout {
return setTimeout(() => {
console.log(`⌛⌛⌛ ${message}`)
Sentry.captureMessage(message)
}, 30000)
}, timeout)
}
2 changes: 1 addition & 1 deletion src/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from './s
import { status } from './status'
import { PluginConfig, PluginJsonConfig, PluginsServer, PluginTask, TeamId } from './types'
import { getFileFromArchive } from './utils'
import { createPluginConfigVM } from './vm'
import { createPluginConfigVM } from './vm/vm'

export async function setupPlugins(server: PluginsServer): Promise<void> {
const pluginRows = await getPluginRows(server)
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export enum LogLevel {
export interface PluginsServerConfig extends Record<string, any> {
WORKER_CONCURRENCY: number
TASKS_PER_WORKER: number
TASK_TIMEOUT: number
CELERY_DEFAULT_QUEUE: string
DATABASE_URL: string
CLICKHOUSE_HOST: string
Expand Down
8 changes: 8 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,11 @@ export function groupIntoBatches<T>(array: T[], batchSize: number): T[][] {
}
return batches
}

/**
 * Template literal tag that normalises inline JS snippets used internally.
 *
 * The indentation of the first line after the opening newline is measured, that many
 * leading spaces are stripped from every line, and the result is trimmed.
 * Interpolated values are not substituted: each `${...}` slot is rendered as '…'.
 *
 * @param strings - the literal segments of the template
 * @returns the dedented, trimmed snippet
 */
export function code(strings: TemplateStringsArray): string {
    const joined = strings.join('…')
    // Only a snippet that opens with a newline has indentation to strip.
    const leadingIndent = /^\n([ ]*)/.exec(joined)
    const width = leadingIndent ? leadingIndent[1].length : 0
    const indentPattern = new RegExp(`^[ ]{${width}}`, 'gm')
    return joined.replace(indentPattern, '').trim()
}
Loading

0 comments on commit b235de3

Please sign in to comment.