Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

In-memory action definitions synced with Django #403

Merged
merged 25 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
42eb852
Add ActionManager
Twixes May 21, 2021
bb45010
Refactor ActionManager
Twixes May 21, 2021
580ffc1
Remove hello
Twixes May 21, 2021
755e5dd
Adjust ActionManager method names and use single PubSub
Twixes May 21, 2021
54f254a
Touch tests up
Twixes May 21, 2021
797d56f
Merge branch 'master' into 235-action-matching
Twixes May 21, 2021
a87a0a0
Make some adjustments
Twixes May 24, 2021
ff0013e
Disable `status` stdout logs in test mode
Twixes May 24, 2021
e12d648
Fix `status`
Twixes May 24, 2021
f3415c8
Fix test problems
Twixes May 24, 2021
a42f6c8
Merge branch 'master' into 235-action-matching
Twixes May 25, 2021
3f5553a
Fix dropAction typo
Twixes May 25, 2021
63785f1
Reload all ActionManager caches every 5 min
Twixes May 25, 2021
e27c039
Merge branch 'master' into 235-action-matching
Twixes May 25, 2021
1590d3c
Fix duplicate RawAction
Twixes May 25, 2021
7f0dc18
Don't stringify JSONB column for `insertRow`
Twixes May 25, 2021
d768f11
It's a hub now
Twixes May 25, 2021
a5be2e4
Filter by Action.deleted
Twixes May 25, 2021
5dde8df
Enhance ActionManager tests
Twixes May 25, 2021
4af1ba7
Add Action-syncing task runner tests
Twixes May 25, 2021
bb606db
Merge branch 'master' into 235-action-matching
Twixes May 25, 2021
8304827
Use `LOG_LEVEL=warn` in tests
Twixes May 26, 2021
c132a53
Don't `throw` error on unassociated channel pubsub
Twixes May 26, 2021
4082003
Don't use defaultConfig in Status.buildMethod due to circular import
Twixes May 26, 2021
6212419
Fix actions reload job var name
Twixes May 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/postgres/ingestion.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ describe('ingestion benchmarks', () => {
LOG_LEVEL: LogLevel.Log,
})
eventsProcessor = new EventsProcessor(hub)
await eventsProcessor.prepare()
team = await getFirstTeam(hub)
now = DateTime.utc()

Expand Down
2 changes: 1 addition & 1 deletion src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export function getDefaultConfig(): PluginsServerConfig {
WORKER_CONCURRENCY: coreCount,
TASK_TIMEOUT: 30,
TASKS_PER_WORKER: 10,
LOG_LEVEL: LogLevel.Info,
LOG_LEVEL: isTestEnv ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
STATSD_HOST: null,
STATSD_PORT: 8125,
Expand Down
28 changes: 17 additions & 11 deletions src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import { ReaderModel } from '@maxmind/geoip2-node'
import Piscina from '@posthog/piscina'
import * as Sentry from '@sentry/node'
import { FastifyInstance } from 'fastify'
import Redis from 'ioredis'
import net, { AddressInfo } from 'net'
import * as schedule from 'node-schedule'

import { defaultConfig } from '../config/config'
import { Hub, JobQueueConsumerControl, PluginsServerConfig, Queue, ScheduleControl } from '../types'
import { createHub } from '../utils/db/hub'
import { killProcess } from '../utils/kill'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedis, delay, getPiscinaStats } from '../utils/utils'
import { delay, getPiscinaStats } from '../utils/utils'
import { startQueue } from './ingestion-queues/queue'
import { startJobQueueConsumer } from './job-queues/job-queue-consumer'
import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb'
Expand Down Expand Up @@ -41,7 +41,7 @@ export async function startPluginsServer(

status.info('ℹ️', `${serverConfig.WORKER_CONCURRENCY} workers, ${serverConfig.TASKS_PER_WORKER} tasks per worker`)

let pubSub: Redis.Redis | undefined
let pubSub: PubSub | undefined
let hub: Hub | undefined
let fastifyInstance: FastifyInstance | undefined
let pingJob: schedule.Job | undefined
Expand Down Expand Up @@ -73,7 +73,7 @@ export async function startPluginsServer(
}
lastActivityCheck && clearInterval(lastActivityCheck)
await queue?.stop()
await pubSub?.quit()
await pubSub?.stop()
pingJob && schedule.cancelJob(pingJob)
statsJob && schedule.cancelJob(statsJob)
await jobQueueConsumer?.stop()
Expand Down Expand Up @@ -141,23 +141,29 @@ export async function startPluginsServer(
void jobQueueConsumer?.resume()
})

