Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Handle moveDistinctIds race conditions #524

Merged
merged 11 commits into from
Aug 16, 2021
48 changes: 37 additions & 11 deletions src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
castTimestampOrNow,
clickHouseTimestampToISO,
escapeClickHouseString,
RaceConditionError,
sanitizeSqlIdentifier,
tryTwice,
UUID,
Expand Down Expand Up @@ -595,17 +596,42 @@ export class DB {
}

public async moveDistinctIds(source: Person, target: Person): Promise<void> {
const movedDistinctIdResult = await this.postgresQuery(
`
UPDATE posthog_persondistinctid
SET person_id = $1
WHERE person_id = $2
AND team_id = $3
RETURNING *
`,
[target.id, source.id, target.team_id],
'updateDistinctIdPerson'
)
let movedDistinctIdResult: QueryResult<any> | null = null
try {
movedDistinctIdResult = await this.postgresQuery(
`
UPDATE posthog_persondistinctid
SET person_id = $1
WHERE person_id = $2
AND team_id = $3
RETURNING *
`,
[target.id, source.id, target.team_id],
'updateDistinctIdPerson'
)
} catch (error) {
if (
error.message.includes(
'insert or update on table "posthog_persondistinctid" violates foreign key constraint'
)
) {
// this is caused by a race condition where the _target_ person was deleted after fetching but
// before the update query ran and will trigger a retry with updated persons
throw new RaceConditionError(
'Failed trying to move distinct IDs because target person no longer exists.'
)
}

throw error
}

// this is caused by a race condition where the _source_ person was deleted after fetching but
// before the update query ran and will trigger a retry with updated persons
if (movedDistinctIdResult.rows.length === 0) {
throw new RaceConditionError(
`Failed trying to move distinct IDs because the source person no longer exists.`
)
}

if (this.kafkaProducer) {
for (const row of movedDistinctIdResult.rows) {
Expand Down
6 changes: 6 additions & 0 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,9 @@ export class IllegalOperationError extends Error {
super(operation)
}
}

/**
 * Error type thrown explicitly to signal a race condition across threads.
 *
 * Accepts the same constructor arguments as the built-in `Error` and
 * reports its `name` as `'RaceConditionError'`.
 */
export class RaceConditionError extends Error {
    constructor(...errorArgs: ConstructorParameters<typeof Error>) {
        super(...errorArgs)
        // Set after `super()` so the instance carries its own `name` property.
        this.name = 'RaceConditionError'
    }
}
55 changes: 49 additions & 6 deletions src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
timeoutGuard,
} from '../../utils/db/utils'
import { status } from '../../utils/status'
import { castTimestampOrNow, filterIncrementProperties, UUID, UUIDT } from '../../utils/utils'
import { castTimestampOrNow, filterIncrementProperties, RaceConditionError, UUID, UUIDT } from '../../utils/utils'
import { PersonManager } from './person-manager'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -282,7 +282,8 @@ export class EventsProcessor {
previousDistinctId: string,
distinctId: string,
teamId: number,
retryIfFailed = true
retryIfFailed = true,
totalMergeAttempts = 0
): Promise<void> {
const oldPerson = await this.db.fetchPerson(teamId, previousDistinctId)
const newPerson = await this.db.fetchPerson(teamId, distinctId)
Expand Down Expand Up @@ -336,11 +337,31 @@ export class EventsProcessor {
}

if (oldPerson && newPerson && oldPerson.id !== newPerson.id) {
await this.mergePeople(newPerson, oldPerson)
await this.mergePeople({
totalMergeAttempts,
mergeInto: newPerson,
mergeIntoDistinctId: distinctId,
otherPerson: oldPerson,
otherPersonDistinctId: previousDistinctId,
})
}
}

public async mergePeople(mergeInto: Person, otherPerson: Person): Promise<void> {
public async mergePeople({
macobo marked this conversation as resolved.
Show resolved Hide resolved
mergeInto,
mergeIntoDistinctId,
otherPerson,
otherPersonDistinctId,
totalMergeAttempts = 0,
}: {
mergeInto: Person
mergeIntoDistinctId: string
otherPerson: Person
otherPersonDistinctId: string
totalMergeAttempts: number
}): Promise<void> {
const teamId = mergeInto.team_id

let firstSeen = mergeInto.created_at

// Merge properties
Expand All @@ -358,7 +379,9 @@ export class EventsProcessor {
[mergeInto.id, otherPerson.id],
'updateCohortPeople'
)
let failedAttempts = 0
let failedAttempts = totalMergeAttempts
let shouldRetryAliasOperation = false

// Retrying merging up to `MAX_FAILED_PERSON_MERGE_ATTEMPTS` times, in case race conditions occur.
// An example is a distinct ID being aliased in another plugin server instance,
// between `moveDistinctId` and `deletePerson` being called here
Expand All @@ -370,7 +393,23 @@ export class EventsProcessor {
// In the rare case of the person changing VERY often however, it may happen even a few times,
// in which case we'll bail and rethrow the error.
while (true) {
await this.db.moveDistinctIds(otherPerson, mergeInto)
try {
await this.db.moveDistinctIds(otherPerson, mergeInto)
} catch (error) {
Sentry.captureException(error, {
extra: { mergeInto, mergeIntoDistinctId, otherPerson, otherPersonDistinctId },
})
failedAttempts++

// If a person was deleted in between fetching and moveDistinctId, re-run alias to ensure
// the updated persons are fetched and merged safely
if (error instanceof RaceConditionError && failedAttempts < MAX_FAILED_PERSON_MERGE_ATTEMPTS) {
shouldRetryAliasOperation = true
break
}

throw error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Is the other try-catch still valid? If not, it perhaps makes sense to remove the loop altogether.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should still be valid, as a distinct ID can be added in between moveDistinctIds and deletePerson, in which case we need to move the IDs again so we can delete without an error.

}

try {
await this.db.deletePerson(otherPerson)
Expand All @@ -386,6 +425,10 @@ export class EventsProcessor {
continue // Not OK, trying again to make sure that ALL distinct IDs are merged
}
}

if (shouldRetryAliasOperation) {
await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts)
}
}

private async capture(
Expand Down
8 changes: 7 additions & 1 deletion tests/shared/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,13 @@ export const createProcessEventTests = (
expect((await hub.db.fetchPersons()).length).toEqual(2)
const [person0, person1] = await hub.db.fetchPersons()

await eventsProcessor.mergePeople(person0, person1)
await eventsProcessor.mergePeople({
mergeInto: person0,
mergeIntoDistinctId: 'person_0',
otherPerson: person1,
otherPersonDistinctId: 'person_1',
totalMergeAttempts: 0,
})

if (database === 'clickhouse') {
await delayUntilEventIngested(async () =>
Expand Down