diff --git a/benchmarks/vm/memory.benchmark.ts b/benchmarks/vm/memory.benchmark.ts index 40e7263e..a2b32f19 100644 --- a/benchmarks/vm/memory.benchmark.ts +++ b/benchmarks/vm/memory.benchmark.ts @@ -1,7 +1,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { createServer } from '../../src/server' -import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types' +import { Plugin, PluginConfig, PluginConfigLazyVMResponse, PluginConfigVMResponse } from '../../src/types' import { createPluginConfigVM } from '../../src/vm/vm' import { commonOrganizationId } from '../../tests/helpers/plugins' @@ -71,10 +71,10 @@ test('test vm memory usage', async () => { const usedAtStart = getUsed() let used = usedAtStart - const vms: PluginConfigVMReponse[] = [] + const vms: PluginConfigLazyVMResponse[] = [] for (let i = 0; i < numVMs; i++) { - const vm = await createPluginConfigVM(server, mockConfig, indexJs) + const vm = createPluginConfigVM(server, mockConfig, indexJs) vms.push(vm) if (debug || i === numVMs - 1) { diff --git a/package.json b/package.json index 932a397a..46936a89 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "scripts": { "test": "jest --runInBand tests/**/*.test.ts", "test:postgres": "jest --runInBand tests/postgres/*.test.ts tests/*.test.ts", + "test:clickhouse": "yarn test:clickhouse:1 && yarn test:clickhouse:2", "test:clickhouse:1": "jest --runInBand tests/clickhouse/postgres-parity.test.ts tests/clickhouse/e2e.test.ts tests/clickhouse/ingestion-utils.test.ts", "test:clickhouse:2": "jest --runInBand tests/clickhouse/process-event.test.ts", "benchmark": "yarn run benchmarks:clickhouse && yarn run benchmark:postgres && yarn run benchmarks:vm", @@ -16,8 +17,8 @@ "benchmark:vm:worker": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/vm/worker.benchmark.ts", "start": "yarn start:dev", "start:dist": "node dist/index.js --base-dir ../posthog", - "start:dev": "ts-node-dev --exit-child src/index.ts --base-dir ../posthog", - "start:dev:ee": "KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev", + "start:dev": "WORKER_CONCURRENCY=4 ts-node-dev --exit-child src/index.ts --base-dir ../posthog", + "start:dev:ee": "WORKER_CONCURRENCY=4 KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev", "build": "yarn clean && yarn compile", "clean": "rimraf dist/*", "compile:protobuf": "cd src/idl/ && rimraf protos.* && pbjs -t static-module -w commonjs -o protos.js *.proto && pbts -o protos.d.ts protos.js && eslint --fix . && prettier --write .", diff --git a/src/plugins.ts b/src/plugins.ts index 3e9e2f21..cd24b74d 100644 --- a/src/plugins.ts +++ b/src/plugins.ts @@ -40,7 +40,6 @@ export async function setupPlugins(server: PluginsServer): Promise { const pluginConfigRows = await getPluginConfigRows(server) const foundPluginConfigs = new Map() server.pluginConfigsPerTeam.clear() - server.defaultConfigs = [] for (const row of pluginConfigRows) { const plugin = server.plugins.get(row.plugin_id) if (!plugin) { @@ -56,15 +55,16 @@ export async function setupPlugins(server: PluginsServer): Promise { server.pluginConfigs.set(row.id, pluginConfig) if (!row.team_id) { - server.defaultConfigs.push(row) - } else { - let teamConfigs = server.pluginConfigsPerTeam.get(row.team_id) - if (!teamConfigs) { - teamConfigs = [] - server.pluginConfigsPerTeam.set(row.team_id, teamConfigs) - } - teamConfigs.push(pluginConfig) + console.error(`🔴 PluginConfig(id=${row.id}) without team_id!`) + continue + } + + let teamConfigs = server.pluginConfigsPerTeam.get(row.team_id) + if (!teamConfigs) { + teamConfigs = [] + server.pluginConfigsPerTeam.set(row.team_id, teamConfigs) } + teamConfigs.push(pluginConfig) } for (const [id, pluginConfig] of server.pluginConfigs) { if (!foundPluginConfigs.has(id)) { @@ -86,15 +86,7 @@ export async function setupPlugins(server: PluginsServer): Promise { const sortFunction = (a: PluginConfig, b: PluginConfig) => a.order - b.order for (const teamId of server.pluginConfigsPerTeam.keys()) { - if (server.defaultConfigs.length > 0) { - const combinedPluginConfigs = [ - ...(server.pluginConfigsPerTeam.get(teamId) || []), - ...server.defaultConfigs, - ].sort(sortFunction) - server.pluginConfigsPerTeam.set(teamId, combinedPluginConfigs) - } else { - server.pluginConfigsPerTeam.get(teamId)?.sort(sortFunction) - } + server.pluginConfigsPerTeam.get(teamId)?.sort(sortFunction) } } @@ -137,14 +129,9 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr const jsPath = path.resolve(pluginPath, config['main'] || 'index.js') const indexJs = fs.readFileSync(jsPath).toString() - const libPath = path.resolve(pluginPath, config['lib'] || 'lib.js') - const libJs = fs.existsSync(libPath) ? fs.readFileSync(libPath).toString() : '' - if (libJs) { - console.warn(`⚠️ Using "lib.js" is deprecated! Used by: ${plugin.name} (${plugin.url})`) - } - try { - pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, indexJs, libJs) + pluginConfig.vm = createPluginConfigVM(server, pluginConfig, indexJs) + await pluginConfig.vm.vmPromise // status.info('🔌', `Loaded local plugin "${plugin.name}" from "${pluginPath}"!`) await clearError(server, pluginConfig) return true @@ -165,14 +152,11 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr } const indexJs = await getFileFromArchive(archive, config['main'] || 'index.js') - const libJs = await getFileFromArchive(archive, config['lib'] || 'lib.js') - if (libJs) { - console.warn(`⚠️ Using "lib.js" is deprecated! Used by: ${plugin.name} (${plugin.url})`) - } if (indexJs) { try { - pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, indexJs, libJs || '') + pluginConfig.vm = createPluginConfigVM(server, pluginConfig, indexJs) + await pluginConfig.vm.vmPromise status.info('🔌', `Loaded plugin "${plugin.name}"!`) await clearError(server, pluginConfig) return true @@ -184,7 +168,8 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr } } else if (plugin.plugin_type === 'source' && plugin.source) { try { - pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, plugin.source) + pluginConfig.vm = createPluginConfigVM(server, pluginConfig, plugin.source) + await pluginConfig.vm.vmPromise status.info('🔌', `Loaded plugin "${plugin.name}"!`) await clearError(server, pluginConfig) return true diff --git a/src/types.ts b/src/types.ts index 4e66541c..e88bb12d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -119,7 +119,7 @@ export interface PluginConfig { config: Record error?: PluginError attachments?: Record - vm?: PluginConfigVMReponse | null + vm?: PluginConfigLazyVMResponse | null } export interface PluginJsonConfig { @@ -156,7 +156,16 @@ export interface PluginTask { exec: () => Promise } -export interface PluginConfigVMReponse { +export interface PluginConfigLazyVMResponse { + vmPromise: Promise + methods: { + processEvent: (event: PluginEvent) => Promise + processEventBatch: (batch: PluginEvent[]) => Promise + } + tasks: Record +} + +export interface PluginConfigVMResponse { vm: VM methods: { processEvent: (event: PluginEvent) => Promise diff --git a/src/vm/vm.ts b/src/vm/vm.ts index d6efd00b..c9faf31a 100644 --- a/src/vm/vm.ts +++ b/src/vm/vm.ts @@ -2,7 +2,7 @@ import { randomBytes } from 'crypto' import fetch from 'node-fetch' import { VM } from 'vm2' -import { PluginConfig, PluginConfigVMReponse, PluginsServer } from '../types' +import { PluginConfig, PluginConfigLazyVMResponse, PluginConfigVMResponse, PluginsServer } from '../types' import { createCache } from './extensions/cache' import { createConsole } from './extensions/console' import { createGoogle } from './extensions/google' @@ -10,14 +10,35 @@ import { createPosthog } from './extensions/posthog' import { createStorage } from './extensions/storage' import { secureCode } from './transforms' -export async function createPluginConfigVM( +export function createPluginConfigVM( server: PluginsServer, pluginConfig: PluginConfig, // NB! might have team_id = 0 - indexJs: string, - libJs = '' -): Promise { - const rawCode = libJs ? `${libJs};${indexJs}` : indexJs - const securedCode = secureCode(rawCode, server) + indexJs: string +): PluginConfigLazyVMResponse { + const createVmPromise = createActualVM(server, pluginConfig, indexJs) + + const lazyResponse: PluginConfigLazyVMResponse = { + vmPromise: createVmPromise.then((vm) => vm.vm), + methods: { + processEvent: (event) => createVmPromise.then((vm) => vm.methods.processEvent(event)), + processEventBatch: (batch) => createVmPromise.then((vm) => vm.methods.processEventBatch(batch)), + }, + tasks: {}, // new Proxy(), + } + + void createVmPromise.then(({ tasks }) => { + lazyResponse.tasks = tasks + }) + + return lazyResponse +} + +export async function createActualVM( + server: PluginsServer, + pluginConfig: PluginConfig, // NB! might have team_id = 0 + indexJs: string +): Promise { + const securedCode = secureCode(indexJs, server) // Create virtual machine const vm = new VM({ @@ -131,8 +152,8 @@ export async function createPluginConfigVM( // export various functions const __methods = { - processEvent: __asyncFunctionGuard(__bindMeta('processEvent')), - processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) + processEvent: __asyncFunctionGuard(__bindMeta('processEvent')) || ((event) => event), + processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) || ((batch) => batch) }; // gather the runEveryX commands and export in __tasks diff --git a/tests/postgres/vm.test.ts b/tests/postgres/vm.test.ts index 91f00d41..b7b56de0 100644 --- a/tests/postgres/vm.test.ts +++ b/tests/postgres/vm.test.ts @@ -35,13 +35,12 @@ afterEach(async () => { test('empty plugins', async () => { const indexJs = '' - const libJs = '' - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs, libJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) - expect(Object.keys(vm).sort()).toEqual(['methods', 'tasks', 'vm']) + expect(Object.keys(vm).sort()).toEqual(['methods', 'tasks', 'vmPromise']) expect(Object.keys(vm.methods).sort()).toEqual(['processEvent', 'processEventBatch']) - expect(vm.methods.processEvent).toEqual(undefined) - expect(vm.methods.processEventBatch).toEqual(undefined) + expect(await vm.methods.processEvent(defaultEvent)).toEqual(defaultEvent) + expect(await vm.methods.processEventBatch([defaultEvent])).toEqual([defaultEvent]) }) test('setupPlugin sync', async () => { @@ -55,7 +54,7 @@ test('setupPlugin sync', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const newEvent = await vm.methods.processEvent({ ...defaultEvent }) expect(newEvent.event).toEqual('haha') }) @@ -72,7 +71,7 @@ test('setupPlugin async', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const newEvent = await vm.methods.processEvent({ ...defaultEvent }) expect(newEvent.event).toEqual('haha') }) @@ -85,7 +84,7 @@ test('processEvent', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) expect(vm.methods.processEventBatch).not.toEqual(undefined) @@ -118,7 +117,7 @@ test('async processEvent', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) expect(vm.methods.processEventBatch).not.toEqual(undefined) @@ -153,7 +152,7 @@ test('processEventBatch', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) expect(vm.methods.processEventBatch).not.toEqual(undefined) @@ -188,7 +187,7 @@ test('async processEventBatch', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) expect(vm.methods.processEventBatch).not.toEqual(undefined) @@ -227,7 +226,7 @@ test('processEvent && processEventBatch', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) expect(vm.methods.processEventBatch).not.toEqual(undefined) @@ -259,7 +258,7 @@ test('processEvent without returning', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) expect(vm.methods.processEvent).not.toEqual(undefined) const event: PluginEvent = { @@ -284,7 +283,7 @@ test('async processEvent', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, @@ -304,7 +303,7 @@ test('module.exports override', async () => { module.exports = { processEvent: myProcessEventFunction } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, @@ -324,7 +323,7 @@ test('module.exports set', async () => { module.exports.processEvent = myProcessEventFunction ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, @@ -344,7 +343,7 @@ test('exports override', async () => { exports = { processEvent: myProcessEventFunction } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -363,7 +362,7 @@ test('exports set', async () => { exports.processEvent = myProcessEventFunction ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -381,7 +380,7 @@ test('meta.config', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -405,7 +404,7 @@ test('meta.cache set/get', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -443,7 +442,7 @@ test('meta.storage set/get', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -474,7 +473,7 @@ test('meta.cache expire', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -506,7 +505,7 @@ test('meta.cache set ttl', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -537,7 +536,7 @@ test('meta.cache incr', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'original event', @@ -554,28 +553,6 @@ test('meta.cache incr', async () => { expect(event.properties!['counter']).toEqual(3) }) -test('lib.js (deprecated)', async () => { - const indexJs = ` - async function processEvent (event, meta) { - event.event = libraryFunction(event.event) - return event - } - ` - const libJs = ` - function libraryFunction (string) { - return string.split("").reverse().join("") - } - ` - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs, libJs) - const event: PluginEvent = { - ...defaultEvent, - event: 'original event', - } - await vm.methods.processEvent(event) - - expect(event.event).toEqual('tneve lanigiro') -}) - test('console.log', async () => { console.log = jest.fn() console.error = jest.fn() @@ -593,7 +570,7 @@ test('console.log', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'logged event', @@ -616,7 +593,7 @@ test('fetch', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) const event: PluginEvent = { ...defaultEvent, event: 'fetched', @@ -642,7 +619,7 @@ test('attachments', async () => { contents: Buffer.from('{"name": "plugin"}'), }, } - const vm = await createPluginConfigVM( + const vm = createPluginConfigVM( mockServer, { ...pluginConfig39, @@ -673,7 +650,10 @@ test('runEvery', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) + + expect(Object.keys(vm.tasks)).toEqual([]) + await vm.vmPromise expect(Object.keys(vm.tasks)).toEqual(['runEveryMinute', 'runEveryHour', 'runEveryDay']) expect(Object.values(vm.tasks).map((v) => v?.name)).toEqual(['runEveryMinute', 'runEveryHour', 'runEveryDay']) @@ -690,7 +670,10 @@ test('runEvery must be a function', async () => { const runEveryDay = { some: 'object' } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) + + expect(Object.keys(vm.tasks)).toEqual([]) + await vm.vmPromise expect(Object.keys(vm.tasks)).toEqual(['runEveryMinute']) expect(Object.values(vm.tasks).map((v) => v?.name)).toEqual(['runEveryMinute']) @@ -706,7 +689,8 @@ test('posthog in runEvery', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) + await vm.vmPromise expect(Client).not.toHaveBeenCalled @@ -745,7 +729,8 @@ test('posthog in runEvery with timestamp', async () => { } ` await resetTestDatabase(indexJs) - const vm = await createPluginConfigVM(mockServer, pluginConfig39, indexJs) + const vm = createPluginConfigVM(mockServer, pluginConfig39, indexJs) + await vm.vmPromise expect(Client).not.toHaveBeenCalled