Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Handle moveDistinctIds race conditions #524

Merged
merged 11 commits into from
Aug 16, 2021
48 changes: 33 additions & 15 deletions src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,22 +591,40 @@ export class DB {
}

public async moveDistinctIds(source: Person, target: Person): Promise<void> {
const movedDistinctIdResult = await this.postgresQuery(
`
UPDATE posthog_persondistinctid
SET person_id = $1
WHERE person_id = $2
AND team_id = $3
RETURNING *
`,
[target.id, source.id, target.team_id],
'updateDistinctIdPerson'
)
let operationFailedDueToRaceCondition = false
let movedDistinctIdResult: QueryResult<any> | null = null
try {
movedDistinctIdResult = await this.postgresQuery(
`
UPDATE posthog_persondistinctid
SET person_id = $1
WHERE person_id = $2
AND team_id = $3
RETURNING *
`,
[target.id, source.id, target.team_id],
'updateDistinctIdPerson'
)
} catch (error) {
if (
error.message.includes(
'insert or update on table "posthog_persondistinctid" violates foreign key constraint'
)
) {
operationFailedDueToRaceCondition = true
yakkomajuri marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw error
}
}

// this should never (?) happen, but needed to satisfy the TS compiler
yakkomajuri marked this conversation as resolved.
Show resolved Hide resolved
if (!movedDistinctIdResult) {
throw new Error('movedDistinctIdResult is null but query did not error')
}

// this is caused by a race condition and will trigger a retry with
// updated persons
if (movedDistinctIdResult.rows.length === 0) {
throw new RaceConditionError(`Failed trying to move distinct IDs because otherPerson doesn't exist.`)
// this is caused by a race condition and will trigger a retry with updated persons
if (operationFailedDueToRaceCondition || movedDistinctIdResult.rows.length === 0) {
throw new RaceConditionError(`Failed trying to move distinct IDs because one of the persons doesn't exist.`)
}

if (this.kafkaProducer) {
Expand Down
54 changes: 37 additions & 17 deletions src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
timeoutGuard,
} from '../../utils/db/utils'
import { status } from '../../utils/status'
import { castTimestampOrNow, filterIncrementProperties, UUID, UUIDT } from '../../utils/utils'
import { castTimestampOrNow, filterIncrementProperties, RaceConditionError, UUID, UUIDT } from '../../utils/utils'
import { PersonManager } from './person-manager'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -331,17 +331,29 @@ export class EventsProcessor {
}

if (oldPerson && newPerson && oldPerson.id !== newPerson.id) {
await this.mergePeople([newPerson, distinctId], [oldPerson, previousDistinctId], totalMergeAttempts)
await this.mergePeople({
totalMergeAttempts,
mergeInto: newPerson,
mergeIntoDistinctId: distinctId,
otherPerson: oldPerson,
otherPersonDistinctId: previousDistinctId,
})
}
}

public async mergePeople(
mergeIntoPayload: [Person, string],
otherPersonPayload: [Person, string],
totalMergeAttempts = 0
): Promise<void> {
const [mergeInto, mergeIntoDistinctId] = mergeIntoPayload
const [otherPerson, otherPersonDistinctId] = otherPersonPayload
public async mergePeople({
macobo marked this conversation as resolved.
Show resolved Hide resolved
mergeInto,
mergeIntoDistinctId,
otherPerson,
otherPersonDistinctId,
totalMergeAttempts = 0,
}: {
mergeInto: Person
mergeIntoDistinctId: string
otherPerson: Person
otherPersonDistinctId: string
totalMergeAttempts: number
}): Promise<void> {
const teamId = mergeInto.team_id

let firstSeen = mergeInto.created_at
Expand All @@ -362,6 +374,8 @@ export class EventsProcessor {
'updateCohortPeople'
)
let failedAttempts = totalMergeAttempts
let shouldRetryAliasOperation = false

// Retrying merging up to `MAX_FAILED_PERSON_MERGE_ATTEMPTS` times, in case race conditions occur.
// An example is a distinct ID being aliased in another plugin server instance,
// between `moveDistinctId` and `deletePerson` being called here
Expand All @@ -376,17 +390,19 @@ export class EventsProcessor {
try {
await this.db.moveDistinctIds(otherPerson, mergeInto)
} catch (error) {
Sentry.captureException(error, { extra: { mergeIntoPayload, otherPersonPayload } })

Sentry.captureException(error, {
extra: { mergeInto, mergeIntoDistinctId, otherPerson, otherPersonDistinctId },
})
failedAttempts++
if (failedAttempts === MAX_FAILED_PERSON_MERGE_ATTEMPTS) {
throw error

// If a person was deleted in between fetching and moveDistinctId, re-run alias to ensure
// the updated persons are fetched and merged safely
if (error instanceof RaceConditionError && failedAttempts < MAX_FAILED_PERSON_MERGE_ATTEMPTS) {
shouldRetryAliasOperation = true
break
}

// If a person was deleted in between fetching and moveDistinctId,
// re-run alias to ensure the updated persons are fetched and
// merged safely
return await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts)
throw error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Is the other try-catch still valid? If not, it perhaps makes sense to remove the loop altogether.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should still be valid, as a distinct ID can be added in between moveDistinctIds and deletePerson, in which case we need to move the IDs again so we can delete without an error.

}

try {
Expand All @@ -403,6 +419,10 @@ export class EventsProcessor {
continue // Not OK, trying again to make sure that ALL distinct IDs are merged
}
}

if (shouldRetryAliasOperation) {
await this.alias(otherPersonDistinctId, mergeIntoDistinctId, teamId, false, failedAttempts)
}
}

private async capture(
Expand Down