Skip to content

Commit

Permalink
feat: ucan stream consumer space upload remove count
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 14, 2023
1 parent c1f9b79 commit 71a0013
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 102 deletions.
35 changes: 27 additions & 8 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ export function UcanInvocationStack({ stack, app }) {
handler: 'functions/metrics-store-add-size-total.consumer',
deadLetterQueue: metricsStoreAddSizeTotalDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
})

// metrics per space
const spaceMetricsDLQ = new Queue(stack, 'space-metrics-dlq')
Expand All @@ -98,18 +109,16 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
// upload/remove count
const spaceMetricsUploadRemoveTotalConsumer = new Function(stack, 'space-metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
TABLE_NAME: spaceMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
permissions: [spaceMetricsTable],
handler: 'functions/space-metrics-upload-remove-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})


// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
Expand Down Expand Up @@ -161,6 +170,16 @@ export function UcanInvocationStack({ stack, app }) {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsUploadRemoveTotalConsumer: {
function: spaceMetricsUploadRemoveTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const UPLOAD_ADD = 'upload/add'
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
Expand Down
46 changes: 46 additions & 0 deletions ucan-invocation/functions/space-metrics-upload-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateUploadCount(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').SpaceMetricsTableCtx} ctx
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadRemoveCount(invocationsWithUploadRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
121 changes: 85 additions & 36 deletions ucan-invocation/tables/space-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { MAX_TRANSACT_WRITE_ITEMS } from './constants.js'
*/

/**
* Abstraction layer to handle operations on Space Table.
* Abstraction layer to handle operations on Space Metrics Table.
*
* @param {string} region
* @param {string} tableName
Expand All @@ -33,51 +33,45 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {

return {
/**
* Increment accumulated count from upload add operations.
* Increment accumulated count from upload/add operations.
*
* @param {Capabilities} uploadAddInv
*/
incrementUploadAddCount: async (uploadAddInv) => {
// Merge same space operations into single one and transform into transaction format
// We cannot have multiple operations in a TransactWrite with same key, and we
// decrease the probability of reaching the maximum number of transactions.
const updateInputTransactions = uploadAddInv.reduce((acc, c) => {
const existing = acc?.find((e) => c.with === e.space)
if (existing) {
existing.value += 1
} else {
acc.push({
// @ts-expect-error
space: c.with,
value: 1
})
}
return acc
}, /** @type {UpdateInput[]} */ ([]))

if (updateInputTransactions.length >= MAX_TRANSACT_WRITE_ITEMS) {
throw new Error(`Attempting to increment space count for more than allowed transactions: ${updateInputTransactions.length}`)
const updateInputTransactions = normalizeInvocationsPerSpaceOccurence(uploadAddInv)
if (!updateInputTransactions.length) {
return
}

const transactItems = getItemsToIncrementForMetric(
updateInputTransactions,
tableName,
SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
)

const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
})

await dynamoDb.send(cmd)
},
/**
* Increment accumulated count from store/remove operations.
*
* @param {Capabilities} storeRemoveInv
*/
incrementUploadRemoveCount: async (storeRemoveInv) => {
const updateInputTransactions = normalizeInvocationsPerSpaceOccurence(storeRemoveInv)
if (!updateInputTransactions.length) {
return
}

/** @type {import('@aws-sdk/client-dynamodb').TransactWriteItem[]} */
const transactItems = updateInputTransactions.map(item => ({
Update: {
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(item.value) },
},
Key: marshall({
space: item.space,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
}),
}
}))
const transactItems = getItemsToIncrementForMetric(
updateInputTransactions,
tableName,
SPACE_METRICS_NAMES.UPLOAD_REMOVE_TOTAL
)

const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
})
Expand All @@ -86,3 +80,58 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {
}
}
}

/**
* Get items to increment for metric.
*
* @param {UpdateInput[]} items
* @param {string} tableName
* @param {string} metricName
* @returns {import('@aws-sdk/client-dynamodb').TransactWriteItem[]}
*/
function getItemsToIncrementForMetric (items, tableName, metricName) {
return items.map(item => ({
Update: {
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(item.value) },
},
Key: marshall({
space: item.space,
name: metricName
}),
}
}))
}

/**
* Merge same space operations into single one and transform into transaction format
* We cannot have multiple operations in a TransactWrite with same key, and we
* decrease the probability of reaching the maximum number of transactions.
*
* @param {Capabilities} inv
* @returns {UpdateInput[]}
*/
function normalizeInvocationsPerSpaceOccurence (inv) {
const res = inv.reduce((acc, c) => {
const existing = acc?.find((e) => c.with === e.space)
if (existing) {
existing.value += 1
} else {
acc.push({
// @ts-expect-error
space: c.with,
value: 1
})
}
return acc
}, /** @type {UpdateInput[]} */ ([]))

if (res.length >= MAX_TRANSACT_WRITE_ITEMS) {
throw new Error(`Attempting to increment space count for more than allowed transactions: ${res.length}`)
}

return res
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import * as StoreCapabilities from '@web3-storage/capabilities/store'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { adminMetricsTableProps } from '../../tables/index.js'

import { updateSizeTotal } from '../../functions/metrics-store-add-size-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -116,7 +117,7 @@ test('handles batch of single invocations with multiple store/add attributes', a
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoAdminMetricsTable(dynamoClient),
createDynamoTable(dynamoClient, adminMetricsTableProps),
])

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import * as StoreCapabilities from '@web3-storage/capabilities/store'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { adminMetricsTableProps } from '../../tables/index.js'

import { updateStoreAddTotal } from '../../functions/metrics-store-add-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -157,7 +158,7 @@ test('handles a batch of single invocation without store/add', async t => {
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoAdminMetricsTable(dynamoClient),
createDynamoTable(dynamoClient, adminMetricsTableProps),
])

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import * as StoreCapabilities from '@web3-storage/capabilities/store'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { adminMetricsTableProps } from '../../tables/index.js'

import { updateStoreRemoveTotal } from '../../functions/metrics-store-remove-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -157,7 +158,7 @@ test('handles a batch of single invocation without store/remove', async t => {
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoAdminMetricsTable(dynamoClient),
createDynamoTable(dynamoClient, adminMetricsTableProps),
])

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import * as UploadCapabilities from '@web3-storage/capabilities/upload'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { adminMetricsTableProps } from '../../tables/index.js'

import { updateUploadRemoveTotal } from '../../functions/metrics-upload-remove-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -157,7 +158,7 @@ test('handles a batch of single invocation without upload/remove', async t => {
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoAdminMetricsTable(dynamoClient),
createDynamoTable(dynamoClient, adminMetricsTableProps),
])

return {
Expand Down
Loading

0 comments on commit 71a0013

Please sign in to comment.