Skip to content

Commit

Permalink
feat: ucan stream consumer space store add and remove count (#160)
Browse files Browse the repository at this point in the history
Part of metrics work #117

Adds space counters for `store/add` and `store/remove`.
  • Loading branch information
vasco-santos authored Mar 16, 2023
1 parent 370651c commit 7728718
Show file tree
Hide file tree
Showing 16 changed files with 812 additions and 59 deletions.
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 46 additions & 7 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ export function UcanInvocationStack({ stack, app }) {
handler: 'functions/metrics-store-add-size-total.consumer',
deadLetterQueue: metricsStoreAddSizeTotalDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
})

// metrics per space
const spaceMetricsDLQ = new Queue(stack, 'space-metrics-dlq')
Expand All @@ -98,17 +109,25 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
// store/add count
const spaceMetricsStoreAddTotalConsumer = new Function(stack, 'space-metrics-store-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
TABLE_NAME: spaceMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
permissions: [spaceMetricsTable],
handler: 'functions/space-metrics-store-add-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// store/remove count
const spaceMetricsStoreRemoveTotalConsumer = new Function(stack, 'space-metrics-store-remove-total-consumer', {
environment: {
TABLE_NAME: spaceMetricsTable.tableName
},
permissions: [spaceMetricsTable],
handler: 'functions/space-metrics-store-remove-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
Expand Down Expand Up @@ -161,6 +180,26 @@ export function UcanInvocationStack({ stack, app }) {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreAddTotalConsumer: {
function: spaceMetricsStoreAddTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreRemoveTotalConsumer: {
function: spaceMetricsStoreRemoveTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})
Expand Down
4 changes: 4 additions & 0 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ test('w3infra integration flow', async t => {
}
// Get space metrics before upload
const spaceBeforeUploadAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL)
const spaceBeforeStoreAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_TOTAL)

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
Expand Down Expand Up @@ -192,19 +193,22 @@ test('w3infra integration flow', async t => {
const afterStoreAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const spaceAfterUploadAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL)
const spaceAfterStoreAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return (
afterStoreAddTotal?.value >= beforeStoreAddTotal?.value + 1 &&
afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize &&
spaceAfterStoreAddMetrics?.value >= spaceBeforeStoreAddMetrics?.value + 1 &&
spaceAfterUploadAddMetrics?.value >= spaceBeforeUploadAddMetrics?.value + 1
)
}

return (
afterStoreAddTotal?.value === beforeStoreAddTotal?.value + 1 &&
afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize &&
spaceAfterStoreAddMetrics?.value === spaceBeforeStoreAddMetrics?.value + 1 &&
spaceAfterUploadAddMetrics?.value === spaceBeforeUploadAddMetrics?.value + 1
)
})
Expand Down
17 changes: 13 additions & 4 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import {
add as storeAdd,
remove as storeRemove
} from '@web3-storage/capabilities/store'
import {
add as uploadAdd,
remove as uploadRemove
} from '@web3-storage/capabilities/upload'

// UCAN protocol
export const STORE_ADD = 'store/add'
export const STORE_REMOVE = 'store/remove'
export const UPLOAD_ADD = 'upload/add'
export const UPLOAD_REMOVE = 'upload/remove'
export const STORE_ADD = storeAdd.can
export const STORE_REMOVE = storeRemove.can
export const UPLOAD_ADD = uploadAdd.can
export const UPLOAD_REMOVE = uploadRemove.can

// Admin Metrics
export const METRICS_NAMES = {
Expand Down
52 changes: 52 additions & 0 deletions ucan-invocation/functions/space-metrics-store-add-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const STORE_ADD = 'store/add'
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @typedef {object} IncrementInput
* @property {`did:${string}:${string}`} space
* @property {number} count
*/

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreCount(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').SpaceMetricsTableCtx} ctx
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddCount(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
52 changes: 52 additions & 0 deletions ucan-invocation/functions/space-metrics-store-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @typedef {object} IncrementInput
* @property {`did:${string}:${string}`} space
* @property {number} count
*/

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreCount(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').SpaceMetricsTableCtx} ctx
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreRemoveCount(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
1 change: 1 addition & 0 deletions ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-eventbridge": "^3.218.0",
"@sentry/serverless": "^7.22.0",
"@web3-storage/capabilities": "^3.2.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 7728718

Please sign in to comment.