Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add script to fix activities #2831

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import {
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivitiesWithWrongMember,
} from './activities/fix-activities-with-wrong-member'
import {
deleteOrganizationIdentity,
findOrganizationIdentity,
Expand Down Expand Up @@ -38,4 +43,7 @@ export {
updateOrganizationIdentity,
deleteOrganizationIdentity,
isLfxMember,
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivitiesWithWrongMember,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { ActivityRepository } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activities.repo'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
import { IFindActivitiesWithWrongMembersResult } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types'
import { IMemberIdentity } from '@crowd/types'

import { svc } from '../../main'

export async function findActivitiesWithWrongMembers(
limit: number,
excludeMemberIds?: string[],
): Promise<IFindActivitiesWithWrongMembersResult[]> {
let activitiesWithWrongMember = []

try {
const activityRepo = new ActivityRepository(
svc.postgres.reader.connection(),
svc.log,
svc.questdbSQL,
)
activitiesWithWrongMember = await activityRepo.getActivitiesWithWrongMembers(
limit,
excludeMemberIds,
)
} catch (err) {
throw new Error(err)
}

return activitiesWithWrongMember
}

export async function findMemberIdentity(value: string, platform: string) {
let memberIdentity: IMemberIdentity
try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
memberIdentity = await memberRepo.findMemberIdentity(value, platform)
} catch (err) {
throw new Error(err)
}

return memberIdentity
}

export async function updateActivitiesWithWrongMember(
wrongMemberId: string,
correctMemberId: string,
) {
try {
const activityRepo = new ActivityRepository(
svc.postgres.writer.connection(),
svc.log,
svc.questdbSQL,
)
await activityRepo.updateActivitiesWithWrongMember(wrongMemberId, correctMemberId)
} catch (err) {
throw new Error(err)
}
}
2 changes: 1 addition & 1 deletion services/apps/script_executor_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const config: Config = {
enabled: true,
},
questdb: {
enabled: false,
enabled: true,
},
redis: {
enabled: true,
Expand Down
5 changes: 5 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs {
tenantId: string
testRun?: boolean
}

export interface IFixActivitiesWithWrongMembersArgs {
limit: number
testRun?: boolean
}
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixActivitiesWithWrongMember } from './workflows/fixActivitiesWithWrongMember'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
dissectMember,
fixOrgIdentitiesWithWrongUrls,
fixActivitiesWithWrongMember,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { proxyActivities } from '@temporalio/workflow'

import * as activities from '../../src/activities/fix-activities-with-wrong-member'
import { IFixActivitiesWithWrongMembersArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '45 minutes',
})

export async function fixActivitiesWithWrongMember(
args: IFixActivitiesWithWrongMembersArgs,
): Promise<void> {
const BATCH_SIZE = args.limit

let records = await activity.findActivitiesWithWrongMembers(BATCH_SIZE)

if (!records.length) {
console.log(`No activities found!`)
return
}

const membersWithoutIdentities = new Set<string>()

while (records.length > 0) {
for (const record of records) {
console.log(`Processing activity with memberId: ${record.memberId}`)

const memberIdentity = await activity.findMemberIdentity(record.username, record.platform)
if (!memberIdentity) {
console.log(`Member identity not found for ${record.username} on ${record.platform}`)
membersWithoutIdentities.add(record.memberId)
continue
}

await activity.updateActivitiesWithWrongMember(record.memberId, memberIdentity.memberId)
}

// if testRun then just run only once
if (args.testRun) {
console.log('Test run completed!')
break
}

// Subsequent queries with excluded memberIds
records = await activity.findActivitiesWithWrongMembers(
BATCH_SIZE,
Array.from(membersWithoutIdentities),
)
}

console.log(`Members without identities: ${membersWithoutIdentities.size}`)
console.log('Completed processing all members!')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { DbConnOrTx, DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'

import { IFindActivitiesWithWrongMembersResult } from './types'

export class ActivityRepository {
constructor(
private readonly connection: DbConnection | DbTransaction,
private readonly log: Logger,
private readonly questdbSQL: DbConnOrTx,
) {}

async getActivitiesWithWrongMembers(
limit = 100,
excludeMemberIds?: string[],
): Promise<IFindActivitiesWithWrongMembersResult[]> {
try {
const query = `
SELECT DISTINCT a."memberId", a.username, a.platform
FROM activities a
LEFT JOIN "memberIdentities" mi ON a."memberId" = mi."memberId"
WHERE mi."memberId" IS NULL
${excludeMemberIds?.length ? 'AND a."memberId" NOT IN ($(excludeMemberIds:csv))' : ''}
LIMIT $(limit)
`

return await this.connection.query(query, {
limit,
excludeMemberIds,
})
} catch (err) {
this.log.error('Error while finding activities!', err)
throw new Error(err)
}
}

async updateActivitiesWithWrongMember(
wrongMemberId: string,
correctMemberId: string,
): Promise<void> {
try {
this.log.info(`Updating activities from ${wrongMemberId} to ${correctMemberId}`)

await this.connection.none(
'UPDATE activities SET "memberId" = $(correctMemberId) WHERE "memberId" = $(wrongMemberId)',
{
wrongMemberId,
correctMemberId,
},
)

await this.questdbSQL.none(
'UPDATE activities SET "memberId" = $(correctMemberId) WHERE "memberId" = $(wrongMemberId)',
{
wrongMemberId,
correctMemberId,
},
)
} catch (err) {
this.log.error('Error while batch updating activities!', err)
throw new Error(err)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IMember } from '@crowd/types'
import { IMember, IMemberIdentity } from '@crowd/types'

import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types'

Expand Down Expand Up @@ -159,6 +159,46 @@ class MemberRepository {

return member
}

async findMemberIdentity(
value: string,
platform: string,
type = 'username',
): Promise<IMemberIdentity | null> {
let memberIdentity: IMemberIdentity

try {
memberIdentity = await this.connection.oneOrNone(
`
select * from "memberIdentities" where value = $(value) and platform = $(platform)
and type = $(type) and verified = true
`,
{
value,
platform,
type,
},
)

// try without verified
if (!memberIdentity) {
memberIdentity = await this.connection.oneOrNone(
`
select * from "memberIdentities"
where value = $(value)
and platform = $(platform)
and type = $(type)
`,
{ value, platform, type },
)
}

return memberIdentity
} catch (err) {
this.log.error('Error while finding member identity!', err)
throw new Error(err)
}
}
}

export default MemberRepository
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export interface IFindMemberMergeActionReplacement {
userId?: string
limit?: number
}

export interface IFindMemberIdentitiesGroupedByPlatformResult {
platforms: string[]
types: string[]
Expand All @@ -22,3 +21,9 @@ export interface IFindMemberIdentitiesGroupedByPlatformResult {
values: string[]
groupedByValue: string
}

export interface IFindActivitiesWithWrongMembersResult {
memberId: string
username: string
platform: string
}
Loading