
feat: support importing contacts from arbitrary csv url #1434

Merged
7 commits merged into main from feat-import-from-url on Oct 6, 2022

Conversation

@bchrobot (Member) commented Sep 23, 2022

Description

This adds support for importing contacts from a CSV at an arbitrary URL.

Motivation and Context

This allows triggering imports from sources such as ActionKit before there is complete/user-facing support in the Spoke Rewired application itself.

This also:

  • Moves the contact list utilities to a typed server-side utility module (they are no longer used client-side)
  • Adds pg-promise for easier bulk inserts

How Has This Been Tested?

This has been tested locally. See screenshots.

Screenshots (if appropriate):

[Screenshot: Screen Shot 2022-09-23 at 7:46:41 AM]

[Screenshot: Screen Shot 2022-09-23 at 7:46:20 AM]

Documentation Changes

N/A -- not user-facing.

Checklist:

  • My change requires a change to the documentation.
  • I have included updates for the documentation accordingly.

@bchrobot bchrobot marked this pull request as ready for review September 23, 2022 12:22
@bchrobot (Member Author)

This seems to run synchronously, which is not ideal for single-pod deployments. Any thoughts on how to break up the work would be welcome.

I also plan to revisit after catching up on sleep.
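
One possible way to break the work up (a hedged sketch only, assuming graphile-worker's makeWorkerUtils and a DATABASE_URL connection string; the offset/limit payload fields are hypothetical and the task would need to honor them) would be to enqueue one job per fixed-size slice of the CSV instead of importing it all in a single task run:

import { makeWorkerUtils } from "graphile-worker";

// Sketch: enqueue one "import-contact-csv-from-url" job per 100k-row slice
// rather than processing the whole CSV synchronously in a single run.
const enqueueChunkedImport = async (
  campaignId: number,
  signedDownloadUrl: string,
  totalRows: number
) => {
  const chunkSize = 100_000;
  const workerUtils = await makeWorkerUtils({
    connectionString: process.env.DATABASE_URL
  });
  try {
    for (let offset = 0; offset < totalRows; offset += chunkSize) {
      await workerUtils.addJob("import-contact-csv-from-url", {
        campaignId,
        signedDownloadUrl,
        offset, // hypothetical payload field
        limit: chunkSize // hypothetical payload field
      });
    }
  } finally {
    await workerUtils.release();
  }
};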

@hiemanshu hiemanshu added this to the 4.38.1 milestone Sep 26, 2022
@hiemanshu (Contributor) left a comment

Overall LGTM!

Question: What does pg-promise provide?

Review comment on src/lib/index.js (outdated; resolved)
@@ -0,0 +1,13 @@
import type { ScheduleConfig } from "graphile-scheduler";
Contributor

Question: Is this hinting towards a potential refactor around task list separation?

@bchrobot (Member Author)

Yes -- some of that happened in #1403

https://github.com/politics-rewired/Spoke/pull/1403/files#diff-78ac00bb06a581fe2668ad8dc73889de02e086963a1e621929f2f03388d34964

I haven't created a formal task for it yet, but I think it could be useful for tidying up the worker file -- separating the task list from the worker configuration.
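
A hedged sketch of what that separation might look like (the file layout and the importContactCsvFromUrl handle are placeholders, not code from this PR):

// tasks.ts -- hypothetical module exporting only the task list
import type { TaskList } from "graphile-worker";
import { importContactCsvFromUrl } from "./tasks/import-contact-csv-from-url"; // hypothetical path

export const taskList: TaskList = {
  "import-contact-csv-from-url": importContactCsvFromUrl
};

// worker.ts -- hypothetical module holding only the worker configuration
import { run } from "graphile-worker";
import { taskList } from "./tasks";

export const startWorker = () =>
  run({
    connectionString: process.env.DATABASE_URL,
    concurrency: 5,
    taskList
  });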

@bchrobot (Member Author)

Question: What does pg-promise provide?

It simplifies bulk inserts when using the pg library:

import Pgp from "pg-promise";
import type { PoolClient } from "pg";

const pgp = Pgp({});

const cs = new pgp.helpers.ColumnSet(
  [
    "campaign_id",
    "first_name",
    "last_name",
    "cell",
    "external_id",
    "zip",
    "custom_fields"
  ],
  {
    table: "campaign_contact"
  }
);

const insertBatch = (trx: PoolClient, batch: CampaignContactInsertRow[]) => {
  const query = pgp.helpers.insert(batch, cs);
  return trx.query(query);
};
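
For context, a rough sketch of how insertBatch might be driven (assuming pool is a pg Pool and contacts is the parsed row array; not the exact code in this PR):

// Sketch: run the bulk insert in fixed-size batches inside one transaction.
const BATCH_SIZE = 1000;
const client = await pool.connect();
try {
  await client.query("begin");
  for (let i = 0; i < contacts.length; i += BATCH_SIZE) {
    await insertBatch(client, contacts.slice(i, i + BATCH_SIZE));
  }
  await client.query("commit");
} catch (err) {
  await client.query("rollback");
  throw err;
} finally {
  client.release();
}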

@hiemanshu (Contributor)

Question: What does pg-promise provide?

It simplifies bulk inserts when using the pg library:

Does it do anything different than using batchInsert from knex? https://knexjs.org/guide/utility.html#batchinsert

@bchrobot (Member Author)

Question: What does pg-promise provide?

It simplifies bulk inserts when using the pg library:

Does it do anything different than using batchInsert from knex? https://knexjs.org/guide/utility.html#batchinsert

I don't know about under the hood, but probably not. The difference is just pg vs. knex (and the tarn pooler that knex uses).
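
For reference, the rough knex equivalent would be a sketch like this (assuming db is the knex instance and rows already matches the campaign_contact columns):

// Sketch: knex's batchInsert splits `rows` into chunks (here 1000 per insert)
// and runs them through knex's own (tarn) connection pool.
const insertWithKnex = (rows: CampaignContactInsertRow[]) =>
  db.batchInsert("campaign_contact", rows, 1000);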

@hiemanshu hiemanshu modified the milestones: 4.38.1, 4.39.0 Sep 29, 2022
@bchrobot bchrobot force-pushed the feat-import-from-url branch from 8651066 to c1c95eb Compare October 4, 2022 09:13
@bchrobot (Member Author) commented Oct 4, 2022

Below is a utility SQL function for cloning a campaign, to be used when externally queueing URL imports (e.g. from Retool). I'm not sure whether it makes sense to add it to version control, or even to have it replace the Node clone steps here:

const cloneSingle = async (count: number) => {
  // Copy campaign
  const {
    rows: [newCampaign]
  } = await trx.raw<QueryResult<CampaignRecord>>(
    `
      insert into campaign (
        organization_id,
        title,
        description,
        is_approved,
        is_started,
        is_archived,
        due_by,
        logo_image_url,
        intro_html,
        primary_color,
        texting_hours_start,
        texting_hours_end,
        timezone,
        creator_id,
        is_autoassign_enabled,
        limit_assignment_to_teams,
        replies_stale_after_minutes,
        external_system_id
      )
      select
        organization_id,
        (case
          when is_template then replace(concat('COPY - ', title), '#', ?::text)
          else 'COPY - ' || title
        end),
        description,
        false as is_approved,
        false as is_started,
        false as is_archived,
        due_by,
        logo_image_url,
        intro_html,
        primary_color,
        texting_hours_start,
        texting_hours_end,
        timezone,
        ? as creator_id,
        false as is_autoassign_enabled,
        limit_assignment_to_teams,
        replies_stale_after_minutes,
        external_system_id
      from all_campaign
      where id = ?
      returning *
    `,
    [count, userId, campaignId]
  );

  // Copy Messaging Service OR use active one
  const messagingServices = await r
    .knex("messaging_service")
    .where({ organization_id: newCampaign.organization_id, active: true });

  if (messagingServices.length === 0) {
    throw new Error("No active messaging services found");
  }

  await trx("campaign")
    .update({
      messaging_service_sid: messagingServices[0].messaging_service_sid
    })
    .where({ id: newCampaign.id });

  // Copy interactions
  const interactions = await trx<InteractionStepRecord>("interaction_step")
    .where({
      campaign_id: campaignId,
      is_deleted: false
    })
    .then((interactionSteps) =>
      interactionSteps.map<InteractionStepRecord | { id: string }>(
        (interactionStep) => ({
          id: `new${interactionStep.id}`,
          questionText: interactionStep.question,
          scriptOptions: interactionStep.script_options,
          answerOption: interactionStep.answer_option,
          answerActions: interactionStep.answer_actions,
          isDeleted: interactionStep.is_deleted,
          campaign_id: newCampaign.id,
          parentInteractionId: interactionStep.parent_interaction_id
            ? `new${interactionStep.parent_interaction_id}`
            : interactionStep.parent_interaction_id
        })
      )
    );

  if (interactions.length > 0) {
    await persistInteractionStepTree(
      newCampaign.id,
      makeTree(interactions, null /* id */),
      { is_started: false },
      trx
    );
  }

  // Copy canned responses
  await trx.raw(
    `
      insert into canned_response (campaign_id, title, text)
      select
        ? as campaign_id,
        title,
        text
      from canned_response
      where campaign_id = ?
    `,
    [newCampaign.id, campaignId]
  );

  // Copy Teams
  await trx.raw(
    `
      insert into campaign_team (campaign_id, team_id)
      select
        ? as campaign_id,
        team_id
      from campaign_team
      where campaign_id = ?
    `,
    [newCampaign.id, campaignId]
  );

  // Copy Campaign Groups
  await trx.raw(
    `
      insert into campaign_group_campaign (campaign_id, campaign_group_id)
      select
        ? as campaign_id,
        campaign_group_id
      from campaign_group_campaign
      where campaign_id = ?
    `,
    [newCampaign.id, campaignId]
  );

  // Copy Campaign Variables
  await trx.raw(
    `
      insert into campaign_variable (campaign_id, display_order, name, value)
      select
        ? as campaign_id,
        display_order,
        campaign_variable.name,
        (case
          when all_campaign.is_template then null
          else campaign_variable.value
        end)
      from campaign_variable
      join all_campaign on all_campaign.id = campaign_variable.campaign_id
      where
        campaign_id = ?
        and deleted_at is null
    `,
    [newCampaign.id, campaignId]
  );

  return newCampaign;
};

create schema if not exists utility;

create or replace function utility.clone_from_template(template_id integer, iteration_idx integer)
returns integer
as $$
declare
  v_messaging_service_sid text;
  v_campaign campaign;
begin
  -- Check messaging service
  select coalesce(c.messaging_service_sid, ms.messaging_service_sid)
  into v_messaging_service_sid
  from all_campaign c
  left join messaging_service ms on ms.organization_id = c.organization_id
  where true
    and c.id = template_id
    and ms.active = true
  limit 1;

  -- Ensure messaging service
  if (v_messaging_service_sid) is null then
    raise exception 'No active messaging services found';
  end if;

  -- Copy campaign
  insert into campaign (
    organization_id,
    title,
    description,
    is_approved,
    is_started,
    is_archived,
    due_by,
    logo_image_url,
    intro_html,
    primary_color,
    texting_hours_start,
    texting_hours_end,
    timezone,
    creator_id,
    is_autoassign_enabled,
    limit_assignment_to_teams,
    replies_stale_after_minutes,
    external_system_id,
    messaging_service_sid
  )
  select
    organization_id,
    (case
      when is_template then replace(concat('COPY - ', title), '#', iteration_idx::text)
      else 'COPY - ' || title
    end) as title,
    description,
    false as is_approved,
    false as is_started,
    false as is_archived,
    due_by,
    logo_image_url,
    intro_html,
    primary_color,
    texting_hours_start,
    texting_hours_end,
    timezone,
    null as creator_id,
    false as is_autoassign_enabled,
    limit_assignment_to_teams,
    replies_stale_after_minutes,
    external_system_id,
    v_messaging_service_sid as messaging_service_sid
  from all_campaign
  where id = template_id
  returning *
  into v_campaign;

  -- Copy interactions

  -- Deferring constraints wasn't necessary when testing locally but I had nicely ordered interaction tuples so  ¯\_(ツ)_/¯
  -- set constraints "interaction_step_parent_interaction_id_foreign" deferred;

  with payloads as (
    select *, nextval('interaction_step_id_seq'::regclass) as new_id
    from interaction_step
    where true
      and is_deleted = false
      and campaign_id = template_id
  )
  insert into interaction_step (
    id,
    campaign_id,
    question,
    parent_interaction_id,
    answer_option,
    answer_actions,
    script_options
  )
  select
    payloads.new_id,
    v_campaign.id as campaign_id,
    payloads.question,
    parent_id_mapping.new_id as parent_interaction_id,
    payloads.answer_option,
    payloads.answer_actions,
    payloads.script_options
  from payloads
  left join payloads parent_id_mapping on payloads.parent_interaction_id = parent_id_mapping.id;

  -- Copy canned responses
  insert into canned_response (campaign_id, title, text)
  select
    v_campaign.id as campaign_id,
    title,
    text
  from canned_response
  where campaign_id = template_id;

  -- Copy teams
  insert into campaign_team (campaign_id, team_id)
  select
    v_campaign.id as campaign_id,
    team_id
  from campaign_team
  where campaign_id = template_id;

  -- Copy campaign groups
  insert into campaign_group_campaign (campaign_id, campaign_group_id)
  select
    v_campaign.id as campaign_id,
    campaign_group_id
  from campaign_group_campaign
  where campaign_id = template_id;

  -- Copy campaign variables
  insert into campaign_variable (campaign_id, display_order, name, value)
  select
    v_campaign.id as campaign_id,
    display_order,
    campaign_variable.name,
    (case
      when all_campaign.is_template then null
      else campaign_variable.value
    end)
  from campaign_variable
  join all_campaign on all_campaign.id = campaign_variable.campaign_id
  where true
    and campaign_id = template_id
    and deleted_at is null;

  return v_campaign.id;
end;
$$ language plpgsql volatile security invoker;

Chunking can then occur using the following:

with new_campaigns as (
  select
    utility.clone_from_template(4, idx) as campaign_id,
    (idx - 1) * 100000 as offset,
    100000 as limit
  from generate_series(1, ceiling(250000 / 100000::decimal)::integer) idx
)
select *
from new_campaigns
cross join lateral graphile_worker.add_job(
  'import-contact-csv-from-url',
  json_build_object(
    'campaignId', new_campaigns.campaign_id,
    'offset', new_campaigns.offset,
    'limit', new_campaigns.limit,
    'initiateFilterLandlines', true,
    'signedDownloadUrl', 'https://s3.aws.amazon.com/my-secure-bucket/ak-export.csv',
    'columnMapping', '{}'::json
  )
) gw;

@bchrobot bchrobot requested a review from hiemanshu October 4, 2022 10:39
@hiemanshu (Contributor)

Below is a utility SQL function for cloning a campaign, to be used when externally queueing URL imports (e.g. from Retool). I'm not sure whether it makes sense to add it to version control, or even to have it replace the Node clone steps here:

My recommendation is to put it into dev-tools or similar as a .sql script that can easily be referenced without having to log into Retool or come back to this PR.

@hiemanshu (Contributor) left a comment

LGTM!

@hiemanshu hiemanshu modified the milestones: 4.39.0, 4.40.0 Oct 4, 2022
@bchrobot (Member Author) commented Oct 5, 2022

@hiemanshu this can be merged whenever!

@hiemanshu hiemanshu merged commit 1d85657 into main Oct 6, 2022
@hiemanshu hiemanshu deleted the feat-import-from-url branch October 6, 2022 05:08
@hiemanshu hiemanshu modified the milestones: 4.39.1, 4.39.0 Oct 6, 2022