Skip to content

Commit

Permalink
feat: ucan stream consumer space size track
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 15, 2023
1 parent 7d1ff48 commit 8ef0394
Show file tree
Hide file tree
Showing 17 changed files with 1,085 additions and 57 deletions.
2 changes: 1 addition & 1 deletion stacks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export default function (app) {
})
app.stack(BusStack)
app.stack(UploadDbStack)
app.stack(UcanInvocationStack)
app.stack(CarparkStack)
app.stack(UcanInvocationStack)
app.stack(SatnavStack)
app.stack(UploadApiStack)
app.stack(ReplicatorStack)
Expand Down
57 changes: 50 additions & 7 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { Duration } from 'aws-cdk-lib'

import { BusStack } from './bus-stack.js'
import { CarparkStack } from './carpark-stack.js'
import { UploadDbStack } from './upload-db-stack.js'
import {
getBucketConfig,
Expand All @@ -28,6 +29,7 @@ export function UcanInvocationStack({ stack, app }) {

// Get eventBus reference
const { eventBus } = use(BusStack)
const { carparkBucket } = use(CarparkStack)
const { adminMetricsTable, spaceMetricsTable } = use(UploadDbStack)

const ucanBucket = new Bucket(stack, 'ucan-store', {
Expand Down Expand Up @@ -84,6 +86,17 @@ export function UcanInvocationStack({ stack, app }) {
handler: 'functions/metrics-store-add-size-total.consumer',
deadLetterQueue: metricsStoreAddSizeTotalDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
})

// metrics per space
const spaceMetricsDLQ = new Queue(stack, 'space-metrics-dlq')
Expand All @@ -98,17 +111,27 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
// store/add size
const spaceMetricsStoreAddSizeTotalConsumer = new Function(stack, 'space-metrics-store-add-size-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
TABLE_NAME: spaceMetricsTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
permissions: [spaceMetricsTable, carparkBucket],
handler: 'functions/space-metrics-store-add-size-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// store/remove size
const spaceMetricsStoreRemoveSizeTotalConsumer = new Function(stack, 'space-metrics-store-remove-size-total-consumer', {
environment: {
TABLE_NAME: spaceMetricsTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
},
permissions: [spaceMetricsTable, carparkBucket],
handler: 'functions/space-metrics-store-remove-size-total.consumer',
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
Expand Down Expand Up @@ -161,6 +184,26 @@ export function UcanInvocationStack({ stack, app }) {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreAddSizeTotalConsumer: {
function: spaceMetricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsStoreRemoveSizeTotalConsumer: {
function: spaceMetricsStoreRemoveSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})
Expand Down
50 changes: 50 additions & 0 deletions ucan-invocation/buckets/car-store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {
S3Client,
HeadObjectCommand,
} from '@aws-sdk/client-s3'

/**
* Abstraction layer with Factory to perform operations on bucket storing CAR files.
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
*/
export function createCarStore(region, bucketName, options) {
const s3 = new S3Client({
region,
...options,
})
return useCarStore(s3, bucketName)
}

/**
* @param {S3Client} s3
* @param {string} bucketName
* @returns {import('../types').CarStoreBucket}
*/
export function useCarStore(s3, bucketName) {
return {
/**
* @param {import('multiformats').UnknownLink} link
*/
getSize: async (link) => {
const cid = link.toString()
const cmd = new HeadObjectCommand({
Key: `${cid}/${cid}.car`,
Bucket: bucketName,
})
let res
try {
res = await s3.send(cmd)
} catch (cause) {
// @ts-expect-error
if (cause?.$metadata?.httpStatusCode === 404) {
return 0
}
throw new Error('Failed to check if car-store', { cause })
}
return res.ContentLength || 0
},
}
}
2 changes: 2 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const METRICS_NAMES = {
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
STORE_REMOVE_SIZE_TOTAL: `${STORE_REMOVE}-size-total`,
}

// Spade Metrics
Expand All @@ -20,4 +21,5 @@ export const SPACE_METRICS_NAMES = {
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
STORE_REMOVE_SIZE_TOTAL: `${STORE_REMOVE}-size-total`,
}
45 changes: 45 additions & 0 deletions ucan-invocation/functions/space-metrics-store-add-size-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import * as Sentry from '@sentry/serverless'

import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
} = process.env

await updateAddSizeTotal(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName),
carStoreBucket: createCarStore(AWS_REGION, storeBucketName)
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').MetricsBySpaceWithBucketCtx} ctx
*/
export async function updateAddSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
55 changes: 55 additions & 0 deletions ucan-invocation/functions/space-metrics-store-remove-size-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import * as Sentry from '@sentry/serverless'

import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
} = process.env

await updateRemoveSizeTotal(ucanInvocations, {
spaceMetricsTable: createSpaceMetricsTable(AWS_REGION, tableName),
carStoreBucket: createCarStore(AWS_REGION, storeBucketName)
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').MetricsBySpaceWithBucketCtx} ctx
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
// Temporary adaptor to set size in invocation
for (const inv of invocationsWithStoreRemove) {
// @ts-ignore remove invocations always have link
const size = await ctx.carStoreBucket.getSize(inv.nb?.link)

// @ts-ignore
inv.nb.size = size
}

await ctx.spaceMetricsTable.incrementStoreRemoveSizeTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
Loading

0 comments on commit 8ef0394

Please sign in to comment.