Skip to content

Commit

Permalink
Add S3 Destination (#2117)
Browse files Browse the repository at this point in the history
* initial push

* added subfolder functionality

* removing various commented code and cleanup for PR

* cleaning up operation file and removing snapshot.test.ts

* fixing index.ts and operations.ts errors

* updating s3 with unit test failure fixes

* revising all changes, tested to make sure they work

* defining interface for credentials

* fixing undefined unit tests

* fixing folderName undefined value

* trying to fix undefined

* non-null assertion block added

* updating yarn.lock to that of master branch

* feat(code): add uploadCsv test; update yarn.lock

* cleaning up comments

* removing console log in performBatch

* fixing trycatch wrapper and removing data const

* fixing test failures

* fixing error message

* removing console.log

* refactored changes from meeting with Jeremy

* refactor

* changing everything

* adding operations

* more done

* removed error validate

* added credentials checking

* fixing upload function

* fixed double payload

* more progress

* filename fix

* fix addnl props traits

* add steps to assume intermediary role

* removing test

---------

Co-authored-by: Mayur Pitale <mpitale@twilio.com>
Co-authored-by: Jason Sooter <7215306+JasonSooter@users.noreply.github.com>
Co-authored-by: Joe Ayoub <joe.ayoub@segment.com>
Co-authored-by: Varadarajan V <109586712+varadarajan-tw@users.noreply.github.com>
  • Loading branch information
5 people authored Jul 18, 2024
1 parent 48fa746 commit 4d3d6c3
Show file tree
Hide file tree
Showing 11 changed files with 1,850 additions and 2 deletions.
3 changes: 2 additions & 1 deletion packages/destination-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
"kafkajs": "^2.2.4",
"liquidjs": "^10.8.4",
"lodash": "^4.17.21",
"ssh2-sftp-client": "^9.1.0"
"ssh2-sftp-client": "^9.1.0",
"@aws-sdk/client-s3": "^3.600.0"
},
"jest": {
"preset": "ts-jest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// import nock from 'nock'
// // import { createTestEvent, createTestIntegration } from '@segment/actions-core'
// import { createTestIntegration } from '@segment/actions-core'
// import Definition from '../index'

//const testDestination = createTestIntegration(Definition)

// Placeholder suite: the real authentication test (and its imports above) is
// still commented out, so this only asserts a tautology to keep the file green.
describe('Aws S3', () => {
  describe('testAuthentication', () => {
    const placeholderCheck = async (): Promise<void> => {
      expect(true).toEqual(true)
    }

    it('should validate authentication inputs', placeholderCheck)
  })
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 96 additions & 0 deletions packages/destination-actions/src/destinations/aws-s3/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { AudienceDestinationDefinition, IntegrationError } from '@segment/actions-core'
import type { AudienceSettings, Settings } from './generated-types'

import syncAudienceToCSV from './syncAudienceToCSV'

/**
 * Shape of the `personas` object that `createAudience` reads off the audience
 * settings. Only `computation_id` is used; it becomes the audience `externalId`.
 */
type PersonasSettings = {
computation_id: string
}

/**
 * Destination definition for "AWS S3 CSV": uploads Segment Audience data to an
 * S3 bucket as a delimited text file.
 *
 * - `audienceFields` are per-audience options (subfolder, filename prefix, delimiter).
 * - `authentication.fields` are destination-wide settings (IAM role, bucket, region).
 * - `audienceConfig` registers the audience using the Personas computation id as
 *   the external id; no remote resource is created here.
 */
const destination: AudienceDestinationDefinition<Settings, AudienceSettings> = {
  name: 'AWS S3 CSV',
  slug: 'actions-s3-csv',
  mode: 'cloud',
  description: 'Sync Segment event and Audience data to AWS S3.',
  audienceFields: {
    s3_aws_folder_name: {
      label: 'AWS Subfolder Name',
      description:
        'Name of the S3 Subfolder where the files will be uploaded to. "/" must exist at the end of the folder name.',
      type: 'string',
      required: false
    },
    filename: {
      label: 'Filename prefix',
      description: `Prefix to append to the name of the uploaded file. A timestamp and lower cased audience name will be appended to the filename to ensure uniqueness.`,
      type: 'string',
      required: false
    },
    delimiter: {
      // Label typo fixed ("Delimeter" -> "Delimiter"); it is shown to users in the UI.
      label: 'Delimiter',
      description: `Character used to separate tokens in the resulting file.`,
      type: 'string',
      required: true,
      // 'tab' is a symbolic value; the file generator translates it to '\t'.
      choices: [
        { label: 'comma', value: ',' },
        { label: 'pipe', value: '|' },
        { label: 'tab', value: 'tab' },
        { label: 'semicolon', value: ';' },
        { label: 'colon', value: ':' }
      ],
      default: ','
    }
  },
  authentication: {
    scheme: 'custom',
    fields: {
      iam_role_arn: {
        label: 'IAM Role ARN',
        description:
          'IAM role ARN with write permissions to the S3 bucket. Format: arn:aws:iam::account-id:role/role-name',
        type: 'string',
        required: true
      },
      s3_aws_bucket_name: {
        label: 'AWS Bucket Name',
        description: 'Name of the S3 bucket where the files will be uploaded to.',
        type: 'string',
        required: true
      },
      s3_aws_region: {
        label: 'AWS Region Code (S3 only)',
        description:
          'Region Code where the S3 bucket is hosted. See [AWS S3 Documentation](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions)',
        type: 'string',
        required: true
      }
    }
  },
  audienceConfig: {
    mode: {
      type: 'synced',
      full_audience_sync: true
    },
    /**
     * Registers an audience by echoing back the Personas computation id.
     * @throws IntegrationError (MISSING_REQUIRED_FIELD, 400) when the audience
     *   settings carry no `personas` object.
     */
    async createAudience(_, createAudienceInput) {
      const audienceSettings = createAudienceInput.audienceSettings
      // Optional chaining so that absent audience settings produce the
      // IntegrationError below instead of an unhandled TypeError.
      // @ts-ignore `personas` is not declared on the generated AudienceSettings type yet
      const personas = audienceSettings?.personas as PersonasSettings | undefined
      if (!personas) {
        throw new IntegrationError('Missing computation parameters: Id and Key', 'MISSING_REQUIRED_FIELD', 400)
      }

      return { externalId: personas.computation_id }
    },
    /**
     * Returns the audience identified by `externalId` unchanged.
     * @throws IntegrationError (MISSING_REQUIRED_FIELD, 400) when `externalId` is empty.
     */
    async getAudience(_, getAudienceInput) {
      const audience_id = getAudienceInput.externalId
      if (!audience_id) {
        throw new IntegrationError('Missing audience_id value', 'MISSING_REQUIRED_FIELD', 400)
      }
      return { externalId: audience_id }
    }
  },
  actions: {
    syncAudienceToCSV
  }
}

export default destination
99 changes: 99 additions & 0 deletions packages/destination-actions/src/destinations/aws-s3/operations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { ExecuteInput } from '@segment/actions-core'
import type { Payload } from './syncAudienceToCSV/generated-types'
import type { AudienceSettings } from './generated-types'

// Type definitions
/**
 * Subset of a raw event that this destination inspects: the optional
 * `context.personas` object and its computation key, class and id.
 * Every level is optional, so consumers must null-check before reading.
 */
export type RawData = {
context?: {
personas?: {
computation_key?: string
computation_class?: string
computation_id?: string
}
}
}

/**
 * `ExecuteInput` extended with an optional `rawData` property.
 *
 * NOTE: the type parameters `Settings`, `Payload`, `RawData` and
 * `AudienceSettings` shadow the same-named types imported/declared in this
 * file; inside this alias they are generic placeholders, not those types.
 */
export type ExecuteInputRaw<Settings, Payload, RawData, AudienceSettings = unknown> = ExecuteInput<
Settings,
Payload,
AudienceSettings
> & { rawData?: RawData }

/**
 * Readers for the built-in columns, keyed by the identifiers the file format
 * knows about. A Map (not an object literal) is used so that names such as
 * "constructor" can never resolve to an inherited property.
 */
const ROW_VALUE_BY_COLUMN = new Map<string, (payload: Payload) => unknown>([
  ['audience_name', (payload) => payload.audienceName],
  ['audience_id', (payload) => payload.audienceId],
  // The audience action is the entry of propertiesOrTraits stored under the audience name.
  ['audience_action', (payload) => payload.propertiesOrTraits?.[payload.audienceName]],
  ['email', (payload) => payload.email],
  ['user_id', (payload) => payload.userId],
  ['anonymous_id', (payload) => payload.anonymousId],
  ['timestamp', (payload) => payload.timestamp],
  ['message_id', (payload) => payload.messageId],
  ['space_id', (payload) => payload.spaceId],
  ['integrations_object', (payload) => JSON.stringify(payload.integrationsObject)],
  ['properties_or_traits', (payload) => JSON.stringify(payload.propertiesOrTraits)]
])

/**
 * Renders a batch of payloads as a delimited text file.
 *
 * The header line is taken from the first payload: the defined values of its
 * `columns` object (in that object's order) followed by the `value` of each
 * entry in `additional_identifiers_and_traits_columns`. Header names are
 * written verbatim; every data cell is double-quoted via `enquoteIdentifier`.
 * Lines are separated by "\n" and there is no trailing newline.
 *
 * Each row emits exactly one cell per header, in header order, so rows stay
 * aligned with the header line even when columns are reordered or renamed.
 *
 * @param payloads Batch to serialise. An empty batch yields an empty string.
 * @param audienceSettings Supplies `delimiter`; the symbolic value 'tab' is
 *   translated to a tab character, any other value is used as-is.
 * @returns The file contents.
 */
function generateFile(payloads: Payload[], audienceSettings: AudienceSettings): string {
  if (payloads.length === 0) {
    return ''
  }

  // Resolved once instead of on every row.
  const delimiter = audienceSettings.delimiter === 'tab' ? '\t' : audienceSettings.delimiter

  const firstPayload = payloads[0]
  const additionalColumns = firstPayload.additional_identifiers_and_traits_columns ?? []

  // [column key, header text] for every built-in column that is switched on.
  const standardColumns: Array<[string, string]> = []
  for (const [key, header] of Object.entries(firstPayload.columns ?? {})) {
    if (header !== undefined) {
      standardColumns.push([key, header])
    }
  }

  const headers: string[] = [
    ...standardColumns.map(([, header]) => header),
    ...additionalColumns.map((additionalColumn) => additionalColumn.value)
  ]

  const rows = payloads.map((payload) => {
    const cells = standardColumns.map(([key, header]) => {
      // Presumably the keys of `columns` are the built-in identifiers above
      // (TODO confirm against the action's field definition). Falling back to
      // the header text keeps the previous name-based matching working if not.
      const readValue = ROW_VALUE_BY_COLUMN.get(key) ?? ROW_VALUE_BY_COLUMN.get(header)
      // An unrecognised column still gets an empty cell so the row stays aligned.
      return enquoteIdentifier(String(readValue?.(payload) ?? ''))
    })

    // Read this payload's own additional-column entries (matched by header
    // name) rather than repeating the first payload's entries on every row.
    const ownAdditionalColumns = payload.additional_identifiers_and_traits_columns ?? []
    additionalColumns.forEach((additionalColumn) => {
      const own = ownAdditionalColumns.find((candidate) => candidate.value === additionalColumn.value)
      cells.push(enquoteIdentifier(String(JSON.stringify(own?.key) ?? '')))
    })

    return cells.join(delimiter)
  })

  return `${headers.join(delimiter)}\n${rows.join('\n')}`
}

/**
 * Wraps a value in double quotes for use as a delimited-file cell, doubling
 * any embedded double quote so the cell can be parsed back unambiguously.
 *
 * @param str Text to quote.
 * @returns The quoted text, e.g. `a"b` becomes `"a""b"`.
 */
function enquoteIdentifier(str: string) {
  return `"${String(str).replace(/"/g, '""')}"`
}

export { generateFile, enquoteIdentifier }
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/** Slug string for the AWS S3 CSV destination; same value as the `slug` in its destination definition. */
export const ACTION_SLUG = 'actions-s3-csv'
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Placeholder suite: no upload behaviour is exercised yet; it asserts a
// tautology so the test file exists and passes.
describe('Aws S3', () => {
  describe('uploadCsv', () => {
    const placeholderCheck = async (): Promise<void> => {
      expect(true).toBe(true)
    }

    it('upload CSV', placeholderCheck)
  })
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4d3d6c3

Please sign in to comment.