diff --git a/benchmarks/postgres/ingestion.benchmark.ts b/benchmarks/postgres/ingestion.benchmark.ts index 9c687411..d7e80adb 100644 --- a/benchmarks/postgres/ingestion.benchmark.ts +++ b/benchmarks/postgres/ingestion.benchmark.ts @@ -52,6 +52,7 @@ describe('ingestion benchmarks', () => { LOG_LEVEL: LogLevel.Log, }) eventsProcessor = new EventsProcessor(hub) + await eventsProcessor.prepare() team = await getFirstTeam(hub) now = DateTime.utc() diff --git a/src/config/config.ts b/src/config/config.ts index 37743f10..043950c8 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -47,7 +47,7 @@ export function getDefaultConfig(): PluginsServerConfig { WORKER_CONCURRENCY: coreCount, TASK_TIMEOUT: 30, TASKS_PER_WORKER: 10, - LOG_LEVEL: LogLevel.Info, + LOG_LEVEL: isTestEnv ? LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null, STATSD_HOST: null, STATSD_PORT: 8125, diff --git a/src/main/pluginsServer.ts b/src/main/pluginsServer.ts index 42c8df01..3fd90256 100644 --- a/src/main/pluginsServer.ts +++ b/src/main/pluginsServer.ts @@ -2,7 +2,6 @@ import { ReaderModel } from '@maxmind/geoip2-node' import Piscina from '@posthog/piscina' import * as Sentry from '@sentry/node' import { FastifyInstance } from 'fastify' -import Redis from 'ioredis' import net, { AddressInfo } from 'net' import * as schedule from 'node-schedule' @@ -10,8 +9,9 @@ import { defaultConfig } from '../config/config' import { Hub, JobQueueConsumerControl, PluginsServerConfig, Queue, ScheduleControl } from '../types' import { createHub } from '../utils/db/hub' import { killProcess } from '../utils/kill' +import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' -import { createRedis, delay, getPiscinaStats } from '../utils/utils' +import { delay, getPiscinaStats } from '../utils/utils' import { startQueue } from './ingestion-queues/queue' import { startJobQueueConsumer } from './job-queues/job-queue-consumer' import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb' @@ -41,9 +41,10 @@ export async function startPluginsServer( status.info('ℹī¸', `${serverConfig.WORKER_CONCURRENCY} workers, ${serverConfig.TASKS_PER_WORKER} tasks per worker`) - let pubSub: Redis.Redis | undefined + let pubSub: PubSub | undefined let hub: Hub | undefined let fastifyInstance: FastifyInstance | undefined + let actionsReloadJob: schedule.Job | undefined let pingJob: schedule.Job | undefined let statsJob: schedule.Job | undefined let piscina: Piscina | undefined @@ -73,7 +74,8 @@ export async function startPluginsServer( } lastActivityCheck && clearInterval(lastActivityCheck) await queue?.stop() - await pubSub?.quit() + await pubSub?.stop() + actionsReloadJob && schedule.cancelJob(actionsReloadJob) pingJob && schedule.cancelJob(pingJob) statsJob && schedule.cancelJob(statsJob) await jobQueueConsumer?.stop() @@ -141,23 +143,29 @@ export async function startPluginsServer( void jobQueueConsumer?.resume() }) - // use one extra connection for redis pubsub - pubSub = await createRedis(hub) - await pubSub.subscribe(hub.PLUGINS_RELOAD_PUBSUB_CHANNEL) - pubSub.on('message', async (channel: string, message) => { - if (channel === hub!.PLUGINS_RELOAD_PUBSUB_CHANNEL) { + // use one extra Redis connection for pub-sub + pubSub = new PubSub(hub, { + [hub.PLUGINS_RELOAD_PUBSUB_CHANNEL]: async () => { status.info('⚡', 'Reloading plugins!') - await piscina?.broadcastTask({ task: 'reloadPlugins' }) await scheduleControl?.reloadSchedule() - } + }, + 'reload-action': async (message) => + await piscina?.broadcastTask({ task: 'reloadAction', args: { actionId: parseInt(message) } }), + 'drop-action': async (message) => + await piscina?.broadcastTask({ task: 'dropAction', args: { actionId: parseInt(message) } }), }) + await pubSub.start() if (hub.jobQueueManager) { const queueString = hub.jobQueueManager.getJobQueueTypesAsString() await hub!.db!.redisSet('@posthog-plugin-server/enabled-job-queues', queueString) } + // every 5 minutes all ActionManager caches are reloaded for eventual consistency + actionsReloadJob = schedule.scheduleJob('*/5 * * * *', async () => { + await piscina?.broadcastTask({ task: 'reloadAllActions' }) + }) // every 5 seconds set Redis keys @posthog-plugin-server/ping and @posthog-plugin-server/version pingJob = schedule.scheduleJob('*/5 * * * * *', async () => { await hub!.db!.redisSet('@posthog-plugin-server/ping', new Date().toISOString(), 60, { diff --git a/src/types.ts b/src/types.ts index 23e1f0c5..48732879 100644 --- a/src/types.ts +++ b/src/types.ts @@ -16,12 +16,21 @@ import { EventsProcessor } from './worker/ingestion/process-event' import { LazyPluginVM } from './worker/vm/lazy' export enum LogLevel { + None = 'none', Debug = 'debug', Info = 'info', Log = 'log', Warn = 'warn', Error = 'error', - None = 'none', +} + +export const logLevelToNumber: Record = { + [LogLevel.None]: 0, + [LogLevel.Debug]: 10, + [LogLevel.Info]: 20, + [LogLevel.Log]: 30, + [LogLevel.Warn]: 40, + [LogLevel.Error]: 50, } export interface PluginsServerConfig extends Record { @@ -318,6 +327,7 @@ export interface RawOrganization { name: string created_at: string updated_at: string + available_features: string[] } /** Usable Team model. */ @@ -429,6 +439,101 @@ export interface CohortPeople { person_id: number } +/** Sync with posthog/frontend/src/types.ts */ +export enum PropertyOperator { + Exact = 'exact', + IsNot = 'is_not', + IContains = 'icontains', + NotIContains = 'not_icontains', + Regex = 'regex', + NotRegex = 'not_regex', + GreaterThan = 'gt', + LessThan = 'lt', + IsSet = 'is_set', + IsNotSet = 'is_not_set', +} + +/** Sync with posthog/frontend/src/types.ts */ +interface BasePropertyFilter { + key: string + value: string | number | Array | null + label?: string +} + +/** Sync with posthog/frontend/src/types.ts */ +export interface EventPropertyFilter extends BasePropertyFilter { + type: 'event' + operator: PropertyOperator +} + +/** Sync with posthog/frontend/src/types.ts */ +export interface PersonPropertyFilter extends BasePropertyFilter { + type: 'person' + operator: PropertyOperator +} + +/** Sync with posthog/frontend/src/types.ts */ +export interface ElementPropertyFilter extends BasePropertyFilter { + type: 'element' + key: 'tag_name' | 'text' | 'href' | 'selector' + operator: PropertyOperator +} + +/** Sync with posthog/frontend/src/types.ts */ +export interface CohortPropertyFilter extends BasePropertyFilter { + type: 'cohort' + key: 'id' + value: number +} + +/** Sync with posthog/frontend/src/types.ts */ +export type ActionStepProperties = + | EventPropertyFilter + | PersonPropertyFilter + | ElementPropertyFilter + | CohortPropertyFilter + +/** Sync with posthog/frontend/src/types.ts */ +export enum ActionStepUrlMatching { + Contains = 'contains', + Regex = 'regex', + Exact = 'exact', +} + +export interface ActionStep { + id: number + action_id: number + tag_name: string | null + text: string | null + href: string | null + selector: string | null + url: string | null + url_matching: ActionStepUrlMatching | null + name: string | null + event: string | null + properties: ActionStepProperties[] | null +} + +/** Raw Action row from database. */ +export interface RawAction { + id: number + team_id: TeamId + name: string | null + created_at: string + created_by_id: number | null + deleted: boolean + post_to_slack: boolean + slack_message_format: string + is_calculating: boolean + updated_at: string + last_calculated_at: string +} + +/** Usable Action model. */ +export interface Action extends RawAction { + steps: ActionStep[] +} + export interface SessionRecordingEvent { uuid: string timestamp: string diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 9502bbbf..1cf47cc3 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -10,6 +10,8 @@ import { Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg' import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID, KAFKA_PLUGIN_LOG_ENTRIES } from '../../config/kafka-topics' import { + Action, + ActionStep, ClickHouseEvent, ClickHousePerson, ClickHousePersonDistinctId, @@ -26,6 +28,7 @@ import { PluginLogEntryType, PostgresSessionRecordingEvent, PropertyDefinitionType, + RawAction, RawOrganization, RawPerson, SessionRecordingEvent, @@ -705,14 +708,63 @@ export class DB { return entry } + // EventDefinition + public async fetchEventDefinitions(): Promise { return (await this.postgresQuery('SELECT * FROM posthog_eventdefinition', undefined, 'fetchEventDefinitions')) .rows as EventDefinitionType[] } + // PropertyDefinition + public async fetchPropertyDefinitions(): Promise { return ( await this.postgresQuery('SELECT * FROM posthog_propertydefinition', undefined, 'fetchPropertyDefinitions') ).rows as PropertyDefinitionType[] } + + // Action & ActionStep + + public async fetchAllActionsMap(): Promise> { + const rawActions: RawAction[] = ( + await this.postgresQuery(`SELECT * FROM posthog_action WHERE deleted = FALSE`, undefined, 'fetchActions') + ).rows + const actionSteps: ActionStep[] = ( + await this.postgresQuery( + `SELECT posthog_actionstep.*, posthog_action.deleted FROM posthog_actionstep + JOIN posthog_action ON (posthog_action.id = posthog_actionstep.action_id) + WHERE posthog_action.deleted = FALSE`, + undefined, + 'fetchActionSteps' + ) + ).rows + const actionsMap: Record = {} + for (const rawAction of rawActions) { + actionsMap[rawAction.id] = { ...rawAction, steps: [] } + } + for (const actionStep of actionSteps) { + if (actionStep.action_id in actionsMap) { + actionsMap[actionStep.action_id].steps.push(actionStep) + } + } + return actionsMap + } + + public async fetchAction(id: Action['id']): Promise { + const rawActions: RawAction[] = ( + await this.postgresQuery( + `SELECT * FROM posthog_action WHERE id = $1 AND deleted = FALSE`, + [id], + 'fetchActions' + ) + ).rows + if (!rawActions.length) { + return null + } + const steps: ActionStep[] = ( + await this.postgresQuery(`SELECT * FROM posthog_actionstep WHERE action_id = $1`, [id], 'fetchActionSteps') + ).rows + const action: Action = { ...rawActions[0], steps } + return action + } } diff --git a/src/utils/db/hub.ts b/src/utils/db/hub.ts index 3dc5fc1e..53ea72b5 100644 --- a/src/utils/db/hub.ts +++ b/src/utils/db/hub.ts @@ -175,6 +175,7 @@ export async function createHub( // :TODO: This is only used on worker threads, not main hub.eventsProcessor = new EventsProcessor(hub as Hub) + await hub.eventsProcessor.prepare() hub.jobQueueManager = new JobQueueManager(hub as Hub) try { diff --git a/src/utils/pubsub.ts b/src/utils/pubsub.ts new file mode 100644 index 00000000..8f49dcc8 --- /dev/null +++ b/src/utils/pubsub.ts @@ -0,0 +1,57 @@ +import { captureException } from '@sentry/node' +import { Redis } from 'ioredis' + +import { PluginsServerConfig } from '../types' +import { status } from './status' +import { createRedis } from './utils' + +export type PubSubTask = ((message: string) => void) | ((message: string) => Promise) + +export interface PubSubTaskMap { + [channel: string]: PubSubTask +} + +export class PubSub { + private serverConfig: PluginsServerConfig + private redis: Redis | null + public taskMap: PubSubTaskMap + + constructor(serverConfig: PluginsServerConfig, taskMap: PubSubTaskMap = {}) { + this.serverConfig = serverConfig + this.redis = null + this.taskMap = taskMap + } + + public async start(): Promise { + if (this.redis) { + throw new Error('Started PubSub cannot be started again!') + } + this.redis = await createRedis(this.serverConfig) + const channels = Object.keys(this.taskMap) + await this.redis.subscribe(channels) + this.redis.on('message', (channel: string, message: string) => { + const task: PubSubTask | undefined = this.taskMap[channel] + if (!task) { + captureException( + new Error( + `Received a pubsub message for unassociated channel ${channel}! Associated channels are: ${Object.keys( + this.taskMap + ).join(', ')}` + ) + ) + } + void task(message) + }) + status.info('👀', `Pub-sub started for channels: ${channels.join(', ')}`) + } + + public async stop(): Promise { + if (!this.redis) { + throw new Error('Unstarted PubSub cannot be stopped!') + } + await this.redis.unsubscribe() + this.redis.disconnect() + this.redis = null + status.info('🛑', `Pub-sub stopped for channels: ${Object.keys(this.taskMap).join(', ')}`) + } +} diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 7019c290..536b0689 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -5,7 +5,7 @@ import AdmZip from 'adm-zip' import { randomBytes } from 'crypto' import Redis, { RedisOptions } from 'ioredis' import { DateTime } from 'luxon' -import { Pool, PoolClient, PoolConfig } from 'pg' +import { Pool, PoolConfig } from 'pg' import { Readable } from 'stream' import * as tar from 'tar-stream' import * as zlib from 'zlib' diff --git a/src/worker/ingestion/action-manager.ts b/src/worker/ingestion/action-manager.ts new file mode 100644 index 00000000..33659926 --- /dev/null +++ b/src/worker/ingestion/action-manager.ts @@ -0,0 +1,68 @@ +import { Action, PluginsServerConfig } from '../../types' +import { DB } from '../../utils/db/db' +import { status } from '../../utils/status' + +type ActionCache = Record + +export class ActionManager { + private ready: boolean + private db: DB + private actionCache: ActionCache + + constructor(db: DB) { + this.ready = false + this.db = db + this.actionCache = {} + } + + public async prepare(): Promise { + await this.reloadAllActions() + this.ready = true + } + + public getAction(id: Action['id']): Action | undefined { + if (!this.ready) { + throw new Error('ActionManager is not ready! Run actionManager.prepare() before this') + } + return this.actionCache[id] + } + + public async reloadAllActions(): Promise { + this.actionCache = await this.db.fetchAllActionsMap() + status.info('đŸŋ', 'Fetched all actions from DB anew') + } + + public async reloadAction(id: Action['id']): Promise { + const refetchedAction = await this.db.fetchAction(id) + if (refetchedAction) { + status.info( + 'đŸŋ', + id in this.actionCache ? `Refetched action ID ${id} from DB` : `Fetched new action ID ${id} from DB` + ) + this.actionCache[id] = refetchedAction + } else if (id in this.actionCache) { + status.info( + 'đŸŋ', + `Tried to fetch action ID ${id} from DB, but it wasn't found in DB, so deleted from cache instead` + ) + delete this.actionCache[id] + } else { + status.info( + 'đŸŋ', + `Tried to fetch action ID ${id} from DB, but it wasn't found in DB or cache, so did nothing instead` + ) + } + } + + public dropAction(id: Action['id']): void { + if (id in this.actionCache) { + status.info('đŸŋ', `Deleted action ID ${id} from cache`) + delete this.actionCache[id] + } else { + status.info( + 'đŸŋ', + `Tried to delete action ID ${id} from cache, but it wasn't found in cache, so did nothing instead` + ) + } + } +} diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index 7f538c3c..24722edd 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -24,12 +24,15 @@ import { Client } from '../../utils/celery/client' import { DB } from '../../utils/db/db' import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper' import { elementsToString, personInitialAndUTMProperties, sanitizeEventName, timeoutGuard } from '../../utils/db/utils' +import { PubSub } from '../../utils/pubsub' import { status } from '../../utils/status' import { castTimestampOrNow, filterIncrementProperties, UUID, UUIDT } from '../../utils/utils' +import { ActionManager } from './action-manager' import { PersonManager } from './person-manager' import { TeamManager } from './team-manager' export class EventsProcessor { + ready: boolean pluginsServer: Hub db: DB clickhouse: ClickHouse | undefined @@ -38,8 +41,10 @@ export class EventsProcessor { posthog: ReturnType teamManager: TeamManager personManager: PersonManager + actionManager: ActionManager constructor(pluginsServer: Hub) { + this.ready = false this.pluginsServer = pluginsServer this.db = pluginsServer.db this.clickhouse = pluginsServer.clickhouse @@ -47,6 +52,7 @@ export class EventsProcessor { this.celery = new Client(pluginsServer.db, pluginsServer.CELERY_DEFAULT_QUEUE) this.teamManager = new TeamManager(pluginsServer.db) this.personManager = new PersonManager(pluginsServer) + this.actionManager = new ActionManager(pluginsServer.db) this.posthog = nodePostHog('sTMFPsFhdP1Ssg', { fetch }) if (process.env.NODE_ENV === 'test') { @@ -54,6 +60,11 @@ export class EventsProcessor { } } + public async prepare(): Promise { + await this.actionManager.prepare() + this.ready = true + } + public async processEvent( distinctId: string, ip: string | null, @@ -64,6 +75,9 @@ export class EventsProcessor { sentAt: DateTime | null, eventUuid: string ): Promise { + if (!this.ready) { + throw new Error('EventsProcessor is not ready! Run eventsProcessor.prepare() before this') + } if (!UUID.validateString(eventUuid, false)) { throw new Error(`Not a valid UUID: "${eventUuid}"`) } diff --git a/src/worker/tasks.ts b/src/worker/tasks.ts index e0d929d1..59f9ba78 100644 --- a/src/worker/tasks.ts +++ b/src/worker/tasks.ts @@ -1,57 +1,63 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' -import { EnqueuedJob, Hub, PluginTaskType } from '../types' +import { Action, EnqueuedJob, Hub, PluginTaskType } from '../types' import { ingestEvent } from './ingestion/ingest-event' import { runOnEvent, runOnSnapshot, runPluginTask, runProcessEvent, runProcessEventBatch } from './plugins/run' import { loadSchedule, setupPlugins } from './plugins/setup' import { teardownPlugins } from './plugins/teardown' -type TaskRunner = (server: Hub, args: any) => Promise | any +type TaskRunner = (hub: Hub, args: any) => Promise | any export const workerTasks: Record = { - hello: (server, args) => { - return `hello ${args}!` + onEvent: (hub, args: { event: PluginEvent }) => { + return runOnEvent(hub, args.event) }, - onEvent: (server, args: { event: PluginEvent }) => { - return runOnEvent(server, args.event) + onSnapshot: (hub, args: { event: PluginEvent }) => { + return runOnSnapshot(hub, args.event) }, - onSnapshot: (server, args: { event: PluginEvent }) => { - return runOnSnapshot(server, args.event) + processEvent: (hub, args: { event: PluginEvent }) => { + return runProcessEvent(hub, args.event) }, - processEvent: (server, args: { event: PluginEvent }) => { - return runProcessEvent(server, args.event) + processEventBatch: (hub, args: { batch: PluginEvent[] }) => { + return runProcessEventBatch(hub, args.batch) }, - processEventBatch: (server, args: { batch: PluginEvent[] }) => { - return runProcessEventBatch(server, args.batch) + runJob: (hub, { job }: { job: EnqueuedJob }) => { + return runPluginTask(hub, job.type, PluginTaskType.Job, job.pluginConfigId, job.payload) }, - runJob: (server, { job }: { job: EnqueuedJob }) => { - return runPluginTask(server, job.type, PluginTaskType.Job, job.pluginConfigId, job.payload) + runEveryMinute: (hub, args: { pluginConfigId: number }) => { + return runPluginTask(hub, 'runEveryMinute', PluginTaskType.Schedule, args.pluginConfigId) }, - runEveryMinute: (server, args: { pluginConfigId: number }) => { - return runPluginTask(server, 'runEveryMinute', PluginTaskType.Schedule, args.pluginConfigId) + runEveryHour: (hub, args: { pluginConfigId: number }) => { + return runPluginTask(hub, 'runEveryHour', PluginTaskType.Schedule, args.pluginConfigId) }, - runEveryHour: (server, args: { pluginConfigId: number }) => { - return runPluginTask(server, 'runEveryHour', PluginTaskType.Schedule, args.pluginConfigId) + runEveryDay: (hub, args: { pluginConfigId: number }) => { + return runPluginTask(hub, 'runEveryDay', PluginTaskType.Schedule, args.pluginConfigId) }, - runEveryDay: (server, args: { pluginConfigId: number }) => { - return runPluginTask(server, 'runEveryDay', PluginTaskType.Schedule, args.pluginConfigId) + getPluginSchedule: (hub) => { + return hub.pluginSchedule }, - getPluginSchedule: (server) => { - return server.pluginSchedule + ingestEvent: async (hub, args: { event: PluginEvent }) => { + return await ingestEvent(hub, args.event) }, - ingestEvent: async (server, args: { event: PluginEvent }) => { - return await ingestEvent(server, args.event) + reloadPlugins: async (hub) => { + await setupPlugins(hub) }, - reloadPlugins: async (server) => { - await setupPlugins(server) + reloadSchedule: async (hub) => { + await loadSchedule(hub) }, - reloadSchedule: async (server) => { - await loadSchedule(server) + reloadAllActions: async (hub) => { + return await hub.eventsProcessor.actionManager.reloadAllActions() }, - teardownPlugins: async (server) => { - await teardownPlugins(server) + reloadAction: async (hub, args: { actionId: Action['id'] }) => { + return await hub.eventsProcessor.actionManager.reloadAction(args.actionId) }, - flushKafkaMessages: async (server) => { - await server.kafkaProducer?.flush() + dropAction: (hub, args: { actionId: Action['id'] }) => { + return hub.eventsProcessor.actionManager.dropAction(args.actionId) + }, + teardownPlugins: async (hub) => { + await teardownPlugins(hub) + }, + flushKafkaMessages: async (hub) => { + await hub.kafkaProducer?.flush() }, } diff --git a/src/worker/vm/extensions/console.ts b/src/worker/vm/extensions/console.ts index 19fd4cdc..d22d2964 100644 --- a/src/worker/vm/extensions/console.ts +++ b/src/worker/vm/extensions/console.ts @@ -18,7 +18,7 @@ function consoleFormat(...args: unknown[]): string { export function createConsole(server: Hub, pluginConfig: PluginConfig): ConsoleExtension { async function consolePersist(type: PluginLogEntryType, ...args: unknown[]): Promise { - if (determineNodeEnv() == NodeEnv.Development) { + if (determineNodeEnv() === NodeEnv.Development) { status.info('👉', `${type} in ${pluginDigest(pluginConfig.plugin!, pluginConfig.team_id)}:`, ...args) } diff --git a/tests/helpers/sql.ts b/tests/helpers/sql.ts index a8386a4c..e4772d02 100644 --- a/tests/helpers/sql.ts +++ b/tests/helpers/sql.ts @@ -1,7 +1,17 @@ import { Pool, PoolClient } from 'pg' import { defaultConfig } from '../../src/config/config' -import { Hub, Plugin, PluginAttachmentDB, PluginConfig, PluginsServerConfig, Team } from '../../src/types' +import { + Hub, + Plugin, + PluginAttachmentDB, + PluginConfig, + PluginsServerConfig, + PropertyOperator, + RawAction, + RawOrganization, + Team, +} from '../../src/types' import { UUIDT } from '../../src/utils/utils' import { commonOrganizationId, @@ -29,6 +39,8 @@ export async function resetTestDatabase( } catch {} await db.query(` + DELETE FROM posthog_actionstep; + DELETE FROM posthog_action; DELETE FROM posthog_element; DELETE FROM posthog_elementgroup; DELETE FROM posthog_sessionrecordingevent; @@ -117,7 +129,7 @@ export async function createUserTeamAndOrganization( setup_section_2_completed: true, for_internal_metrics: false, available_features: [], - }) + } as RawOrganization) await insertRow(db, 'posthog_organizationmembership', { id: organizationMembershipId, organization_id: organizationId, @@ -151,6 +163,32 @@ export async function createUserTeamAndOrganization( timezone: 'UTC', data_attributes: ['data-attr'], }) + await insertRow(db, 'posthog_action', { + id: teamId + 67, + team_id: teamId, + name: 'Test Action', + created_at: new Date().toISOString(), + created_by_id: userId, + deleted: false, + post_to_slack: false, + slack_message_format: '', + is_calculating: false, + updated_at: new Date().toISOString(), + last_calculated_at: new Date().toISOString(), + } as RawAction) + await insertRow(db, 'posthog_actionstep', { + id: teamId + 911, + action_id: teamId + 67, + tag_name: null, + text: null, + href: null, + selector: null, + url: null, + url_matching: null, + name: null, + event: null, + properties: [{ type: 'event', operator: PropertyOperator.Exact, key: 'foo', value: ['bar'] }], + }) } export async function getTeams(hub: Hub): Promise { diff --git a/tests/postgres/teardown.test.ts b/tests/postgres/teardown.test.ts index 7631411f..7e80fd46 100644 --- a/tests/postgres/teardown.test.ts +++ b/tests/postgres/teardown.test.ts @@ -82,7 +82,7 @@ describe('teardown', () => { expect(event1.properties.storage).toBe('nope') await piscina!.broadcastTask({ task: 'reloadPlugins' }) - await delay(3000) + await delay(2000) const event2 = await piscina!.runTask({ task: 'processEvent', args: { event: { ...defaultEvent } } }) expect(event2.properties.storage).toBe('tore down') diff --git a/tests/postgres/worker.test.ts b/tests/postgres/worker.test.ts index 5db70959..2b612a25 100644 --- a/tests/postgres/worker.test.ts +++ b/tests/postgres/worker.test.ts @@ -4,9 +4,12 @@ import { mocked } from 'ts-jest/utils' import { ServerInstance, startPluginsServer } from '../../src/main/pluginsServer' import { loadPluginSchedule } from '../../src/main/services/schedule' -import { LogLevel } from '../../src/types' +import { Hub, LogLevel } from '../../src/types' import { Client } from '../../src/utils/celery/client' +import { createHub } from '../../src/utils/db/hub' +import { KafkaProducerWrapper } from '../../src/utils/db/kafka-producer-wrapper' import { delay, UUIDT } from '../../src/utils/utils' +import { ActionManager } from '../../src/worker/ingestion/action-manager' import { ingestEvent } from '../../src/worker/ingestion/ingest-event' import { makePiscina } from '../../src/worker/piscina' import { runPluginTask, runProcessEvent, runProcessEventBatch } from '../../src/worker/plugins/run' @@ -16,6 +19,7 @@ import { createTaskRunner } from '../../src/worker/worker' import { resetTestDatabase } from '../helpers/sql' import { setupPiscina } from '../helpers/worker' +jest.mock('../../src/worker/ingestion/action-manager') jest.mock('../../src/utils/db/sql') jest.mock('../../src/utils/status') jest.mock('../../src/worker/ingestion/ingest-event') @@ -38,6 +42,9 @@ function createEvent(index = 0): PluginEvent { beforeEach(() => { console.debug = jest.fn() + jest.spyOn(ActionManager.prototype, 'reloadAllActions') + jest.spyOn(ActionManager.prototype, 'reloadAction') + jest.spyOn(ActionManager.prototype, 'dropAction') }) test('piscina worker test', async () => { @@ -229,15 +236,15 @@ describe('queue logic', () => { describe('createTaskRunner()', () => { let taskRunner: any - let hub: any + let hub: Hub + let closeHub: () => Promise - beforeEach(() => { - hub = { mock: 'server' } + beforeEach(async () => { + ;[hub, closeHub] = await createHub() taskRunner = createTaskRunner(hub) }) - - it('handles `hello` task', async () => { - expect(await taskRunner({ task: 'hello', args: ['world'] })).toEqual('hello world!') + afterEach(async () => { + await closeHub() }) it('handles `processEvent` task', async () => { @@ -306,6 +313,24 @@ describe('createTaskRunner()', () => { expect(loadSchedule).toHaveBeenCalled() }) + it('handles `reloadAllActions` task', async () => { + await taskRunner({ task: 'reloadAllActions' }) + + expect(hub.eventsProcessor.actionManager.reloadAllActions).toHaveBeenCalledWith() + }) + + it('handles `reloadAction` task', async () => { + await taskRunner({ task: 'reloadAction', args: { actionId: 777 } }) + + expect(hub.eventsProcessor.actionManager.reloadAction).toHaveBeenCalledWith(777) + }) + + it('handles `dropAction` task', async () => { + await taskRunner({ task: 'dropAction', args: { actionId: 777 } }) + + expect(hub.eventsProcessor.actionManager.dropAction).toHaveBeenCalledWith(777) + }) + it('handles `teardownPlugin` task', async () => { await taskRunner({ task: 'teardownPlugins' }) @@ -313,7 +338,7 @@ describe('createTaskRunner()', () => { }) it('handles `flushKafkaMessages` task', async () => { - hub.kafkaProducer = { flush: jest.fn() } + hub.kafkaProducer = ({ flush: jest.fn() } as unknown) as KafkaProducerWrapper await taskRunner({ task: 'flushKafkaMessages' }) diff --git a/tests/schedule.test.ts b/tests/schedule.test.ts index 13e1fa79..91675aca 100644 --- a/tests/schedule.test.ts +++ b/tests/schedule.test.ts @@ -118,9 +118,7 @@ describe('startSchedule', () => { ` await resetTestDatabase(testCode) piscina = setupPiscina(workerThreads, 10) - const [_hub, _closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log, SCHEDULE_LOCK_TTL: 3 }) - hub = _hub - closeHub = _closeHub + ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log, SCHEDULE_LOCK_TTL: 3 }) redis = await hub.redisPool.acquire() await redis.del(LOCKED_RESOURCE) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index b3cba28d..f8ae2e9a 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -128,6 +128,7 @@ export const createProcessEventTests = ( returned.hub = hub returned.closeHub = closeHub eventsProcessor = new EventsProcessor(hub) + await eventsProcessor.prepare() queryCounter = 0 processEventCounter = 0 team = await getFirstTeam(hub) diff --git a/tests/worker/ingestion/action-manager.test.ts b/tests/worker/ingestion/action-manager.test.ts new file mode 100644 index 00000000..42b7f866 --- /dev/null +++ b/tests/worker/ingestion/action-manager.test.ts @@ -0,0 +1,93 @@ +import { Hub, PropertyOperator, RawAction } from '../../../src/types' +import { createHub } from '../../../src/utils/db/hub' +import { ActionManager } from '../../../src/worker/ingestion/action-manager' +import { resetTestDatabase } from '../../helpers/sql' + +describe('ActionManager', () => { + let hub: Hub + let closeServer: () => Promise + let actionManager: ActionManager + + beforeEach(async () => { + ;[hub, closeServer] = await createHub() + await resetTestDatabase() + actionManager = new ActionManager(hub.db) + await actionManager.prepare() + }) + afterEach(async () => { + await closeServer() + }) + + it('returns the correct action', async () => { + const ACTION_ID = 69 + const ACTION_STEP_ID = 913 + + const action = actionManager.getAction(ACTION_ID) + + expect(action).toMatchObject({ + id: ACTION_ID, + name: 'Test Action', + deleted: false, + post_to_slack: false, + slack_message_format: '', + is_calculating: false, + steps: [ + { + id: ACTION_STEP_ID, + action_id: ACTION_ID, + tag_name: null, + text: null, + href: null, + selector: null, + url: null, + url_matching: null, + name: null, + event: null, + properties: [{ type: 'event', operator: PropertyOperator.Exact, key: 'foo', value: ['bar'] }], + }, + ], + }) + + await hub.db.postgresQuery( + `UPDATE posthog_actionstep SET properties = jsonb_set(properties, '{0,key}', '"baz"') WHERE id = $1`, + [ACTION_STEP_ID], + 'testKey' + ) + + // This is normally dispatched by Django and broadcasted by Piscina + await actionManager.reloadAction(ACTION_ID) + + const reloadedAction = actionManager.getAction(ACTION_ID) + + expect(reloadedAction).toMatchObject({ + id: ACTION_ID, + name: 'Test Action', + deleted: false, + post_to_slack: false, + slack_message_format: '', + is_calculating: false, + steps: [ + { + id: ACTION_STEP_ID, + action_id: ACTION_ID, + tag_name: null, + text: null, + href: null, + selector: null, + url: null, + url_matching: null, + name: null, + event: null, + properties: [{ type: 'event', operator: PropertyOperator.Exact, key: 'baz', value: ['bar'] }], + }, + ], + }) + + // This is normally dispatched by Django and broadcasted by Piscina + actionManager.dropAction(ACTION_ID) + + const droppedAction = actionManager.getAction(ACTION_ID) + + expect(droppedAction).toBeUndefined() + }) +})