Skip to content

Commit

Permalink
Combine activity moving and affiliation recalculation (#2807)
Browse files Browse the repository at this point in the history
  • Loading branch information
sausage-todd authored Feb 17, 2025
1 parent 9952017 commit 90965c5
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 107 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
},
"devDependencies": {
"husky": "^9.1.7"
}
},
"packageManager": "pnpm@9.11.0+sha512.0a203ffaed5a3f63242cd064c8fb5892366c103e328079318f78062f24ea8c9d50bc6a47aa3567cabefd824d170e78fa2745ed1f16b132e16436146b7688f19b"
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/apps/entity_merging_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"@crowd/opensearch": "workspace:*",
"@crowd/redis": "workspace:*",
"@crowd/types": "workspace:*",
"@crowd/queue": "workspace:*",
"@temporalio/workflow": "~1.11.1",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
Expand Down
3 changes: 2 additions & 1 deletion services/apps/entity_merging_worker/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
export {
deleteMember,
moveActivitiesBetweenMembers,
moveActivitiesWithIdentityToAnotherMember,
recalculateActivityAffiliationsOfMemberAsync,
syncMember,
notifyFrontendMemberMergeSuccessful,
notifyFrontendMemberUnmergeSuccessful,
syncRemoveMember,
finishMemberMergingUpdateActivities,
finishMemberUnmergingUpdateActivities,
} from './activities/members'

export {
Expand Down
90 changes: 75 additions & 15 deletions services/apps/entity_merging_worker/src/activities/members.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { WorkflowIdReusePolicy } from '@temporalio/workflow'

import { updateActivities } from '@crowd/data-access-layer/src/activities/update'
import { cleanupMemberAggregates } from '@crowd/data-access-layer/src/members/segments'
import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data'
import {
cleanupMember,
deleteMemberSegments,
findMemberById,
getIdentitiesWithActivity,
moveActivitiesToNewMember,
moveIdentityActivitiesToNewMember,
} from '@crowd/data-access-layer/src/old/apps/entity_merging_worker'
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
import { figureOutNewOrgId } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
import { prepareMemberAffiliationsUpdate } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { SearchSyncApiClient } from '@crowd/opensearch'
import { RedisPubSubEmitter } from '@crowd/redis'
import {
Expand All @@ -28,19 +31,6 @@ export async function deleteMember(memberId: string): Promise<void> {
await cleanupMember(svc.postgres.writer, memberId)
}

export async function moveActivitiesBetweenMembers(
primaryId: string,
secondaryId: string,
tenantId: string,
): Promise<void> {
const memberExists = await findMemberById(svc.postgres.writer, primaryId, tenantId)

if (!memberExists) {
return
}
await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
}

export async function moveActivitiesWithIdentityToAnotherMember(
fromId: string,
toId: string,
Expand Down Expand Up @@ -187,3 +177,73 @@ export async function notifyFrontendMemberUnmergeSuccessful(
),
)
}

export async function finishMemberMergingUpdateActivities(memberId: string, newMemberId: string) {
const pgDb = svc.postgres.reader
const qDb = svc.questdbSQL
const queueClient = svc.queue

const qx = pgpQx(pgDb.connection())
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)

await updateActivities(
qDb,
queueClient,
[
async () => ({ memberId: newMemberId }),
async (activity) => ({ organizationId: figureOutNewOrgId(activity, orgCases) }),
],
`"memberId" = $(memberId)`,
{
memberId,
},
)
}

function moveByIdentities({
activity,
identities,
newMemberId,
}: {
activity: IDbActivityCreateData
identities: IMemberIdentity[]
newMemberId: string
}): Partial<IDbActivityCreateData> {
const { platform, username } = activity
const activityMatches = identities.some(
(i) =>
i.type === MemberIdentityType.USERNAME && i.platform === platform && i.value === username,
)

return activityMatches ? { memberId: newMemberId } : {}
}

export async function finishMemberUnmergingUpdateActivities({
memberId,
newMemberId,
identities,
}: {
memberId: string
newMemberId: string
identities: IMemberIdentity[]
}) {
const pgDb = svc.postgres.reader
const qDb = svc.questdbSQL
const queueClient = svc.queue

const qx = pgpQx(pgDb.connection())
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)

