Skip to content
This repository has been archived by the owner on Jul 9, 2024. It is now read-only.

Commit

Permalink
test(e2e): auto-reconnect devices
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Sep 1, 2021
1 parent e3c0146 commit cad5c60
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 63 deletions.
113 changes: 64 additions & 49 deletions feature-runner/steps/asset-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { createDeviceCertificate } from '../../cli/jitp/createDeviceCertificate'
import { mqtt, io, iot, iotshadow, iotjobs } from 'aws-iot-device-sdk-v2'
import { deviceFileLocations } from '../../cli/jitp/deviceFileLocations'
import { expect } from 'chai'
import { readFileSync } from 'fs'
import { TextDecoder } from 'util'
import { createSimulatorKeyAndCSR } from '../../cli/jitp/createSimulatorKeyAndCSR'
import { promises as fs } from 'fs'
import { promises as fs, readFileSync } from 'fs'
import { retry } from '../../cli/commands/retry'

const decoder = new TextDecoder('utf8')

Expand All @@ -23,56 +23,70 @@ io.enable_logging(
)
const clientBootstrap = new io.ClientBootstrap()

const connect = async ({
clientCert,
privateKey,
clientId,
mqttEndpoint,
}: {
clientCert: string
privateKey: string
clientId: string
mqttEndpoint: string
}) =>
new Promise<mqtt.MqttClientConnection>((resolve, reject) => {
const cfg = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
clientCert,
privateKey,
)
cfg.with_clean_session(true)
cfg.with_client_id(clientId)
cfg.with_endpoint(mqttEndpoint)
const client = new mqtt.MqttClient(clientBootstrap)
const connection = client.new_connection(cfg.build())
connection.on('error', (err) => {
console.error(err)
reject(err)
})
connection.on('connect', () => {
console.log('CONNECTED')
resolve(connection)
})
connection.connect().catch((err) => {
console.debug(`Failed to connect.`)
console.error(err)
})
})

