Skip to content

Commit

Permalink
basic SSL config options (#1908)
Browse files Browse the repository at this point in the history
  • Loading branch information
joe-ayoub-segment authored Mar 5, 2024
1 parent 726d281 commit a77f4a7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 31 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 31 additions & 9 deletions packages/destination-actions/src/destinations/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,73 @@ const destination: DestinationDefinition<Settings> = {
brokers: {
label: 'Brokers',
description:
'The brokers for your Kafka instance, in the format of `host:port`. Accepts a comma delimited string.',
'The brokers for your Kafka instance, in the format of `host:port`. E.g. localhost:9092. Accepts a comma delimited string.',
type: 'string',
required: true
},
mechanism: {
label: 'SASL Authentication Mechanism',
description: 'The SASL Authentication Mechanism for your Kafka instance.',
description: 'The Authentication Mechanism for your Kafka instance.',
type: 'string',
required: true,
choices: [
{ label: 'Plain', value: 'plain' },
{ label: 'SCRAM/SHA-256', value: 'scram-sha-256' },
{ label: 'SCRAM/SHA-512', value: 'scram-sha-512' }
{ label: 'SCRAM/SHA-512', value: 'scram-sha-512' },
{ label: 'AWS IAM', value: 'aws' }
],
default: 'plain'
},
clientId: {
label: 'Client ID',
description: 'The client ID for your Kafka instance. Defaults to "segment-actions-kafka-producer".',
description: "The client ID for your Kafka instance. Defaults to 'segment-actions-kafka-producer'.",
type: 'string',
required: true,
default: 'segment-actions-kafka-producer'
},
username: {
label: 'Username',
description: 'The username for your Kafka instance.',
label: 'Username or IAM Access Key ID',
description:
'The username for your Kafka instance. If using AWS IAM Authentication this should be your AWS Access Key ID.',
type: 'string',
required: true
},
password: {
label: 'Password',
description: 'The password for your Kafka instance.',
label: 'Password or IAM Secret Key',
description:
'The password for your Kafka instance. If using AWS IAM Authentication this should be your AWS Secret Key.',
type: 'password',
required: true
},
partitionerType: {
label: 'Partitioner Type',
description: 'The partitioner type for your Kafka instance. Defaults to "Default Partitioner".',
description: "The partitioner type for your Kafka instance. Defaults to 'Default Partitioner'.",
type: 'string',
required: true,
choices: [
{ label: 'Default Partitioner', value: 'DefaultPartitioner' },
{ label: 'Legacy Partitioner', value: 'LegacyPartitioner' }
],
default: 'DefaultPartitioner'
},
authorizationIdentity: {
label: 'AWS Authorization Identify',
description:
"The aws:userid of the AWS IAM identity. Required if 'SASL Authentication Mechanism' field is set to 'AWS IAM'.",
type: 'string',
required: false,
default: ''
},
ssl: {
label: 'SSL Configuration Options',
description: 'Indicates the type of SSL to be used.',
type: 'string',
required: true,
choices: [
{ label: 'No SSL Encryption', value: 'none' },
{ label: 'Default SSL Encryption', value: 'default' }
],
default: 'default'
}
}
},
Expand Down
52 changes: 36 additions & 16 deletions packages/destination-actions/src/destinations/kafka/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Kafka, SASLOptions, ProducerRecord, Partitioners } from 'kafkajs'
import type { DynamicFieldResponse } from '@segment/actions-core'
import { DynamicFieldResponse, IntegrationError, ErrorCodes } from '@segment/actions-core'
import type { Settings } from './generated-types'
import type { Payload } from './send/generated-types'

Expand Down Expand Up @@ -30,26 +30,44 @@ export const getTopics = async (settings: Settings): Promise<DynamicFieldRespons
const getKafka = (settings: Settings) => {
return new Kafka({
clientId: settings.clientId,
brokers: settings.brokers.trim().split(',').map(broker => broker.trim()),
ssl: true,
brokers: settings.brokers
.trim()
.split(',')
.map((broker) => broker.trim()),
ssl: settings.ssl === 'none' ? false : true,
sasl: {
mechanism: settings.mechanism,
username: settings.username,
password: settings.password
...(settings.mechanism === 'aws'
? {
accessKeyId: settings.username,
secretAccessKey: settings.password,
authorizationIdentity: settings.authorizationIdentity
}
: { username: settings.username, password: settings.password })
} as SASLOptions
})
}

const getProducer = (settings: Settings) => {
return getKafka(settings).producer({
createPartitioner:
settings.partitionerType === LEGACY_PARTITIONER
? Partitioners.LegacyPartitioner
: Partitioners.DefaultPartitioner
settings.partitionerType === LEGACY_PARTITIONER ? Partitioners.LegacyPartitioner : Partitioners.DefaultPartitioner
})
}

export const validate = (settings: Settings) => {
if (settings.mechanism === 'aws' && ['', undefined].includes(settings.authorizationIdentity)) {
throw new IntegrationError(
'AWS mechanism requires an authorization identity',
ErrorCodes.INVALID_AUTHENTICATION,
400
)
}
}

export const sendData = async (settings: Settings, payload: Payload[]) => {
validate(settings)

const groupedPayloads: { [topic: string]: Payload[] } = {}

payload.forEach((p) => {
Expand All @@ -62,13 +80,16 @@ export const sendData = async (settings: Settings, payload: Payload[]) => {

const topicMessages: TopicMessages[] = Object.keys(groupedPayloads).map((topic) => ({
topic,
messages: groupedPayloads[topic].map((payload) => ({
value: JSON.stringify(payload.payload),
key: payload.key,
headers: payload?.headers ?? undefined,
partition: payload?.partition ?? payload?.default_partition ?? undefined,
partitionerType: settings.partitionerType
}) as Message)
messages: groupedPayloads[topic].map(
(payload) =>
({
value: JSON.stringify(payload.payload),
key: payload.key,
headers: payload?.headers ?? undefined,
partition: payload?.partition ?? payload?.default_partition ?? undefined,
partitionerType: settings.partitionerType
} as Message)
)
}))

const producer = getProducer(settings)
Expand All @@ -80,5 +101,4 @@ export const sendData = async (settings: Settings, payload: Payload[]) => {
}

await producer.disconnect()

}

0 comments on commit a77f4a7

Please sign in to comment.