diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 60168f639..8f1814a35 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -44,6 +44,7 @@ import { castTimestampOrNow, clickHouseTimestampToISO, escapeClickHouseString, + RaceConditionError, sanitizeSqlIdentifier, tryTwice, UUID, @@ -595,17 +596,42 @@ export class DB { } public async moveDistinctIds(source: Person, target: Person): Promise { - const movedDistinctIdResult = await this.postgresQuery( - ` - UPDATE posthog_persondistinctid - SET person_id = $1 - WHERE person_id = $2 - AND team_id = $3 - RETURNING * - `, - [target.id, source.id, target.team_id], - 'updateDistinctIdPerson' - ) + let movedDistinctIdResult: QueryResult | null = null + try { + movedDistinctIdResult = await this.postgresQuery( + ` + UPDATE posthog_persondistinctid + SET person_id = $1 + WHERE person_id = $2 + AND team_id = $3 + RETURNING * + `, + [target.id, source.id, target.team_id], + 'updateDistinctIdPerson' + ) + } catch (error) { + if ( + error.message.includes( + 'insert or update on table "posthog_persondistinctid" violates foreign key constraint' + ) + ) { + // this is caused by a race condition where the _target_ person was deleted after fetching but + // before the update query ran and will trigger a retry with updated persons + throw new RaceConditionError( + 'Failed trying to move distinct IDs because target person no longer exists.' + ) + } + + throw error + } + + // this is caused by a race condition where the _source_ person was deleted after fetching but + // before the update query ran and will trigger a retry with updated persons + if (movedDistinctIdResult.rows.length === 0) { + throw new RaceConditionError( + `Failed trying to move distinct IDs because the source person no longer exists.` + ) + } if (this.kafkaProducer) { for (const row of movedDistinctIdResult.rows) { diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 619ac5552..728da566d 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -649,3 +649,9 @@ export class IllegalOperationError extends Error { super(operation) } } + +// For errors we want to explicitly throw +// concerning race conditions across threads +export class RaceConditionError extends Error { + name = 'RaceConditionError' +} diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index e785ba185..b15ad0f38 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -29,7 +29,7 @@ import { timeoutGuard, } from '../../utils/db/utils' import { status } from '../../utils/status' -import { castTimestampOrNow, filterIncrementProperties, UUID, UUIDT } from '../../utils/utils' +import { castTimestampOrNow, filterIncrementProperties, RaceConditionError, UUID, UUIDT } from '../../utils/utils' import { PersonManager } from './person-manager' import { TeamManager } from './team-manager' @@ -282,7 +282,8 @@ export class EventsProcessor { previousDistinctId: string, distinctId: string, teamId: number, - retryIfFailed = true + retryIfFailed = true, + totalMergeAttempts = 0 ): Promise { const oldPerson = await this.db.fetchPerson(teamId, previousDistinctId) const newPerson = await this.db.fetchPerson(teamId, distinctId) @@ -336,11 +337,31 @@ export class EventsProcessor { } if (oldPerson && newPerson && oldPerson.id !== newPerson.id) { - await this.mergePeople(newPerson, oldPerson) + await this.mergePeople({ + totalMergeAttempts, + mergeInto: newPerson, + mergeIntoDistinctId: distinctId, + otherPerson: oldPerson, + otherPersonDistinctId: previousDistinctId, + }) } } - public async mergePeople(mergeInto: Person, otherPerson: Person): Promise { + public async mergePeople({ + mergeInto, + mergeIntoDistinctId, + otherPerson, + otherPersonDistinctId, + totalMergeAttempts = 0, + }: { + mergeInto: Person + mergeIntoDistinctId: string + otherPerson: Person + otherPersonDistinctId: string + totalMergeAttempts: number + }): Promise { + const teamId = mergeInto.team_id + let firstSeen = mergeInto.created_at // Merge properties @@ -358,7 +379,9 @@ export class EventsProcessor { [mergeInto.id, otherPerson.id], 'updateCohortPeople' ) - let failedAttempts = 0 + let failedAttempts = totalMergeAttempts + let shouldRetryAliasOperation = false + // Retrying merging up to `MAX_FAILED_PERSON_MERGE_ATTEMPTS` times, in case race conditions occur. // An example is a distinct ID being aliased in another plugin server instance, // between `moveDistinctId` and `deletePerson` being called here @@ -370,7 +393,23 @@ export class EventsProcessor { // In the rare case of the person changing VERY often however, it may happen even a few times, // in which case we'll bail and rethrow the error. while (true) { - await this.db.moveDistinctIds(otherPerson, mergeInto) + try { + await this.db.moveDistinctIds(otherPerson, mergeInto) + } catch (error) { + Sentry.captureException(error, { + extra: { mergeInto, mergeIntoDistinctId, otherPerson, otherPersonDistinctId }, + }) + failedAttempts++ + + // If a person was deleted in between fetching and moveDistinctId, re-run alias to ensure + // the updated persons are fetched and merged safely + if (error instanceof RaceConditionError && failedAttempts < MAX_FAILED_PERSON_MERGE_ATTEMPTS) { + shouldRetryAliasOperation = true + break + } + + throw error + } try { await this.db.deletePerson(otherPerson) @@ -386,6 +425,10 @@ export class EventsProcessor { continue // Not OK, trying again to make sure that ALL distinct IDs are merged } } + + if (shouldRetryAliasOperation) { + await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts) + } } private async capture( diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 9cbe7c966..a55766647 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -173,7 +173,13 @@ export const createProcessEventTests = ( expect((await hub.db.fetchPersons()).length).toEqual(2) const [person0, person1] = await hub.db.fetchPersons() - await eventsProcessor.mergePeople(person0, person1) + await eventsProcessor.mergePeople({ + mergeInto: person0, + mergeIntoDistinctId: 'person_0', + otherPerson: person1, + otherPersonDistinctId: 'person_1', + totalMergeAttempts: 0, + }) if (database === 'clickhouse') { await delayUntilEventIngested(async () =>