// use one extra connection for redis pubsub
pubSub = await createRedis(hub)
await pubSub.subscribe(hub.PLUGINS_RELOAD_PUBSUB_CHANNEL)
pubSub.on('message', async (channel: string, message) => {
if (channel === hub!.PLUGINS_RELOAD_PUBSUB_CHANNEL) {
// use one extra Redis connection for pub-sub
pubSub = new PubSub(hub, {
[hub.PLUGINS_RELOAD_PUBSUB_CHANNEL]: async () => {
status.info('⚡', 'Reloading plugins!')

await piscina?.broadcastTask({ task: 'reloadPlugins' })
await scheduleControl?.reloadSchedule()
}
},
'reload-action': async (message) =>
await piscina?.broadcastTask({ task: 'reloadAction', args: { actionId: parseInt(message) } }),
'drop-action': async (message) =>
await piscina?.broadcastTask({ task: 'dropAction', args: { actionId: parseInt(message) } }),
})
await pubSub.start()

if (hub.jobQueueManager) {
const queueString = hub.jobQueueManager.getJobQueueTypesAsString()
await hub!.db!.redisSet('@posthog-plugin-server/enabled-job-queues', queueString)
}

// every 5 minutes all ActionManager caches are reloaded for eventual consistency
pingJob = schedule.scheduleJob('*/5 * * * *', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this is overridden by the pingJob below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

await piscina?.broadcastTask({ task: 'reloadAllActions' })
})
// every 5 seconds set Redis keys @posthog-plugin-server/ping and @posthog-plugin-server/version
pingJob = schedule.scheduleJob('*/5 * * * * *', async () => {
await hub!.db!.redisSet('@posthog-plugin-server/ping', new Date().toISOString(), 60, {
Expand Down
107 changes: 106 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ import { EventsProcessor } from './worker/ingestion/process-event'
import { LazyPluginVM } from './worker/vm/lazy'

export enum LogLevel {
None = 'none',
Debug = 'debug',
Info = 'info',
Log = 'log',
Warn = 'warn',
Error = 'error',
None = 'none',
}

/**
 * Numeric rank assigned to every LogLevel.
 *
 * Ranks ascend Debug (10) < Info (20) < Log (30) < Warn (40) < Error (50); None maps to 0.
 * NOTE(review): presumably used to compare a message's level against the configured LOG_LEVEL —
 * confirm how the consumer treats None's rank of 0.
 */
export const logLevelToNumber: Record<LogLevel, number> = {
[LogLevel.None]: 0,
[LogLevel.Debug]: 10,
[LogLevel.Info]: 20,
[LogLevel.Log]: 30,
[LogLevel.Warn]: 40,
[LogLevel.Error]: 50,
}

export interface PluginsServerConfig extends Record<string, any> {
Expand Down Expand Up @@ -318,6 +327,7 @@ export interface RawOrganization {
name: string
created_at: string
updated_at: string
available_features: string[]
}

/** Usable Team model. */
Expand Down Expand Up @@ -429,6 +439,101 @@ export interface CohortPeople {
person_id: number
}

/**
 * Operators accepted in the `operator` field of EventPropertyFilter, PersonPropertyFilter
 * and ElementPropertyFilter. Each member's value is its string form.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export enum PropertyOperator {
Exact = 'exact',
IsNot = 'is_not',
IContains = 'icontains',
NotIContains = 'not_icontains',
Regex = 'regex',
NotRegex = 'not_regex',
GreaterThan = 'gt',
LessThan = 'lt',
IsSet = 'is_set',
IsNotSet = 'is_not_set',
}

/** Sync with posthog/frontend/src/types.ts */
Twixes marked this conversation as resolved.
Show resolved Hide resolved
interface BasePropertyFilter {
/** Property key; ElementPropertyFilter and CohortPropertyFilter narrow this to string literals. */
key: string
/** Filter value; CohortPropertyFilter narrows this to `number`. */
value: string | number | Array<string | number> | null
/** Optional label. NOTE(review): presumably display-only — confirm against the frontend types. */
label?: string
}

/**
 * Property filter tagged `type: 'event'`; one member of the ActionStepProperties union.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export interface EventPropertyFilter extends BasePropertyFilter {
type: 'event'
operator: PropertyOperator
}

/**
 * Property filter tagged `type: 'person'`; one member of the ActionStepProperties union.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export interface PersonPropertyFilter extends BasePropertyFilter {
type: 'person'
operator: PropertyOperator
}

/**
 * Property filter tagged `type: 'element'`; one member of the ActionStepProperties union.
 * `key` is restricted to the four names below, which match the nullable string fields
 * of the same names on ActionStep.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export interface ElementPropertyFilter extends BasePropertyFilter {
type: 'element'
key: 'tag_name' | 'text' | 'href' | 'selector'
operator: PropertyOperator
}

/**
 * Property filter tagged `type: 'cohort'`; one member of the ActionStepProperties union.
 * Unlike the other filters it has no `operator`: `key` is always 'id' and `value` is a number.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export interface CohortPropertyFilter extends BasePropertyFilter {
type: 'cohort'
key: 'id'
value: number
}

/**
 * Any single property filter that can appear in ActionStep.properties.
 * The `type` field of each member tells the variants apart.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export type ActionStepProperties =
| EventPropertyFilter
| PersonPropertyFilter
| ElementPropertyFilter
| CohortPropertyFilter

/**
 * Allowed values of ActionStep.url_matching.
 *
 * Sync with posthog/frontend/src/types.ts
 */
export enum ActionStepUrlMatching {
Contains = 'contains',
Regex = 'regex',
Exact = 'exact',
}

/**
 * A single step of an action. Rows of `posthog_actionstep` are read into this shape by
 * DB.fetchAction and DB.fetchAllActionsMap, which group steps by `action_id`.
 * Every matching criterion is nullable.
 */
export interface ActionStep {
id: number
/** Id of the owning action (joined against posthog_action.id when fetching). */
action_id: number
tag_name: string | null
text: string | null
href: string | null
selector: string | null
url: string | null
url_matching: ActionStepUrlMatching | null
name: string | null
event: string | null
properties: ActionStepProperties[] | null
}

/** Raw Action row from database. */
export interface RawAction {
Twixes marked this conversation as resolved.
Show resolved Hide resolved
id: number
team_id: TeamId
name: string | null
created_at: string
created_by_id: number | null
deleted: boolean
post_to_slack: boolean
slack_message_format: string
is_calculating: boolean
updated_at: string
last_calculated_at: string
}

/** Usable Action model: a RawAction row together with the ActionStep rows that belong to it. */
export interface Action extends RawAction {
steps: ActionStep[]
}

export interface SessionRecordingEvent {
uuid: string
timestamp: string
Expand Down
52 changes: 52 additions & 0 deletions src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg'

import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID, KAFKA_PLUGIN_LOG_ENTRIES } from '../../config/kafka-topics'
import {
Action,
ActionStep,
ClickHouseEvent,
ClickHousePerson,
ClickHousePersonDistinctId,
Expand All @@ -26,6 +28,7 @@ import {
PluginLogEntryType,
PostgresSessionRecordingEvent,
PropertyDefinitionType,
RawAction,
RawOrganization,
RawPerson,
SessionRecordingEvent,
Expand Down Expand Up @@ -705,14 +708,63 @@ export class DB {
return entry
}

// EventDefinition

/**
 * Reads every row of `posthog_eventdefinition`.
 *
 * @returns the result rows, typed as EventDefinitionType[].
 */
public async fetchEventDefinitions(): Promise<EventDefinitionType[]> {
    const queryResult = await this.postgresQuery(
        'SELECT * FROM posthog_eventdefinition',
        undefined,
        'fetchEventDefinitions'
    )
    return queryResult.rows as EventDefinitionType[]
}

// PropertyDefinition

/**
 * Reads every row of `posthog_propertydefinition`.
 *
 * @returns the result rows, typed as PropertyDefinitionType[].
 */
public async fetchPropertyDefinitions(): Promise<PropertyDefinitionType[]> {
    const queryResult = await this.postgresQuery(
        'SELECT * FROM posthog_propertydefinition',
        undefined,
        'fetchPropertyDefinitions'
    )
    return queryResult.rows as PropertyDefinitionType[]
}

// Action & ActionStep

public async fetchAllActionsMap(): Promise<Record<Action['id'], Action>> {
const rawActions: RawAction[] = (
await this.postgresQuery(`SELECT * FROM posthog_action WHERE deleted = FALSE`, undefined, 'fetchActions')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch!

).rows
const actionSteps: ActionStep[] = (
await this.postgresQuery(
`SELECT posthog_actionstep.*, posthog_action.deleted FROM posthog_actionstep
JOIN posthog_action ON (posthog_action.id = posthog_actionstep.action_id)
WHERE posthog_action.deleted = FALSE`,
undefined,
'fetchActionSteps'
)
).rows
const actionsMap: Record<Action['id'], Action> = {}
for (const rawAction of rawActions) {
actionsMap[rawAction.id] = { ...rawAction, steps: [] }
}
for (const actionStep of actionSteps) {
if (actionStep.action_id in actionsMap) {
actionsMap[actionStep.action_id].steps.push(actionStep)
}
}
return actionsMap
}

public async fetchAction(id: Action['id']): Promise<Action | null> {
const rawActions: RawAction[] = (
await this.postgresQuery(
`SELECT * FROM posthog_action WHERE id = $1 AND deleted = FALSE`,
[id],
'fetchActions'
)
).rows
if (!rawActions.length) {
return null
}
const steps: ActionStep[] = (
await this.postgresQuery(`SELECT * FROM posthog_actionstep WHERE action_id = $1`, [id], 'fetchActionSteps')
).rows
const action: Action = { ...rawActions[0], steps }
Twixes marked this conversation as resolved.
Show resolved Hide resolved
return action
}
}
1 change: 1 addition & 0 deletions src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ export async function createHub(

// :TODO: This is only used on worker threads, not main
hub.eventsProcessor = new EventsProcessor(hub as Hub)
await hub.eventsProcessor.prepare()
hub.jobQueueManager = new JobQueueManager(hub as Hub)

try {
Expand Down
57 changes: 57 additions & 0 deletions src/utils/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { captureException } from '@sentry/node'
import { Redis } from 'ioredis'

import { PluginsServerConfig } from '../types'
import { status } from './status'
import { createRedis } from './utils'

/** Handler for one pub-sub message; receives the message payload and may be sync or async. */
export type PubSubTask = ((message: string) => void) | ((message: string) => Promise<void>)

/** Maps a channel name to the task run for each message on that channel; PubSub subscribes to its keys. */
export interface PubSubTaskMap {
[channel: string]: PubSubTask
}

/**
 * Redis pub-sub listener that dispatches each incoming message to the task registered
 * for its channel in `taskMap`.
 *
 * The Redis connection is created in `start()` and released in `stop()`.
 */
export class PubSub {
    private serverConfig: PluginsServerConfig
    private redis: Redis | null
    public taskMap: PubSubTaskMap

    /**
     * @param serverConfig - passed to `createRedis` when the connection is opened in `start()`.
     * @param taskMap - channel name to task; its keys are the channels subscribed to. Defaults to none.
     */
    constructor(serverConfig: PluginsServerConfig, taskMap: PubSubTaskMap = {}) {
        this.serverConfig = serverConfig
        this.redis = null
        this.taskMap = taskMap
    }

    /**
     * Opens the Redis connection, subscribes to every channel in `taskMap` and starts dispatching messages.
     *
     * @throws Error if this instance has already been started.
     */
    public async start(): Promise<void> {
        if (this.redis) {
            throw new Error('Started PubSub cannot be started again!')
        }
        this.redis = await createRedis(this.serverConfig)
        const channels = Object.keys(this.taskMap)
        await this.redis.subscribe(channels)
        this.redis.on('message', (channel: string, message: string) => {
            const task: PubSubTask | undefined = this.taskMap[channel]
            if (!task) {
                captureException(
                    new Error(
                        `Received a pubsub message for unassociated channel ${channel}! Associated channels are: ${Object.keys(
                            this.taskMap
                        ).join(', ')}`
                    )
                )
                // Report only: there is nothing to run, and calling the missing task would
                // throw a TypeError from inside the event listener.
                return
            }
            // The task's result is intentionally not awaited.
            void task(message)
        })
        status.info('👀', `Pub-sub started for channels: ${channels.join(', ')}`)
    }

    /**
     * Unsubscribes, disconnects the Redis connection and resets the instance so it can be started again.
     *
     * @throws Error if this instance has not been started.
     */
    public async stop(): Promise<void> {
        if (!this.redis) {
            throw new Error('Unstarted PubSub cannot be stopped!')
        }
        await this.redis.unsubscribe()
        this.redis.disconnect()
        this.redis = null
        status.info('🛑', `Pub-sub stopped for channels: ${Object.keys(this.taskMap).join(', ')}`)
    }
}
2 changes: 1 addition & 1 deletion src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import AdmZip from 'adm-zip'
import { randomBytes } from 'crypto'
import Redis, { RedisOptions } from 'ioredis'
import { DateTime } from 'luxon'
import { Pool, PoolClient, PoolConfig } from 'pg'
import { Pool, PoolConfig } from 'pg'
import { Readable } from 'stream'
import * as tar from 'tar-stream'
import * as zlib from 'zlib'
Expand Down
Loading