await updateActivities(
qDb,
queueClient,
[
async (activity) => moveByIdentities({ activity, identities, newMemberId }),
async (activity) => ({
organizationId: figureOutNewOrgId(activity, orgCases),
}),
],
`"memberId" = $(memberId)`,
{ memberId },
)
}
16 changes: 11 additions & 5 deletions services/apps/entity_merging_worker/src/workflows/all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import * as activities from '../activities'

const {
deleteMember,
moveActivitiesBetweenMembers,
deleteOrganization,
moveActivitiesBetweenOrgs,
notifyFrontendOrganizationMergeSuccessful,
notifyFrontendOrganizationUnmergeSuccessful,
moveActivitiesWithIdentityToAnotherMember,
recalculateActivityAffiliationsOfMemberAsync,
recalculateActivityAffiliationsOfOrganizationSynchronous,
setMergeAction,
Expand All @@ -20,6 +18,8 @@ const {
notifyFrontendMemberMergeSuccessful,
notifyFrontendMemberUnmergeSuccessful,
syncRemoveMember,
finishMemberMergingUpdateActivities,
finishMemberUnmergingUpdateActivities,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '60 minutes',
})
Expand All @@ -35,8 +35,9 @@ export async function finishMemberMerging(
await setMergeAction(primaryId, secondaryId, tenantId, {
step: MergeActionStep.MERGE_ASYNC_STARTED,
})
await moveActivitiesBetweenMembers(primaryId, secondaryId, tenantId)
await recalculateActivityAffiliationsOfMemberAsync(primaryId, tenantId)

await finishMemberMergingUpdateActivities(primaryId, secondaryId)

await syncMember(primaryId)
await syncRemoveMember(secondaryId)
await deleteMember(secondaryId)
Expand Down Expand Up @@ -66,7 +67,12 @@ export async function finishMemberUnmerging(
await setMergeAction(primaryId, secondaryId, tenantId, {
step: MergeActionStep.UNMERGE_ASYNC_STARTED,
})
await moveActivitiesWithIdentityToAnotherMember(primaryId, secondaryId, identities, tenantId)

await finishMemberUnmergingUpdateActivities({
memberId: primaryId,
newMemberId: secondaryId,
identities,
})
await syncMember(primaryId)
await syncMember(secondaryId)
await recalculateActivityAffiliationsOfMemberAsync(primaryId, tenantId)
Expand Down
38 changes: 27 additions & 11 deletions services/libs/data-access-layer/src/activities/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,42 @@ export async function streamActivities(
})
}

export type MapActivityFunction = (
activity: IDbActivityCreateData,
) => Promise<Partial<IDbActivityCreateData>>

export async function updateActivities(
qdb: DbConnOrTx,
queueClient: IQueue,
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
mapActivity: MapActivityFunction | MapActivityFunction[],
where: string,
params?: Record<string, unknown>,
): Promise<{ processed: number; duration: number }> {
async function mapNewActivity(
activity: IDbActivityCreateData,
mapActivity: MapActivityFunction | MapActivityFunction[],
): Promise<IDbActivityCreateData> {
let newActivity = activity

if (!Array.isArray(mapActivity)) {
mapActivity = [mapActivity]
}

for (const map of mapActivity) {
newActivity = {
...newActivity,
...(await map(newActivity)),
}
}

return newActivity
}

return streamActivities(
qdb,
async (activity) => {
await insertActivities(
queueClient,
[
{
...activity,
...(await mapActivity(activity)),
},
],
true,
)
const newActivity = await mapNewActivity(activity, mapActivity)
await insertActivities(queueClient, [newActivity], true)
},
where,
params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,6 @@ export async function findMemberById(db: DbStore, primaryId: string, tenantId: s
)
}

export async function moveActivitiesToNewMember(
qdb: DbConnOrTx,
queueClient: IQueue,
primaryId: string,
secondaryId: string,
tenantId: string,
) {
await updateActivities(
qdb,
queueClient,
async () => ({ memberId: primaryId }),
`"memberId" = $(memberId) AND "tenantId" = $(tenantId)`,
{
memberId: secondaryId,
tenantId,
},
)
}

export async function updateMergeActionState(
db: DbStore,
primaryId: string,
Expand Down
Loading

0 comments on commit 90965c5

Please sign in to comment.