Skip to content

Commit

Permalink
feat: ucan stream consumer space size track
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 16, 2023
1 parent 6f8776b commit e2b81ba
Show file tree
Hide file tree
Showing 9 changed files with 903 additions and 10 deletions.
42 changes: 42 additions & 0 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,28 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// store/add size
const spaceMetricsStoreAddSizeTotalConsumer = new Function(stack, 'space-metrics-store-add-size-total-consumer', {
environment: {
TABLE_NAME: spaceMetricsTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
},
permissions: [spaceMetricsTable, carparkBucket],
handler: 'functions/space-metrics-store-add-size-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// store/remove size
const spaceMetricsStoreRemoveSizeTotalConsumer = new Function(stack, 'space-metrics-store-remove-size-total-consumer', {
environment: {
TABLE_NAME: spaceMetricsTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
},
permissions: [spaceMetricsTable, carparkBucket],
handler: 'functions/space-metrics-store-remove-size-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
Expand Down Expand Up @@ -241,6 +263,26 @@ export function UcanInvocationStack({ stack, app }) {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreAddSizeTotalConsumer: {
function: spaceMetricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreRemoveSizeTotalConsumer: {
function: spaceMetricsStoreRemoveSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})
Expand Down
10 changes: 7 additions & 3 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ test('w3infra integration flow', async t => {
// Get space metrics before upload
const spaceBeforeUploadAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL)
const spaceBeforeStoreAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_TOTAL)
const spaceBeforeStoreAddSizeMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
Expand Down Expand Up @@ -188,14 +189,15 @@ test('w3infra integration flow', async t => {
})

// Check metrics were updated
if (beforeStoreAddSizeTotal && spaceBeforeUploadAddMetrics && beforeUploadAddTotal) {
if (beforeStoreAddSizeTotal && spaceBeforeUploadAddMetrics && spaceBeforeStoreAddSizeMetrics && beforeUploadAddTotal) {
await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterStoreAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const afterUploadAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.UPLOAD_ADD_TOTAL)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const spaceAfterUploadAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL)
const spaceAfterStoreAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_TOTAL)
const spaceAfterStoreAddSizeMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
Expand All @@ -204,7 +206,8 @@ test('w3infra integration flow', async t => {
afterUploadAddTotal?.value === beforeUploadAddTotal?.value + 1 &&
afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize &&
spaceAfterStoreAddMetrics?.value >= spaceBeforeStoreAddMetrics?.value + 1 &&
spaceAfterUploadAddMetrics?.value >= spaceBeforeUploadAddMetrics?.value + 1
spaceAfterUploadAddMetrics?.value >= spaceBeforeUploadAddMetrics?.value + 1 &&
spaceAfterStoreAddSizeMetrics?.value >= spaceBeforeStoreAddSizeMetrics?.value + carSize
)
}

Expand All @@ -213,7 +216,8 @@ test('w3infra integration flow', async t => {
afterUploadAddTotal?.value === beforeUploadAddTotal?.value + 1 &&
afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize &&
spaceAfterStoreAddMetrics?.value === spaceBeforeStoreAddMetrics?.value + 1 &&
spaceAfterUploadAddMetrics?.value === spaceBeforeUploadAddMetrics?.value + 1
spaceAfterUploadAddMetrics?.value === spaceBeforeUploadAddMetrics?.value + 1 &&
spaceAfterStoreAddSizeMetrics?.value === spaceBeforeStoreAddSizeMetrics?.value + carSize
)
})
}
Expand Down
45 changes: 45 additions & 0 deletions ucan-invocation/functions/space-metrics-store-add-size-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import * as Sentry from '@sentry/serverless'

import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
} = process.env

await updateAddSizeTotal(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName),
carStoreBucket: createCarStore(AWS_REGION, storeBucketName)
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').MetricsBySpaceWithBucketCtx} ctx
*/
export async function updateAddSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
55 changes: 55 additions & 0 deletions ucan-invocation/functions/space-metrics-store-remove-size-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import * as Sentry from '@sentry/serverless'

import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
} = process.env

await updateRemoveSizeTotal(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName),
carStoreBucket: createCarStore(AWS_REGION, storeBucketName)
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').MetricsBySpaceWithBucketCtx} ctx
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
// Temporary adaptor to set size in invocation
for (const inv of invocationsWithStoreRemove) {
// @ts-ignore remove invocations always have link
const size = await ctx.carStoreBucket.getSize(inv.nb?.link)

// @ts-ignore
inv.nb.size = size
}

await ctx.spaceMetricsTable.incrementStoreRemoveSizeTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
76 changes: 76 additions & 0 deletions ucan-invocation/tables/space-metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {
tableName,
SPACE_METRICS_NAMES.STORE_ADD_TOTAL
)
const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
})

await dynamoDb.send(cmd)
},
/*
* Increment total value from store/add operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreAddSizeTotal: async (operationsInv) => {
const updateInputTransactions = normalizeInvocationsPerSpaceSize(operationsInv)
if (!updateInputTransactions.length) {
return
}

const transactItems = getItemsToIncrementForMetric(
updateInputTransactions,
tableName,
SPACE_METRICS_NAMES.STORE_ADD_SIZE_TOTAL
)

const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
Expand All @@ -94,6 +116,28 @@ export function createSpaceMetricsTable (region, tableName, options = {}) {
tableName,
SPACE_METRICS_NAMES.STORE_REMOVE_TOTAL
)
const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
})

await dynamoDb.send(cmd)
},
/*
* Increment total value from store/remove operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreRemoveSizeTotal: async (operationsInv) => {
const updateInputTransactions = normalizeInvocationsPerSpaceSize(operationsInv)
if (!updateInputTransactions.length) {
return
}

const transactItems = getItemsToIncrementForMetric(
updateInputTransactions,
tableName,
SPACE_METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL
)

const cmd = new TransactWriteItemsCommand({
TransactItems: transactItems
Expand Down Expand Up @@ -158,3 +202,35 @@ function normalizeInvocationsPerSpaceOccurence (inv) {

return res
}

/**
* Merge same space operations into single one and transform into transaction format
* We cannot have multiple operations in a TransactWrite with same key, and we
* decrease the probability of reaching the maximum number of transactions.
*
* @param {Capabilities} inv
* @returns {UpdateInput[]}
*/
function normalizeInvocationsPerSpaceSize (inv) {
const res = inv.reduce((acc, c) => {
const existing = acc?.find((e) => c.with === e.space)
if (existing) {
// @ts-expect-error
existing.value += c.nb?.size
} else {
acc.push({
// @ts-expect-error
space: c.with,
// @ts-expect-error
value: c.nb.size
})
}
return acc
}, /** @type {UpdateInput[]} */ ([]))

if (res.length >= MAX_TRANSACT_WRITE_ITEMS) {
throw new Error(`Attempting to increment space count for more than allowed transactions: ${res.length}`)
}

return res
}
Loading

0 comments on commit e2b81ba

Please sign in to comment.