Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Lazy VMs #220

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { createServer } from '../../src/server'
import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types'
import { Plugin, PluginConfig, PluginConfigLazyVMResponse, PluginConfigVMResponse } from '../../src/types'
import { createPluginConfigVM } from '../../src/vm/vm'
import { commonOrganizationId } from '../../tests/helpers/plugins'

Expand Down Expand Up @@ -71,10 +71,10 @@ test('test vm memory usage', async () => {
const usedAtStart = getUsed()

let used = usedAtStart
const vms: PluginConfigVMReponse[] = []
const vms: PluginConfigLazyVMResponse[] = []

for (let i = 0; i < numVMs; i++) {
const vm = await createPluginConfigVM(server, mockConfig, indexJs)
const vm = createPluginConfigVM(server, mockConfig, indexJs)
vms.push(vm)

if (debug || i === numVMs - 1) {
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"scripts": {
"test": "jest --runInBand tests/**/*.test.ts",
"test:postgres": "jest --runInBand tests/postgres/*.test.ts tests/*.test.ts",
"test:clickhouse": "yarn test:clickhouse:1 && yarn test:clickhouse:2",
"test:clickhouse:1": "jest --runInBand tests/clickhouse/postgres-parity.test.ts tests/clickhouse/e2e.test.ts tests/clickhouse/ingestion-utils.test.ts",
"test:clickhouse:2": "jest --runInBand tests/clickhouse/process-event.test.ts",
"benchmark": "yarn run benchmarks:clickhouse && yarn run benchmark:postgres && yarn run benchmarks:vm",
Expand All @@ -16,8 +17,8 @@
"benchmark:vm:worker": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/vm/worker.benchmark.ts",
"start": "yarn start:dev",
"start:dist": "node dist/index.js --base-dir ../posthog",
"start:dev": "ts-node-dev --exit-child src/index.ts --base-dir ../posthog",
"start:dev:ee": "KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev",
"start:dev": "WORKER_CONCURRENCY=4 ts-node-dev --exit-child src/index.ts --base-dir ../posthog",
"start:dev:ee": "WORKER_CONCURRENCY=4 KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev",
"build": "yarn clean && yarn compile",
"clean": "rimraf dist/*",
"compile:protobuf": "cd src/idl/ && rimraf protos.* && pbjs -t static-module -w commonjs -o protos.js *.proto && pbts -o protos.d.ts protos.js && eslint --fix . && prettier --write .",
Expand Down
47 changes: 16 additions & 31 deletions src/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export async function setupPlugins(server: PluginsServer): Promise<void> {
const pluginConfigRows = await getPluginConfigRows(server)
const foundPluginConfigs = new Map<number, boolean>()
server.pluginConfigsPerTeam.clear()
server.defaultConfigs = []
for (const row of pluginConfigRows) {
const plugin = server.plugins.get(row.plugin_id)
if (!plugin) {
Expand All @@ -56,15 +55,16 @@ export async function setupPlugins(server: PluginsServer): Promise<void> {
server.pluginConfigs.set(row.id, pluginConfig)

if (!row.team_id) {
server.defaultConfigs.push(row)
} else {
let teamConfigs = server.pluginConfigsPerTeam.get(row.team_id)
if (!teamConfigs) {
teamConfigs = []
server.pluginConfigsPerTeam.set(row.team_id, teamConfigs)
}
teamConfigs.push(pluginConfig)
console.error(`🔴 PluginConfig(id=${row.id}) without team_id!`)
continue
}

let teamConfigs = server.pluginConfigsPerTeam.get(row.team_id)
if (!teamConfigs) {
teamConfigs = []
server.pluginConfigsPerTeam.set(row.team_id, teamConfigs)
}
teamConfigs.push(pluginConfig)
}
for (const [id, pluginConfig] of server.pluginConfigs) {
if (!foundPluginConfigs.has(id)) {
Expand All @@ -86,15 +86,7 @@ export async function setupPlugins(server: PluginsServer): Promise<void> {

const sortFunction = (a: PluginConfig, b: PluginConfig) => a.order - b.order
for (const teamId of server.pluginConfigsPerTeam.keys()) {
if (server.defaultConfigs.length > 0) {
const combinedPluginConfigs = [
...(server.pluginConfigsPerTeam.get(teamId) || []),
...server.defaultConfigs,
].sort(sortFunction)
server.pluginConfigsPerTeam.set(teamId, combinedPluginConfigs)
} else {
server.pluginConfigsPerTeam.get(teamId)?.sort(sortFunction)
}
server.pluginConfigsPerTeam.get(teamId)?.sort(sortFunction)
}
}

Expand Down Expand Up @@ -137,14 +129,9 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr
const jsPath = path.resolve(pluginPath, config['main'] || 'index.js')
const indexJs = fs.readFileSync(jsPath).toString()

const libPath = path.resolve(pluginPath, config['lib'] || 'lib.js')
const libJs = fs.existsSync(libPath) ? fs.readFileSync(libPath).toString() : ''
if (libJs) {
console.warn(`⚠️ Using "lib.js" is deprecated! Used by: ${plugin.name} (${plugin.url})`)
}

try {
pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, indexJs, libJs)
pluginConfig.vm = createPluginConfigVM(server, pluginConfig, indexJs)
await pluginConfig.vm.vmPromise //
status.info('🔌', `Loaded local plugin "${plugin.name}" from "${pluginPath}"!`)
await clearError(server, pluginConfig)
return true
Expand All @@ -165,14 +152,11 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr
}

const indexJs = await getFileFromArchive(archive, config['main'] || 'index.js')
const libJs = await getFileFromArchive(archive, config['lib'] || 'lib.js')
if (libJs) {
console.warn(`⚠️ Using "lib.js" is deprecated! Used by: ${plugin.name} (${plugin.url})`)
}

if (indexJs) {
try {
pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, indexJs, libJs || '')
pluginConfig.vm = createPluginConfigVM(server, pluginConfig, indexJs)
await pluginConfig.vm.vmPromise
status.info('🔌', `Loaded plugin "${plugin.name}"!`)
await clearError(server, pluginConfig)
return true
Expand All @@ -184,7 +168,8 @@ async function loadPlugin(server: PluginsServer, pluginConfig: PluginConfig): Pr
}
} else if (plugin.plugin_type === 'source' && plugin.source) {
try {
pluginConfig.vm = await createPluginConfigVM(server, pluginConfig, plugin.source)
pluginConfig.vm = createPluginConfigVM(server, pluginConfig, plugin.source)
await pluginConfig.vm.vmPromise
status.info('🔌', `Loaded plugin "${plugin.name}"!`)
await clearError(server, pluginConfig)
return true
Expand Down
13 changes: 11 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export interface PluginConfig {
config: Record<string, unknown>
error?: PluginError
attachments?: Record<string, PluginAttachment>
vm?: PluginConfigVMReponse | null
vm?: PluginConfigLazyVMResponse | null
}

export interface PluginJsonConfig {
Expand Down Expand Up @@ -156,7 +156,16 @@ export interface PluginTask {
exec: () => Promise<any>
}

export interface PluginConfigVMReponse {
export interface PluginConfigLazyVMResponse {
vmPromise: Promise<VM>
methods: {
processEvent: (event: PluginEvent) => Promise<PluginEvent>
processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
tasks: Record<string, PluginTask>
}

export interface PluginConfigVMResponse {
vm: VM
methods: {
processEvent: (event: PluginEvent) => Promise<PluginEvent>
Expand Down
39 changes: 30 additions & 9 deletions src/vm/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,43 @@ import { randomBytes } from 'crypto'
import fetch from 'node-fetch'
import { VM } from 'vm2'

import { PluginConfig, PluginConfigVMReponse, PluginsServer } from '../types'
import { PluginConfig, PluginConfigLazyVMResponse, PluginConfigVMResponse, PluginsServer } from '../types'
import { createCache } from './extensions/cache'
import { createConsole } from './extensions/console'
import { createGoogle } from './extensions/google'
import { createPosthog } from './extensions/posthog'
import { createStorage } from './extensions/storage'
import { secureCode } from './transforms'

export async function createPluginConfigVM(
export function createPluginConfigVM(
server: PluginsServer,
pluginConfig: PluginConfig, // NB! might have team_id = 0
indexJs: string,
libJs = ''
): Promise<PluginConfigVMReponse> {
const rawCode = libJs ? `${libJs};${indexJs}` : indexJs
const securedCode = secureCode(rawCode, server)
indexJs: string
): PluginConfigLazyVMResponse {
const createVmPromise = createActualVM(server, pluginConfig, indexJs)

const lazyResponse: PluginConfigLazyVMResponse = {
vmPromise: createVmPromise.then((vm) => vm.vm),
methods: {
processEvent: (event) => createVmPromise.then((vm) => vm.methods.processEvent(event)),
processEventBatch: (batch) => createVmPromise.then((vm) => vm.methods.processEventBatch(batch)),
},
tasks: {}, // new Proxy(),
}

void createVmPromise.then(({ tasks }) => {
lazyResponse.tasks = tasks
})

return lazyResponse
}

export async function createActualVM(
server: PluginsServer,
pluginConfig: PluginConfig, // NB! might have team_id = 0
indexJs: string
): Promise<PluginConfigVMResponse> {
const securedCode = secureCode(indexJs, server)

// Create virtual machine
const vm = new VM({
Expand Down Expand Up @@ -131,8 +152,8 @@ export async function createPluginConfigVM(

// export various functions
const __methods = {
processEvent: __asyncFunctionGuard(__bindMeta('processEvent')),
processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch'))
processEvent: __asyncFunctionGuard(__bindMeta('processEvent')) || ((event) => event),
processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')) || ((batch) => batch)
};

// gather the runEveryX commands and export in __tasks
Expand Down
Loading