From a0845f8c5c4b83c08e8dc0db53a14806faea5289 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Tue, 10 Aug 2021 15:52:41 +0000 Subject: [PATCH 01/10] handle race conditions with moveDistinctIds --- src/utils/db/db.ts | 5 ++++ src/worker/ingestion/process-event.ts | 33 ++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 5431e9ec..b5a88562 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -602,6 +602,11 @@ export class DB { 'updateDistinctIdPerson' ) + // this is caused by a race condition and will trigger a retry + if (movedDistinctIdResult.rows.length === 0) { + throw new Error('Tried to move ditinct IDs from a non-existent person') + } + if (this.kafkaProducer) { for (const row of movedDistinctIdResult.rows) { await this.kafkaProducer.queueMessage({ diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index d73025a8..429ffefe 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -330,11 +330,19 @@ export class EventsProcessor { } if (oldPerson && newPerson && oldPerson.id !== newPerson.id) { - await this.mergePeople(newPerson, oldPerson) + await this.mergePeople([newPerson, distinctId], [oldPerson, previousDistinctId]) } } - public async mergePeople(mergeInto: Person, otherPerson: Person): Promise { + public async mergePeople(mergeIntoPayload: [Person, string], otherPersonPayload: [Person, string]): Promise { + let mergeInto = mergeIntoPayload[0] + let otherPerson = otherPersonPayload[0] + + const mergeIntoDistinctId = mergeIntoPayload[1] + const otherPersonDistinctId = otherPersonPayload[1] + + const teamId = mergeInto.team_id + let firstSeen = mergeInto.created_at // Merge properties @@ -364,7 +372,26 @@ export class EventsProcessor { // In the rare case of the person changing VERY often however, it may happen even a few times, // in which case we'll bail and rethrow the error. while (true) { - await this.db.moveDistinctIds(otherPerson, mergeInto) + try { + await this.db.moveDistinctIds(otherPerson, mergeInto) + } catch (error) { + Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } }) + + // If the a person was deleted in between fetching and moveDistinctId, + // try to fetch the person again as the distinct ID is likely to now point + // to another person + const otherPersonResult = await this.db.fetchPerson(teamId, mergeIntoDistinctId) + const mergeIntoResult = await this.db.fetchPerson(teamId, otherPersonDistinctId) + + mergeInto = mergeIntoResult ?? mergeInto + otherPerson = otherPersonResult ?? otherPerson + + failedAttempts++ + if (failedAttempts === MAX_FAILED_PERSON_MERGE_ATTEMPTS) { + throw error + } + continue + } try { await this.db.deletePerson(otherPerson) From 65ce804b4d68e64c5546047a61f4fefc25f790be Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Tue, 10 Aug 2021 16:06:30 +0000 Subject: [PATCH 02/10] update comment --- src/worker/ingestion/process-event.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index 429ffefe..cae04b5c 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -377,7 +377,7 @@ export class EventsProcessor { } catch (error) { Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } }) - // If the a person was deleted in between fetching and moveDistinctId, + // If a person was deleted in between fetching and moveDistinctId, // try to fetch the person again as the distinct ID is likely to now point // to another person const otherPersonResult = await this.db.fetchPerson(teamId, mergeIntoDistinctId) From 4b6b2d9fa0be3a52f2a6790a7d3b152f22bcc0ec Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Tue, 10 Aug 2021 16:17:01 +0000 Subject: [PATCH 03/10] update error message --- src/utils/db/db.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index b5a88562..03860fb6 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -604,7 +604,7 @@ export class DB { // this is caused by a race condition and will trigger a retry if (movedDistinctIdResult.rows.length === 0) { - throw new Error('Tried to move ditinct IDs from a non-existent person') + throw new Error('Tried to move ditinct IDs from an unreferenced person') } if (this.kafkaProducer) { From 1cf9961bb685c0868c45e72f3217402622eb6289 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Tue, 10 Aug 2021 17:13:22 +0000 Subject: [PATCH 04/10] update test --- tests/shared/process-event.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 9cbe7c96..7b704cd9 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -173,7 +173,7 @@ export const createProcessEventTests = ( expect((await hub.db.fetchPersons()).length).toEqual(2) const [person0, person1] = await hub.db.fetchPersons() - await eventsProcessor.mergePeople(person0, person1) + await eventsProcessor.mergePeople([person0, 'person_0'], [person1, 'person_1']) if (database === 'clickhouse') { await delayUntilEventIngested(async () => From a7c0fbb4cb97f3f49d01bee4ee5f1ad7f703be9d Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Wed, 11 Aug 2021 15:49:11 +0000 Subject: [PATCH 05/10] update approach --- src/worker/ingestion/process-event.ts | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index cae04b5c..7b4b2e19 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -335,12 +335,8 @@ export class EventsProcessor { } public async mergePeople(mergeIntoPayload: [Person, string], otherPersonPayload: [Person, string]): Promise { - let mergeInto = mergeIntoPayload[0] - let otherPerson = otherPersonPayload[0] - - const mergeIntoDistinctId = mergeIntoPayload[1] - const otherPersonDistinctId = otherPersonPayload[1] - + const [mergeInto, mergeIntoDistinctId] = mergeIntoPayload + const [otherPerson, otherPersonDistinctId] = otherPersonPayload const teamId = mergeInto.team_id let firstSeen = mergeInto.created_at @@ -378,19 +374,9 @@ export class EventsProcessor { Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } }) // If a person was deleted in between fetching and moveDistinctId, - // try to fetch the person again as the distinct ID is likely to now point - // to another person - const otherPersonResult = await this.db.fetchPerson(teamId, mergeIntoDistinctId) - const mergeIntoResult = await this.db.fetchPerson(teamId, otherPersonDistinctId) - - mergeInto = mergeIntoResult ?? mergeInto - otherPerson = otherPersonResult ?? otherPerson - - failedAttempts++ - if (failedAttempts === MAX_FAILED_PERSON_MERGE_ATTEMPTS) { - throw error - } - continue + // re-run alias to ensure the updated persons are fetched and + // merged safely + return await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId) } try { From 2c81c1f9f3eaac5bc2c4bb3f148b22aceb964da2 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Wed, 11 Aug 2021 16:16:29 +0000 Subject: [PATCH 06/10] updates --- src/utils/db/db.ts | 5 +++-- src/worker/ingestion/process-event.ts | 20 +++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 03860fb6..1757f0af 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -602,9 +602,10 @@ export class DB { 'updateDistinctIdPerson' ) - // this is caused by a race condition and will trigger a retry + // this is caused by a race condition and will trigger a retry with + // updated persons if (movedDistinctIdResult.rows.length === 0) { - throw new Error('Tried to move ditinct IDs from an unreferenced person') + throw new Error(`Failed trying to move distinct IDs because otherPerson doesn't exist.`) } if (this.kafkaProducer) { diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index 7b4b2e19..ef4cd59e 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -279,7 +279,8 @@ export class EventsProcessor { previousDistinctId: string, distinctId: string, teamId: number, - retryIfFailed = true + retryIfFailed = true, + totalMergeAttempts = 0 ): Promise { const oldPerson = await this.db.fetchPerson(teamId, previousDistinctId) const newPerson = await this.db.fetchPerson(teamId, distinctId) @@ -330,11 +331,15 @@ export class EventsProcessor { } if (oldPerson && newPerson && oldPerson.id !== newPerson.id) { - await this.mergePeople([newPerson, distinctId], [oldPerson, previousDistinctId]) + await this.mergePeople([newPerson, distinctId], [oldPerson, previousDistinctId], totalMergeAttempts) } } - public async mergePeople(mergeIntoPayload: [Person, string], otherPersonPayload: [Person, string]): Promise { + public async mergePeople( + mergeIntoPayload: [Person, string], + otherPersonPayload: [Person, string], + totalMergeAttempts = 0 + ): Promise { const [mergeInto, mergeIntoDistinctId] = mergeIntoPayload const [otherPerson, otherPersonDistinctId] = otherPersonPayload const teamId = mergeInto.team_id @@ -356,7 +361,7 @@ export class EventsProcessor { [mergeInto.id, otherPerson.id], 'updateCohortPeople' ) - let failedAttempts = 0 + let failedAttempts = totalMergeAttempts // Retrying merging up to `MAX_FAILED_PERSON_MERGE_ATTEMPTS` times, in case race conditions occur. // An example is a distinct ID being aliased in another plugin server instance, // between `moveDistinctId` and `deletePerson` being called here @@ -373,10 +378,15 @@ export class EventsProcessor { } catch (error) { Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } }) + failedAttempts++ + if (failedAttempts === MAX_FAILED_PERSON_MERGE_ATTEMPTS) { + throw error + } + // If a person was deleted in between fetching and moveDistinctId, // re-run alias to ensure the updated persons are fetched and // merged safely - return await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId) + return await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts) } try { From 4f2d85e7f5c8c322d32a6ef27bcf8ef259b871e0 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Wed, 11 Aug 2021 16:21:02 +0000 Subject: [PATCH 07/10] add RaceConditionError --- src/utils/db/db.ts | 3 ++- src/utils/utils.ts | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 1757f0af..7c34bb24 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -44,6 +44,7 @@ import { castTimestampOrNow, clickHouseTimestampToISO, escapeClickHouseString, + RaceConditionError, sanitizeSqlIdentifier, tryTwice, UUID, @@ -605,7 +606,7 @@ export class DB { // this is caused by a race condition and will trigger a retry with // updated persons if (movedDistinctIdResult.rows.length === 0) { - throw new Error(`Failed trying to move distinct IDs because otherPerson doesn't exist.`) + throw new RaceConditionError(`Failed trying to move distinct IDs because otherPerson doesn't exist.`) } if (this.kafkaProducer) { diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 619ac555..728da566 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -649,3 +649,9 @@ export class IllegalOperationError extends Error { super(operation) } } + +// For errors we want to explicitly throw +// concerning race conditions across threads +export class RaceConditionError extends Error { + name = 'RaceConditionError' +} From 241d589c7ffb02e69d8d27706a981f2a67882919 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Mon, 16 Aug 2021 12:23:09 +0000 Subject: [PATCH 08/10] address review comments --- src/utils/db/db.ts | 48 ++++++++++++++++-------- src/worker/ingestion/process-event.ts | 54 ++++++++++++++++++--------- 2 files changed, 70 insertions(+), 32 deletions(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 7c34bb24..5336c966 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -591,22 +591,40 @@ export class DB { } public async moveDistinctIds(source: Person, target: Person): Promise { - const movedDistinctIdResult = await this.postgresQuery( - ` - UPDATE posthog_persondistinctid - SET person_id = $1 - WHERE person_id = $2 - AND team_id = $3 - RETURNING * - `, - [target.id, source.id, target.team_id], - 'updateDistinctIdPerson' - ) + let operationFailedDueToRaceCondition = false + let movedDistinctIdResult: QueryResult | null = null + try { + movedDistinctIdResult = await this.postgresQuery( + ` + UPDATE posthog_persondistinctid + SET person_id = $1 + WHERE person_id = $2 + AND team_id = $3 + RETURNING * + `, + [target.id, source.id, target.team_id], + 'updateDistinctIdPerson' + ) + } catch (error) { + if ( + error.message.includes( + 'insert or update on table "posthog_persondistinctid" violates foreign key constraint' + ) + ) { + operationFailedDueToRaceCondition = true + } else { + throw error + } + } + + // this should never (?) happen, but needed to satisfy the TS compiler + if (!movedDistinctIdResult) { + throw new Error('movedDistinctIdResult is null but query did not error') + } - // this is caused by a race condition and will trigger a retry with - // updated persons - if (movedDistinctIdResult.rows.length === 0) { - throw new RaceConditionError(`Failed trying to move distinct IDs because otherPerson doesn't exist.`) + // this is caused by a race condition and will trigger a retry with updated persons + if (operationFailedDueToRaceCondition || movedDistinctIdResult.rows.length === 0) { + throw new RaceConditionError(`Failed trying to move distinct IDs because one of the persons doesn't exist.`) } if (this.kafkaProducer) { diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts index ef4cd59e..961f1332 100644 --- a/src/worker/ingestion/process-event.ts +++ b/src/worker/ingestion/process-event.ts @@ -29,7 +29,7 @@ import { timeoutGuard, } from '../../utils/db/utils' import { status } from '../../utils/status' -import { castTimestampOrNow, filterIncrementProperties, UUID, UUIDT } from '../../utils/utils' +import { castTimestampOrNow, filterIncrementProperties, RaceConditionError, UUID, UUIDT } from '../../utils/utils' import { PersonManager } from './person-manager' import { TeamManager } from './team-manager' @@ -331,17 +331,29 @@ export class EventsProcessor { } if (oldPerson && newPerson && oldPerson.id !== newPerson.id) { - await this.mergePeople([newPerson, distinctId], [oldPerson, previousDistinctId], totalMergeAttempts) + await this.mergePeople({ + totalMergeAttempts, + mergeInto: newPerson, + mergeIntoDistinctId: distinctId, + otherPerson: oldPerson, + otherPersonDistinctId: previousDistinctId, + }) } } - public async mergePeople( - mergeIntoPayload: [Person, string], - otherPersonPayload: [Person, string], - totalMergeAttempts = 0 - ): Promise { - const [mergeInto, mergeIntoDistinctId] = mergeIntoPayload - const [otherPerson, otherPersonDistinctId] = otherPersonPayload + public async mergePeople({ + mergeInto, + mergeIntoDistinctId, + otherPerson, + otherPersonDistinctId, + totalMergeAttempts = 0, + }: { + mergeInto: Person + mergeIntoDistinctId: string + otherPerson: Person + otherPersonDistinctId: string + totalMergeAttempts: number + }): Promise { const teamId = mergeInto.team_id let firstSeen = mergeInto.created_at @@ -362,6 +374,8 @@ export class EventsProcessor { 'updateCohortPeople' ) let failedAttempts = totalMergeAttempts + let shouldRetryAliasOperation = false + // Retrying merging up to `MAX_FAILED_PERSON_MERGE_ATTEMPTS` times, in case race conditions occur. // An example is a distinct ID being aliased in another plugin server instance, // between `moveDistinctId` and `deletePerson` being called here @@ -376,17 +390,19 @@ export class EventsProcessor { try { await this.db.moveDistinctIds(otherPerson, mergeInto) } catch (error) { - Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } }) - + Sentry.captureException(error, { + extra: { mergeInto, mergeIntoDistinctId, otherPerson, otherPersonDistinctId }, + }) failedAttempts++ - if (failedAttempts === MAX_FAILED_PERSON_MERGE_ATTEMPTS) { - throw error + + // If a person was deleted in between fetching and moveDistinctId, re-run alias to ensure + // the updated persons are fetched and merged safely + if (error instanceof RaceConditionError && failedAttempts < MAX_FAILED_PERSON_MERGE_ATTEMPTS) { + shouldRetryAliasOperation = true + break } - // If a person was deleted in between fetching and moveDistinctId, - // re-run alias to ensure the updated persons are fetched and - // merged safely - return await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts) + throw error } try { @@ -403,6 +419,10 @@ export class EventsProcessor { continue // Not OK, trying again to make sure that ALL distinct IDs are merged } } + + if (shouldRetryAliasOperation) { + await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts) + } } private async capture( From 7ce5fc2d894768d4008bda71b35b34f96ba1e4b4 Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Mon, 16 Aug 2021 13:41:59 +0000 Subject: [PATCH 09/10] updates --- src/utils/db/db.ts | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 5336c966..e8228307 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -591,7 +591,6 @@ export class DB { } public async moveDistinctIds(source: Person, target: Person): Promise { - let operationFailedDueToRaceCondition = false let movedDistinctIdResult: QueryResult | null = null try { movedDistinctIdResult = await this.postgresQuery( @@ -611,20 +610,22 @@ export class DB { 'insert or update on table "posthog_persondistinctid" violates foreign key constraint' ) ) { - operationFailedDueToRaceCondition = true - } else { - throw error + // this is caused by a race condition where the _target_ person was deleted after fetching but + // before the update query ran and will trigger a retry with updated persons + throw new RaceConditionError( + 'Failed trying to move distinct IDs because target person no longer exists.' + ) } - } - // this should never (?) happen, but needed to satisfy the TS compiler - if (!movedDistinctIdResult) { - throw new Error('movedDistinctIdResult is null but query did not error') + throw error } - // this is caused by a race condition and will trigger a retry with updated persons - if (operationFailedDueToRaceCondition || movedDistinctIdResult.rows.length === 0) { - throw new RaceConditionError(`Failed trying to move distinct IDs because one of the persons doesn't exist.`) + // this is caused by a race condition where the _source_ person was deleted after fetching but + // before the update query ran and will trigger a retry with updated persons + if (movedDistinctIdResult.rows.length === 0) { + throw new RaceConditionError( + `Failed trying to move distinct IDs because the source person no longer exists.` + ) } if (this.kafkaProducer) { From c92c780aa55294348b3cd4ffa7ff3a3d733fa4ba Mon Sep 17 00:00:00 2001 From: yakkomajuri Date: Mon, 16 Aug 2021 13:50:34 +0000 Subject: [PATCH 10/10] update tests --- tests/shared/process-event.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/shared/process-event.ts b/tests/shared/process-event.ts index 7b704cd9..a5576664 100644 --- a/tests/shared/process-event.ts +++ b/tests/shared/process-event.ts @@ -173,7 +173,13 @@ export const createProcessEventTests = ( expect((await hub.db.fetchPersons()).length).toEqual(2) const [person0, person1] = await hub.db.fetchPersons() - await eventsProcessor.mergePeople([person0, 'person_0'], [person1, 'person_1']) + await eventsProcessor.mergePeople({ + mergeInto: person0, + mergeIntoDistinctId: 'person_0', + otherPerson: person1, + otherPersonDistinctId: 'person_1', + totalMergeAttempts: 0, + }) if (database === 'clickhouse') { await delayUntilEventIngested(async () =>