const awsIotThingMQTTConnection = ({
mqttEndpoint,
certsDir,
}: {
mqttEndpoint: string
certsDir: string
}) => {
const connections: Record<
string,
{
connection: mqtt.MqttClientConnection
shadow: iotshadow.IotShadowClient
}
> = {}
return async (
clientId: string,
): Promise<{
connection: mqtt.MqttClientConnection
shadow: iotshadow.IotShadowClient
}> => {
const connections: Record<string, Promise<mqtt.MqttClientConnection>> = {}
return async (clientId: string): Promise<mqtt.MqttClientConnection> => {
if (connections[clientId] === undefined) {
console.debug(`Creating new connection for ${clientId}...`)
const deviceFiles = deviceFileLocations({
certsDir,
deviceId: clientId,
})
const { privateKey, clientCert } = JSON.parse(
readFileSync(deviceFiles.simulatorJSON, 'utf-8'),
)
const cfg = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
clientCert,
privateKey,
const [privateKey, clientCert] = [
readFileSync(deviceFiles.key, 'utf-8'),
readFileSync(deviceFiles.certWithCA, 'utf-8'),
]
connections[clientId] = retry<mqtt.MqttClientConnection>(
10,
() => 5000,
)(async () =>
connect({
clientCert,
privateKey,
clientId,
mqttEndpoint,
}),
)
cfg.with_clean_session(true)
cfg.with_client_id(clientId)
cfg.with_endpoint(mqttEndpoint)
const client = new mqtt.MqttClient(clientBootstrap)
const connection = client.new_connection(cfg.build())
connection.on('error', (err) => {
console.error(err)
})
try {
await connection.connect()
connections[clientId] = {
connection,
shadow: new iotshadow.IotShadowClient(connection),
}
} catch (err) {
console.error(err)
}
}

return connections[clientId]
Expand Down Expand Up @@ -156,7 +170,7 @@ export const assetTrackerStepRunners = ({
regexMatcher<World>(/^I disconnect the tracker(?: "([^"]+)")?$/)(
async ([deviceId], __, runner) => {
const catId = deviceId ?? runner.store['tracker:id']
const { connection } = await iotConnect(catId)
const connection = await iotConnect(catId)
await runner.progress('IoT > disconnect')
await connection.disconnect()
await new Promise((resolve) => setTimeout(resolve, 5000, []))
Expand All @@ -170,7 +184,8 @@ export const assetTrackerStepRunners = ({
}
const reported = JSON.parse(step.interpolatedArgument)
const catId = deviceId ?? runner.store['tracker:id']
const { connection, shadow } = await iotConnect(catId)
const connection = await iotConnect(catId)
const shadow = new iotshadow.IotShadowClient(connection)

const updatePromise = await new Promise((resolve, reject) => {
const timeout = setTimeout(reject, 10 * 1000)
Expand Down Expand Up @@ -222,7 +237,7 @@ export const assetTrackerStepRunners = ({
throw new Error('Must provide argument!')
}
const message = JSON.parse(step.interpolatedArgument)
const { connection } = await iotConnect(catId)
const connection = await iotConnect(catId)
const publishPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(reject, 10 * 1000)
connection.on('error', (err: any) => {
Expand All @@ -244,7 +259,7 @@ export const assetTrackerStepRunners = ({
)(async ({ deviceId, messageCount, raw, topic, storeName }, _, runner) => {
const catId = deviceId ?? runner.store['tracker:id']
const isRaw = raw !== undefined
const { connection } = await iotConnect(catId)
const connection = await iotConnect(catId)

const expectedMessageCount =
messageCount === 'a' ? 1 : parseInt(messageCount, 10)
Expand All @@ -261,7 +276,7 @@ export const assetTrackerStepRunners = ({
)
}, 60 * 1000)

connection.subscribe(
void connection.subscribe(
topic,
mqtt.QoS.AtLeastOnce,
async (topic: string, payload: ArrayBuffer) => {
Expand All @@ -273,7 +288,7 @@ export const assetTrackerStepRunners = ({
await runner.progress(`Iot`, message)
if (messages.length === expectedMessageCount) {
clearTimeout(timeout)
connection.unsubscribe(topic)
void connection.unsubscribe(topic)
const result = messages.length > 1 ? messages : messages[0]

if (storeName !== undefined) runner.store[storeName] = result
Expand Down Expand Up @@ -304,7 +319,7 @@ export const assetTrackerStepRunners = ({
/^the tracker(?: (?<deviceId>[^ ]+))? fetches the next job into "(?<storeName>[^"]+)"$/,
)(async ({ deviceId, storeName }, _, runner) => {
const catId = deviceId ?? runner.store['tracker:id']
const { connection } = await iotConnect(catId)
const connection = await iotConnect(catId)
const jobsClient = new iotjobs.IotJobsClient(connection)

return new Promise((resolve, reject) => {
Expand All @@ -329,8 +344,8 @@ export const assetTrackerStepRunners = ({
return reject(error)
}
clearTimeout(timeout)
runner.store[storeName] = job
resolve(job)
runner.store[storeName] = job?.queuedJobs?.[0]
resolve(job?.queuedJobs?.[0])
},
)
.then(async () =>
Expand All @@ -350,7 +365,7 @@ export const assetTrackerStepRunners = ({
const catId = deviceId ?? runner.store['tracker:id']
const job = runner.store[storeName]
expect(job).to.not.be.an('undefined')
const { connection } = await iotConnect(catId)
const connection = await iotConnect(catId)
const jobsClient = new iotjobs.IotJobsClient(connection)

return new Promise<void>((resolve, reject) => {
Expand Down
11 changes: 1 addition & 10 deletions features/ConnectTracker.feature
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
@Only
Feature: Connect a tracker
As a user
I can Connect a tracker

Scenario: Generate a certificate and connect

Given I generate a certificate

Scenario: Connect the tracker

We use just-in-time-provisioning so this scenarion is expected to be
retried, because the AWS IoT endpoint will disconnect a new device when
it first connects.
See https://docs.aws.amazon.com/iot/latest/developerguide/jit-provisioning.html

Given I connect the tracker
Then I connect the tracker
1 change: 0 additions & 1 deletion features/DeleteTrackers.feature
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
@Last
@Only
Feature: Delete trackers
As a user
I can delete trackers
Expand Down
1 change: 0 additions & 1 deletion features/DeviceUpdateShadow.feature
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
@Only
Feature: Device: Update Shadow
Devices can update their shadow

Expand Down
3 changes: 1 addition & 2 deletions features/FOTA.feature
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ Feature: Device Firmware Upgrade over the air
Then "job" should match this JSON
"""
{
"jobId": "{jobId}",
"status": "QUEUED"
"jobId": "{jobId}"
}
"""
And the tracker marks the job in "job" as in progress
Expand Down

0 comments on commit cad5c60

Please sign in to comment.