Skip to content

Commit

Permalink
[Stratconn 3257] Add batching to segment connections (#1859)
Browse files Browse the repository at this point in the history
* Added batching for all the actions of segment destination

* Added the unit test cases for batching

* Updated the test case for batch happy path

* Updated the test case

* Updated the tag name for batch matrix

* Added enable batching field

* Revert the unwanted changes and updated the perform block

* Revert the unwanted changes

* Revert the unwanted changes

* Changes output text for batch execution

---------

Co-authored-by: Harsh Vardhan <harsh.vardhan@segment.com>
  • Loading branch information
hvardhan-unth and Harsh Vardhan authored Mar 5, 2024
1 parent 53b4441 commit 529e786
Show file tree
Hide file tree
Showing 19 changed files with 541 additions and 157 deletions.
6 changes: 4 additions & 2 deletions packages/core/src/__tests__/batching.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ describe('Batching', () => {
test('basic happy path', async () => {
const destination = new Destination(basicBatch)
const res = await destination.onBatch(events, basicBatchSettings)
expect(res).toEqual(expect.arrayContaining([{ output: 'successfully processed batch of events' }]))
expect(res[0]).toMatchObject({
output: 'Action Executed'
})
})

test('transforms all the payloads based on the subscription mapping', async () => {
Expand Down Expand Up @@ -221,7 +223,7 @@ describe('Batching', () => {
await expect(promise).resolves.toMatchInlineSnapshot(`
Array [
Object {
"output": "successfully processed batch of events",
"output": "Action Executed",
},
]
`)
Expand Down
13 changes: 10 additions & 3 deletions packages/core/src/destination-kit/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
return results
}

async executeBatch(bundle: ExecuteBundle<Settings, InputData[], AudienceSettings>): Promise<void> {
async executeBatch(bundle: ExecuteBundle<Settings, InputData[], AudienceSettings>): Promise<Result[]> {
const results: Result[] = [{ output: 'Action Executed' }]

if (!this.hasBatchSupport) {
throw new IntegrationError('This action does not support batched requests.', 'NotImplemented', 501)
}
Expand All @@ -311,7 +313,7 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
}

if (payloads.length === 0) {
return
return results
}

if (this.definition.performBatch) {
Expand All @@ -329,8 +331,13 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
transactionContext: bundle.transactionContext,
stateContext: bundle.stateContext
}
await this.performRequest(this.definition.performBatch, data)
const output = await this.performRequest(this.definition.performBatch, data)
results[0].data = output as JSONObject

return results
}

return results
}

async executeDynamicField(
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
audienceSettings = events[0].context?.personas?.audience_settings as AudienceSettings
}

await action.executeBatch({
return action.executeBatch({
mapping,
data: events as unknown as InputData[],
settings,
Expand All @@ -616,8 +616,6 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
transactionContext,
stateContext
})

return [{ output: 'successfully processed batch of events' }]
}

public async executeDynamicField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,10 @@ export const traits: InputField = {
defaultObjectUI: 'keyvalue',
additionalProperties: true
}

export const enable_batching: InputField = {
type: 'boolean',
label: 'Batch Data to segment',
description: 'When enabled, the action will send batch data. Segment accepts batches of up to 225 events.',
default: true
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import { createTestEvent, createTestIntegration, SegmentEvent } from '@segment/actions-core'
import Destination from '../../index'
import { MissingUserOrAnonymousIdThrowableError } from '../../errors'

Expand Down Expand Up @@ -81,4 +81,59 @@ describe('Segment.sendGroup', () => {
]
})
})

it('should work with batch events', async () => {
const events: SegmentEvent[] = [
createTestEvent({
traits: {
name: 'Example Corp',
industry: 'Technology'
},
userId: 'test-user-ufi5bgkko5',
anonymousId: 'arky4h2sh7k',
groupId: 'test-group-ks2i7e'
}),
createTestEvent({
traits: {
name: 'Example Corp',
industry: 'Technology'
},
userId: 'test-user-ufi5bgkko5',
groupId: 'test-group-ks2i7e'
})
]

const responses = await testDestination.testBatchAction('sendGroup', {
events,
mapping: defaultGroupMapping,
settings: {
source_write_key: 'test-source-write-key'
}
})

const results = testDestination.results
expect(responses.length).toBe(0)
expect(results.length).toBe(1)
expect(results[0].data).toMatchObject({
batch: [
{
userId: events[0].userId,
anonymousId: events[0].anonymousId,
groupId: events[0].groupId,
traits: {
...events[0].traits
},
context: {}
},
{
userId: events[1].userId,
groupId: events[0].groupId,
traits: {
...events[0].traits
},
context: {}
}
]
})
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
screen,
user_agent,
timezone,
traits
traits,
enable_batching
} from '../segment-properties'
import { MissingUserOrAnonymousIdThrowableError } from '../errors'

Expand All @@ -43,42 +44,58 @@ const action: ActionDefinition<Settings, Payload> = {
screen,
user_agent,
timezone,
traits
traits,
enable_batching
},
perform: (_request, { payload, statsContext }) => {
if (!payload.anonymous_id && !payload.user_id) {
throw MissingUserOrAnonymousIdThrowableError
}

const groupPayload: Object = {
userId: payload?.user_id,
anonymousId: payload?.anonymous_id,
groupId: payload?.group_id,
timestamp: payload?.timestamp,
context: {
app: payload?.application,
campaign: payload?.campaign_parameters,
device: payload?.device,
ip: payload?.ip_address,
locale: payload?.locale,
location: payload?.location,
network: payload?.network,
os: payload?.operating_system,
page: payload?.page,
screen: payload?.screen,
userAgent: payload?.user_agent,
timezone: payload?.timezone
},
traits: {
...payload?.traits
},
type: 'group'
}
const groupPayload: Object = convertPayload(payload)

// Returns transformed payload without snding it to TAPI endpoint.
// The payload will be sent to Segment's tracking API internally.
statsContext?.statsClient?.incr('tapi_internal', 1, [...statsContext.tags, 'action:sendGroup'])
return { batch: [groupPayload] }
},
performBatch: (_request, { payload, statsContext }) => {
const groupPayload = payload.map((data) => {
if (!data.anonymous_id && !data.user_id) {
throw MissingUserOrAnonymousIdThrowableError
}
return convertPayload(data)
})

statsContext?.statsClient?.incr('tapi_internal', 1, [...statsContext.tags, 'action:sendBatchGroup'])
return { batch: groupPayload }
}
}

function convertPayload(data: Payload) {
return {
userId: data?.user_id,
anonymousId: data?.anonymous_id,
groupId: data?.group_id,
timestamp: data?.timestamp,
context: {
app: data?.application,
campaign: data?.campaign_parameters,
device: data?.device,
ip: data?.ip_address,
locale: data?.locale,
location: data?.location,
network: data?.network,
os: data?.operating_system,
page: data?.page,
screen: data?.screen,
userAgent: data?.user_agent,
timezone: data?.timezone
},
traits: {
...data?.traits
},
type: 'group'
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import { createTestEvent, createTestIntegration, SegmentEvent } from '@segment/actions-core'
import Destination from '../../index'
import { MissingUserOrAnonymousIdThrowableError } from '../../errors'

Expand Down Expand Up @@ -73,4 +73,57 @@ describe('Segment.sendIdentify', () => {
]
})
})

it('should work with batch events', async () => {
const events: SegmentEvent[] = [
createTestEvent({
type: 'identify',
traits: {
name: 'Test User',
email: 'test-user@test-company.com'
},
userId: 'test-user-ufi5bgkko5',
anonymousId: 'arky4h2sh7k'
}),
createTestEvent({
type: 'identify',
traits: {
name: 'Test User',
email: 'test-user@test-company.com'
},
userId: 'test-user-ufi5bgkko5'
})
]

const responses = await testDestination.testBatchAction('sendIdentify', {
events,
mapping: defaultIdentifyMapping,
settings: {
source_write_key: 'test-source-write-key'
}
})

const results = testDestination.results
expect(responses.length).toBe(0)
expect(results.length).toBe(1)
expect(results[0].data).toMatchObject({
batch: [
{
userId: events[0].userId,
anonymousId: events[0].anonymousId,
traits: {
...events[0].traits
},
context: {}
},
{
userId: events[1].userId,
traits: {
...events[1].traits
},
context: {}
}
]
})
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 529e786

Please sign in to comment.