Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ucan stream consumer space upload remove count #163

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,7 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: metricsStoreAddSizeTotalDLQ.cdk.queue,
})

// metrics upload/add count
const metricsUploadAddTotalDLQ = new Queue(stack, 'metrics-upload-add-total-dlq')
const metricsUploadAddTotalConsumer = new Function(stack, 'metrics-upload-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-add-total.consumer',
deadLetterQueue: metricsUploadAddTotalDLQ.cdk.queue,
})

// store/remove size
// metrics store/remove size
const metricsStoreRemoveSizeTotalDLQ = new Queue(stack, 'metrics-store-remove-size-total-dlq')
const metricsStoreRemoveSizeTotalConsumer = new Function(stack, 'metrics-store-remove-size-total-consumer', {
environment: {
Expand All @@ -110,6 +99,17 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: metricsStoreRemoveSizeTotalDLQ.cdk.queue,
})

// metrics upload/add count
const metricsUploadAddTotalDLQ = new Queue(stack, 'metrics-upload-add-total-dlq')
const metricsUploadAddTotalConsumer = new Function(stack, 'metrics-upload-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-add-total.consumer',
deadLetterQueue: metricsUploadAddTotalDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
Expand All @@ -134,6 +134,16 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// upload/remove count
const spaceMetricsUploadRemoveTotalConsumer = new Function(stack, 'space-metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: spaceMetricsTable.tableName
},
permissions: [spaceMetricsTable],
handler: 'functions/space-metrics-upload-remove-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// store/add count
const spaceMetricsStoreAddTotalConsumer = new Function(stack, 'space-metrics-store-add-total-consumer', {
environment: {
Expand Down Expand Up @@ -283,6 +293,16 @@ export function UcanInvocationStack({ stack, app }) {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsUploadRemoveTotalConsumer: {
function: spaceMetricsUploadRemoveTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const UPLOAD_ADD = 'upload/add'
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
Expand Down
46 changes: 46 additions & 0 deletions ucan-invocation/functions/space-metrics-upload-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateUploadCount(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').SpaceMetricsTableCtx} ctx
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadRemoveCount(invocationsWithUploadRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
28 changes: 26 additions & 2 deletions ucan-invocation/tables/space-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {

await dynamoDb.send(cmd)
},
/**
* Increment accumulated count from upload/remove operations.
*
* @param {Capabilities} uploadRemoveInv
*/
incrementUploadRemoveCount: async (uploadRemoveInv) => {
const updateInputTransactions = normalizeInvocationsPerSpaceOccurence(uploadRemoveInv)
if (!updateInputTransactions.length) {
return
}

const transactItems = getItemsToIncrementForMetric(
updateInputTransactions,
tableName,
SPACE_METRICS_NAMES.UPLOAD_REMOVE_TOTAL
)

const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
})

await dynamoDb.send(cmd)
},
/**
* Increment accumulated count from store/add operations.
*
Expand All @@ -77,7 +100,7 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {

await dynamoDb.send(cmd)
},
/*
/**
* Increment total value from store/add operations.
*
* @param {Capabilities} operationsInv
Expand Down Expand Up @@ -122,7 +145,7 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {

await dynamoDb.send(cmd)
},
/*
/**
* Increment total value from store/remove operations.
*
* @param {Capabilities} operationsInv
Expand All @@ -146,6 +169,7 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {
await dynamoDb.send(cmd)
}
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { testConsumer as test } from '../helpers/context.js'

import { customAlphabet } from 'nanoid'
import { CreateTableCommand, GetItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
import * as Signer from '@ucanto/principal/ed25519'
import * as UploadCapabilities from '@web3-storage/capabilities/upload'

import { spaceMetricsTableProps } from '../../tables/index.js'
import { createDynamodDb, dynamoDBTableConfig } from '../helpers/resources.js'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoTable, getItemFromTable} from '../helpers/tables.js'
import { spaceMetricsTableProps } from '../../tables/index.js'

import { updateUploadCount } from '../../functions/space-metrics-upload-add-total.js'
import { SPACE_METRICS_NAMES } from '../../constants.js'
Expand Down Expand Up @@ -62,7 +60,10 @@ test('handles a batch of single invocation with upload/add', async t => {
spaceMetricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, spaceDid)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
space: spaceDid,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
})
t.truthy(item)
t.is(item?.value, 1)
t.is(item?.space, spaceDid)
Expand Down Expand Up @@ -103,7 +104,10 @@ test('handles batch of single invocation with multiple upload/add attributes', a
spaceMetricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, spaceDid)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
space: spaceDid,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
})
t.truthy(item)
t.is(item?.value, cars.length)
t.is(item?.space, spaceDid)
Expand Down Expand Up @@ -146,7 +150,10 @@ test('handles batch of multiple invocations with upload/add in same space', asyn
spaceMetricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, spaceDid)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
space: spaceDid,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
})
t.truthy(item)
t.is(item?.value, cars.length)
t.is(item?.space, spaceDid)
Expand Down Expand Up @@ -189,7 +196,10 @@ test('handles batch of multiple invocations with upload/add in multiple spaces',
})

const items = await Promise.all(
spaces.map(({ spaceDid }) => getItemFromTable(t.context.dynamoClient, tableName, spaceDid))
spaces.map(({ spaceDid }) => getItemFromTable(t.context.dynamoClient, tableName, {
space: spaceDid,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
}))
)
t.truthy(items)
t.is(items.length, spaces.length)
Expand Down Expand Up @@ -241,46 +251,10 @@ test('errors handling batch of multiple invocations with more transactions than
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamouploadTable(dynamoClient),
createDynamoTable(dynamoClient, spaceMetricsTableProps),
])

return {
tableName
}
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
*/
async function createDynamouploadTable(dynamo) {
const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10)
const tableName = id()

await dynamo.send(new CreateTableCommand({
TableName: tableName,
...dynamoDBTableConfig(spaceMetricsTableProps),
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1
}
}))

return tableName
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
* @param {string} tableName
* @param {`did:key:${string}`} space
*/
async function getItemFromTable(dynamo, tableName, space) {
const params = {
TableName: tableName,
Key: marshall({
space,
name: SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL
})
}
const response = await dynamo.send(new GetItemCommand(params))
return response?.Item && unmarshall(response?.Item)
}
Loading