Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exportEvents buffer #8573

Merged
merged 10 commits into from
Feb 16, 2022
83 changes: 42 additions & 41 deletions plugin-server/README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export function getDefaultConfig(): PluginsServerConfig {
NEW_PERSON_PROPERTIES_UPDATE_ENABLED: false,
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: true,
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED: true,
MAX_PENDING_PROMISES_PER_WORKER: 100,
}
}

Expand Down Expand Up @@ -146,6 +147,8 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
'(advanced) teams for which to run the new person properties update flow on',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: '(advanced) enable experimental feature to track lastSeenAt',
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED: '(advanced) enable experimental feature to track event properties',
MAX_PENDING_PROMISES_PER_WORKER:
'(advanced) maximum number of promises that a worker can have running at once in the background. currently only targets the exportEvents buffer.',
}
}

Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { TeamManager } from './worker/ingestion/team-manager'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
import { PromiseManager } from './worker/vm/promise-manager'

export enum LogLevel {
None = 'none',
Expand Down Expand Up @@ -108,6 +109,7 @@ export interface PluginsServerConfig extends Record<string, any> {
NEW_PERSON_PROPERTIES_UPDATE_ENABLED: boolean
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: boolean
EXPERIMENTAL_EVENT_PROPERTY_TRACKER_ENABLED: boolean
MAX_PENDING_PROMISES_PER_WORKER: number
}

export interface Hub extends PluginsServerConfig {
Expand Down Expand Up @@ -138,6 +140,7 @@ export interface Hub extends PluginsServerConfig {
organizationManager: OrganizationManager
pluginsApiKeyManager: PluginsApiKeyManager
rootAccessManager: RootAccessManager
promiseManager: PromiseManager
actionManager: ActionManager
actionMatcher: ActionMatcher
hookCannon: HookCommander
Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { status } from '../status'
import { createPostgresPool, createRedis, logOrThrowJobQueueError, UUIDT } from '../utils'
import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager'
import { PromiseManager } from './../../worker/vm/promise-manager'
import { PluginMetricsManager } from './../plugin-metrics'
import { DB } from './db'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'
Expand Down Expand Up @@ -190,6 +191,7 @@ export async function createHub(
const organizationManager = new OrganizationManager(db)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
const rootAccessManager = new RootAccessManager(db)
const promiseManager = new PromiseManager(serverConfig)
const actionManager = new ActionManager(db)
await actionManager.prepare()

Expand Down Expand Up @@ -217,6 +219,7 @@ export async function createHub(
organizationManager,
pluginsApiKeyManager,
rootAccessManager,
promiseManager,
actionManager,
actionMatcher: new ActionMatcher(db, actionManager, statsd),
hookCannon: new HookCommander(db, teamManager, organizationManager, statsd),
Expand Down
25 changes: 25 additions & 0 deletions plugin-server/src/worker/vm/promise-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PluginsServerConfig } from '../../types'

export class PromiseManager {
pendingPromises: Set<Promise<any>>
config: PluginsServerConfig

constructor(config: PluginsServerConfig) {
this.pendingPromises = new Set()
this.config = config
}

public trackPromise(promise: Promise<any>): void {
this.pendingPromises.add(promise)

promise.finally(() => {
this.pendingPromises.delete(promise)
})
}

public async awaitPromisesIfNeeded() {
yakkomajuri marked this conversation as resolved.
Show resolved Hide resolved
while (this.pendingPromises.size > this.config.MAX_PENDING_PROMISES_PER_WORKER) {
yakkomajuri marked this conversation as resolved.
Show resolved Hide resolved
await Promise.any(this.pendingPromises)
}
}
}
33 changes: 15 additions & 18 deletions plugin-server/src/worker/vm/upgrades/export-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { Plugin, PluginEvent, PluginMeta, RetryError } from '@posthog/plugin-sca
import { Hub, MetricMathOperations, PluginConfig, PluginConfigVMInternalResponse, PluginTaskType } from '../../../types'
import { status } from '../../../utils/status'
import { determineNodeEnv, stringClamp } from '../../../utils/utils'
import { createBuffer } from '../utils'
import { NodeEnv } from './../../../utils/utils'
import { ExportEventsBuffer } from './utils/export-events-buffer'

const MAXIMUM_RETRIES = 15
const EXPORT_BUFFER_BYTES_MINIMUM = 1
Expand All @@ -16,7 +16,7 @@ const EXPORT_BUFFER_SECONDS_DEFAULT = determineNodeEnv() === NodeEnv.Test ? EXPO

type ExportEventsUpgrade = Plugin<{
global: {
exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsBuffer: ExportEventsBuffer
exportEventsToIgnore: Set<string>
exportEventsWithRetry: (payload: ExportEventsJobPayload, meta: PluginMeta<ExportEventsUpgrade>) => Promise<void>
}
Expand Down Expand Up @@ -76,22 +76,19 @@ export function upgradeExportEvents(
: null
)

meta.global.exportEventsBuffer = createBuffer(
{
limit: uploadBytes,
timeoutSeconds: uploadSeconds,
onFlush: async (batch) => {
const jobPayload = {
batch,
batchId: Math.floor(Math.random() * 1000000),
retriesPerformedSoFar: 0,
}
// Running the first export code directly, without a job in between
await meta.global.exportEventsWithRetry(jobPayload, meta)
},
meta.global.exportEventsBuffer = new ExportEventsBuffer(hub, {
limit: uploadBytes,
timeoutSeconds: uploadSeconds,
onFlush: async (batch) => {
const jobPayload = {
batch,
batchId: Math.floor(Math.random() * 1000000),
retriesPerformedSoFar: 0,
}
// Running the first export code directly, without a job in between
await meta.global.exportEventsWithRetry(jobPayload, meta)
},
hub.statsd
)
})

meta.global.exportEventsWithRetry = async (
payload: ExportEventsJobPayload,
Expand Down Expand Up @@ -157,7 +154,7 @@ export function upgradeExportEvents(
const oldOnEvent = methods.onEvent
methods.onEvent = async (event: PluginEvent) => {
if (!meta.global.exportEventsToIgnore.has(event.event)) {
meta.global.exportEventsBuffer.add(event, JSON.stringify(event).length)
await meta.global.exportEventsBuffer.add(event, JSON.stringify(event).length)
}
await oldOnEvent?.(event)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
ExportHistoricalEventsUpgrade,
fetchEventsForInterval,
fetchTimestampBoundariesForTeam,
} from './utils'
} from '../utils/utils'

const TEN_MINUTES = 1000 * 60 * 10
const EVENTS_TIME_INTERVAL = TEN_MINUTES
Expand Down
68 changes: 68 additions & 0 deletions plugin-server/src/worker/vm/upgrades/utils/export-events-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { Hub } from 'types'

export type BufferOptions = {
limit: number
timeoutSeconds: number
onFlush?: (objects: any[], points: number) => void | Promise<void>
}

export class ExportEventsBuffer {
buffer: any[]
timeout: NodeJS.Timeout | null
points: number
options: BufferOptions
hub: Hub

constructor(hub: Hub, opts?: Partial<BufferOptions>) {
this.buffer = []
this.timeout = null
this.points = 0
this.options = {
limit: 10,
timeoutSeconds: 60,
...opts,
}
this.hub = hub
}

public async add(object: any, points = 1): Promise<void> {
// flush existing if adding would make us go over the limit
if (this.points && this.points + points > this.options.limit) {
await this.flush()
}

// add the object to the buffer
this.points += points
this.buffer.push(object)

if (this.points > this.options.limit) {
// flush (again?) if we are now over the limit
await this.flush()
} else if (!this.timeout) {
// if not, make sure there's a flush timeout
this.timeout = setTimeout(async () => await this.flush(), this.options.timeoutSeconds * 1000)
}
}

public async flush(): Promise<void> {
this.hub.statsd?.increment(`buffer_voided_promises`, { instanceId: this.hub.instanceId.toString() })

const oldBuffer = this.buffer
const oldPoints = this.points
this.buffer = []
this.points = 0

this.hub.promiseManager.trackPromise(this._flush(oldBuffer, oldPoints, new Date()))
await this.hub.promiseManager.awaitPromisesIfNeeded()
}

public async _flush(oldBuffer: any[], oldPoints: number, timer: Date): Promise<void> {
if (this.timeout) {
clearTimeout(this.timeout)
this.timeout = null
}

await this.options.onFlush?.(oldBuffer, oldPoints)
this.hub.statsd?.timing(`buffer_promise_duration`, timer)
}
}
62 changes: 0 additions & 62 deletions plugin-server/src/worker/vm/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,65 +73,3 @@ export const addPublicJobIfNotExists = async (
'addPublicJobIfNotExists'
)
}

type BufferOptions = {
limit: number
timeoutSeconds: number
onFlush: (objects: any[], points: number) => void | Promise<void>
}

export function createBuffer(opts: Partial<BufferOptions>, statsd?: StatsD) {
const buffer = {
_buffer: [] as any[],
_timeout: null as NodeJS.Timeout | null,
_lastFlushTriggered: new Date(),
_points: 0,
_options: {
limit: 10,
timeoutSeconds: 60,
...opts,
} as BufferOptions,
add: (object: any, points = 1) => {
// flush existing if adding would make us go over the limit
if (buffer._points && buffer._points + points > buffer._options.limit) {
buffer.triggerFlushInstrumented()
}

// add the object to the buffer
buffer._points += points
buffer._buffer.push(object)

if (buffer._points > buffer._options.limit) {
// flush (again?) if we are now over the limit
buffer.triggerFlushInstrumented()
} else if (!buffer._timeout) {
// if not, make sure there's a flush timeout
buffer._timeout = setTimeout(
() => buffer.triggerFlushInstrumented(),
buffer._options.timeoutSeconds * 1000
)
}
},
triggerFlushInstrumented: () => {
statsd?.increment(`buffer_voided_promises`)
buffer._lastFlushTriggered = new Date()
void buffer.flush()
},
flush: async (): Promise<void> => {
if (buffer._timeout) {
clearTimeout(buffer._timeout)
buffer._timeout = null
}
if (buffer._buffer.length > 0 || buffer._points !== 0) {
const oldBuffer = buffer._buffer
const oldPoints = buffer._points
buffer._buffer = []
buffer._points = 0
await buffer._options.onFlush?.(oldBuffer, oldPoints)
}
statsd?.timing(`buffer_promise_duration`, buffer._lastFlushTriggered)
},
}

return buffer
}
Loading