Merge pull request #136 from guardian/api-ffmpeg
Calculate duration of input files in order to decide which engine to use
philmcmahon authored Feb 25, 2025
2 parents af9b413 + 1833874 commit 0889e0a
Showing 15 changed files with 257 additions and 9 deletions.
11 changes: 11 additions & 0 deletions ffmpeg-lambda-layer/README.md
@@ -0,0 +1,11 @@
# ffmpeg lambda layer

ffmpeg and ffprobe are tools for inspecting and converting media files. To use them on AWS Lambda, they need to be
provided as a 'Lambda layer' (essentially a zip file containing the binaries).

This folder contains a script that generates the zip file and pushes it to S3. If you are publishing a new version of
the layer (e.g. to pull in updates to ffmpeg), run this script and then update the FFMpegLayerZipKey parameter on the
transcription-service stack.

It would probably be possible to automate this process, but as we don't expect many updates to ffmpeg (I imagine they
will only be necessary when we want to support new media codecs etc.), I haven't bothered.
39 changes: 39 additions & 0 deletions ffmpeg-lambda-layer/publish-ffmpeg-layer.sh
@@ -0,0 +1,39 @@
#!/usr/bin/env bash

set +x
set -e

echo ""
echo "Downloading ffmpeg release and bundling for lambda layer..."
echo ""

WORKING_DIRECTORY="ffmpeg-layer-build"
mkdir -p $WORKING_DIRECTORY

pushd $WORKING_DIRECTORY
curl -0L https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz > ffmpeg-release-amd64-static.tar.xz
curl -0L https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz.md5 > ffmpeg-release-amd64-static.tar.xz.md5
md5sum -c ffmpeg-release-amd64-static.tar.xz.md5

tar -xf ffmpeg-release-amd64-static.tar.xz

mkdir -p ffmpeg/bin
mkdir -p ffprobe/bin

mkdir -p bin

cp ffmpeg-7.0.2-amd64-static/ffmpeg bin/
cp ffmpeg-7.0.2-amd64-static/ffprobe bin/

zip -r ffmpeg_x86_64.zip bin

HASH=$(md5sum ffmpeg_x86_64.zip | cut -d ' ' -f 1)
NAME_WITH_HASH="ffmpeg_x86_64-$HASH.zip"
mv ffmpeg_x86_64.zip $NAME_WITH_HASH

aws s3 cp ffmpeg_x86_64-$HASH.zip "s3://transcription-service-lambda-layers/${NAME_WITH_HASH}"
popd
rm -rf $WORKING_DIRECTORY

echo ""
echo "Layer zip key: ${NAME_WITH_HASH} - to use this layer you will need to update the FFMpegLayerZipKey parameter in the transcription-service stack"
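The script names the zip after the MD5 of its contents, so every published build gets a distinct S3 key and the stack parameter has to change to pick it up. As an illustration of that naming scheme only, here is a minimal TypeScript sketch. `layerZipKey` is a hypothetical helper that is not part of this commit; the bash script above remains the actual publishing path.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper (not in the commit): mirrors
//   HASH=$(md5sum ffmpeg_x86_64.zip | cut -d ' ' -f 1)
//   NAME_WITH_HASH="ffmpeg_x86_64-$HASH.zip"
export const layerZipKey = (
	zipContents: Buffer | string,
	prefix: string = 'ffmpeg_x86_64',
): string => {
	// Hex-encoded MD5 of the archive bytes, as md5sum would print it
	const hash = createHash('md5').update(zipContents).digest('hex');
	return `${prefix}-${hash}.zip`;
};
```

Because the key is derived from the contents, re-running the script on an identical zip should produce the same key, while any change to the bundled binaries produces a new one.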
12 changes: 12 additions & 0 deletions packages/api/src/index.ts
@@ -19,6 +19,7 @@ import {
getS3Client,
sendMessage,
writeTranscriptionItem,
downloadObject,
} from '@guardian/transcription-service-backend-common';
import {
ClientConfig,
@@ -48,6 +49,7 @@ import {
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
import { invokeLambda } from './services/lambda';
import { LambdaClient } from '@aws-sdk/client-lambda';
import { getFileDuration } from '@guardian/transcription-service-backend-common/src/ffmpeg';

const runningOnAws = process.env['AWS_EXECUTION_ENV'];
const emulateProductionLocally =
@@ -172,6 +174,15 @@ const getApp = async () => {
return;
}

const tempPath = `/tmp/${s3Key}`;
await downloadObject(
s3Client,
config.app.sourceMediaBucket,
s3Key,
tempPath,
);
const duration = await getFileDuration(tempPath);

const signedUrl = await getSignedDownloadUrl(
config.aws.region,
config.app.sourceMediaBucket,
@@ -188,6 +199,7 @@ const getApp = async () => {
body.data.languageCode,
body.data.translationRequested,
body.data.diarizationRequested,
duration,
);
if (isSqsFailure(sendResult)) {
res.status(500).send(sendResult.errorMsg);
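The hunk above downloads the source file to `/tmp/${s3Key}` before probing it. The following sketch shows one way to build a temp path that stays flat even if a key contains `/`, and to delete the file once it has been used so that ephemeral storage does not fill up on a warm Lambda. `tempPathForKey` and `withTempFile` are hypothetical helpers, not part of this commit, and whether keys in this service can contain `/` is an assumption.

```typescript
import { randomUUID } from 'node:crypto';
import { rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { basename, join } from 'node:path';

// Hypothetical helper: keeps only the last segment of the key and adds a
// unique prefix so concurrent requests for the same key do not collide.
export const tempPathForKey = (
	s3Key: string,
	dir: string = tmpdir(),
): string => join(dir, `${randomUUID()}-${basename(s3Key)}`);

// Hypothetical helper: runs `use` with a temp path and removes the file
// afterwards, whether or not `use` throws.
export const withTempFile = async <T>(
	s3Key: string,
	use: (path: string) => Promise<T>,
): Promise<T> => {
	const path = tempPathForKey(s3Key);
	try {
		return await use(path);
	} finally {
		// force: true means a missing file is not treated as an error
		await rm(path, { force: true });
	}
};
```

In the handler this could wrap the download and the duration lookup in a single `withTempFile(s3Key, async (path) => { ... })` call.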
21 changes: 21 additions & 0 deletions packages/backend-common/src/ffmpeg.ts
@@ -0,0 +1,21 @@
import { exec } from 'node:child_process';
import { logger } from '@guardian/transcription-service-backend-common';
import { promisify } from 'node:util';

const execPromise = promisify(exec);

export const getFileDuration = async (
filePath: string,
): Promise<number | undefined> => {
const command = `ffprobe -i ${filePath} -show_entries format=duration -v quiet -of csv="p=0"`;
try {
const { stdout, stderr } = await execPromise(command);
if (stderr) {
logger.error(`ffprobe stderr: `, stderr);
}
return parseFloat(stdout);
} catch (error) {
logger.error(`Error during ffprobe file duration detection`, error);
return undefined;
}
};
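`getFileDuration` above interpolates the path into a shell command and returns whatever `parseFloat` yields, which is `NaN` when ffprobe prints nothing or `N/A`. As a sketch of an alternative, not the commit's implementation, the variant below passes the same ffprobe options as an argument array via `execFile` (so no shell is involved and paths with spaces need no quoting) and maps unparsable output to `undefined`. The names `parseDuration` and `getFileDurationSafe` are hypothetical.

```typescript
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';

const execFilePromise = promisify(execFile);

// Hypothetical helper: turns ffprobe's csv output (e.g. "12.5\n") into
// seconds, or undefined when the output is empty, "N/A" or negative.
export const parseDuration = (stdout: string): number | undefined => {
	const value = Number.parseFloat(stdout.trim());
	return Number.isFinite(value) && value >= 0 ? value : undefined;
};

// Hypothetical variant of getFileDuration using execFile instead of exec.
export const getFileDurationSafe = async (
	filePath: string,
): Promise<number | undefined> => {
	try {
		// Same options as the original command, passed without a shell
		const { stdout } = await execFilePromise('ffprobe', [
			'-v',
			'quiet',
			'-show_entries',
			'format=duration',
			'-of',
			'csv=p=0',
			'-i',
			filePath,
		]);
		return parseDuration(stdout);
	} catch {
		// ffprobe missing or exited non-zero: treat the duration as unknown
		return undefined;
	}
};
```

Returning `undefined` rather than `NaN` keeps the result in line with the declared `Promise<number | undefined>` type, and an unknown duration is something the callers in this commit already handle.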
20 changes: 14 additions & 6 deletions packages/backend-common/src/sqs.ts
@@ -70,6 +70,7 @@ export const generateOutputSignedUrlAndSendMessage = async (
languageCode: InputLanguageCode,
translationRequested: boolean,
diarizationRequested: boolean,
duration?: number,
): Promise<SendResult> => {
const signedUrls = await generateOutputSignedUrls(
s3Key,
@@ -79,9 +80,18 @@ export const generateOutputSignedUrlAndSendMessage = async (
7,
);

const queue = config.app.useWhisperx
? config.app.gpuTaskQueueUrl
: config.app.taskQueueUrl;
// use whisperX if whisperX is enabled and either the duration is unknown,
// the duration is greater than 10 minutes (600 seconds), or diarization has been requested
const engine =
config.app.useWhisperx &&
(!duration || duration > 600 || diarizationRequested)
? TranscriptionEngine.WHISPER_X
: TranscriptionEngine.WHISPER_CPP;

const queue =
engine === TranscriptionEngine.WHISPER_X
? config.app.gpuTaskQueueUrl
: config.app.taskQueueUrl;

const job: TranscriptionJob = {
id: s3Key, // id of the source file
@@ -94,9 +104,7 @@ export const generateOutputSignedUrlAndSendMessage = async (
languageCode,
translate: false,
diarize: diarizationRequested,
engine: config.app.useWhisperx
? TranscriptionEngine.WHISPER_X
: TranscriptionEngine.WHISPER_CPP,
engine,
};
const messageResult = await sendMessage(
client,
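The engine selection in this hunk can be read as a small pure function. The sketch below restates it in isolation so the boundary cases are easy to check; `chooseEngine`, the `Engine` string union (a stand-in for the `TranscriptionEngine` enum) and the constant name are assumptions made for illustration.

```typescript
// Stand-in for TranscriptionEngine.WHISPER_X / WHISPER_CPP (assumption)
type Engine = 'whisperx' | 'whispercpp';

// 10 minutes, the threshold used in the diff
const LONG_FILE_THRESHOLD_SECONDS = 600;

// Hypothetical restatement of the condition in
// generateOutputSignedUrlAndSendMessage.
export const chooseEngine = (
	useWhisperx: boolean,
	diarizationRequested: boolean,
	duration?: number,
): Engine =>
	useWhisperx &&
	(!duration ||
		duration > LONG_FILE_THRESHOLD_SECONDS ||
		diarizationRequested)
		? 'whisperx'
		: 'whispercpp';
```

Note that `!duration` is true for `undefined`, `0` and `NaN`, so all three are treated as an unknown duration and routed to whisperX when it is enabled. A file of exactly 600 seconds without diarization goes to whisper.cpp, because the comparison is strict.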
15 changes: 15 additions & 0 deletions packages/cdk/README.md
@@ -3,3 +3,18 @@
This directory defines the components to be deployed to AWS.

See [`package.json`](./package.json) for a list of available scripts.

## Stacks

### Transcription-Service

This is the main stack with the vast majority of the infrastructure.

### Repository

This stack could probably be merged with universal-infra (it isn't, for historical reasons). It contains all the relevant
infra and IAM permissions to support publishing Docker images from GitHub Actions to ECR.

### Universal-Infra

This stack was created to contain resources shared across all stages.
13 changes: 13 additions & 0 deletions packages/cdk/bin/cdk.ts
@@ -2,6 +2,7 @@ import 'source-map-support/register';
import { GuRoot } from '@guardian/cdk/lib/constructs/root';
import { TranscriptionServiceRepository } from '../lib/repository';
import { TranscriptionService } from '../lib/transcription-service';
import { TranscriptionServiceUniversalInfra } from '../lib/universal-infra';

const app = new GuRoot();
new TranscriptionService(app, 'TranscriptionService-CODE', {
@@ -21,3 +22,15 @@ new TranscriptionServiceRepository(app, 'TranscriptionServiceRepository', {
stage: 'PROD',
env: { region: 'eu-west-1' },
});

// This is another stack shared by both CODE and PROD. The repository stack already existed, so I made a new stack
// rather than delete the whole repository stack (including all containers) just to give it a less specific name
new TranscriptionServiceUniversalInfra(
app,
'TranscriptionServiceUniversalInfra',
{
stack: 'investigations',
stage: 'PROD',
env: { region: 'eu-west-1' },
},
);
55 changes: 55 additions & 0 deletions packages/cdk/lib/__snapshots__/transcription-service.test.ts.snap
@@ -11,6 +11,7 @@ exports[`The TranscriptionService stack matches the snapshot 1`] = `
"GuCertificate",
"GuS3Bucket",
"GuS3Bucket",
"GuStringParameter",
"GuDistributionBucketParameter",
"GuApiLambda",
"GuPolicy",
@@ -105,6 +106,10 @@ exports[`The TranscriptionService stack matches the snapshot 1`] = `
"Default": "/TEST/investigations/alarmTopicArn",
"Type": "AWS::SSM::Parameter::Value<String>",
},
"LayerBucketArn": {
"Default": "/investigations/transcription-service/lambdaLayerBucketArn",
"Type": "AWS::SSM::Parameter::Value<String>",
},
"LoggingStreamName": {
"Default": "/account/services/logging.stream.name",
"Description": "SSM parameter containing the Name (not ARN) on the kinesis stream",
@@ -209,6 +214,48 @@ exports[`The TranscriptionService stack matches the snapshot 1`] = `
},
"Type": "AWS::SSM::Parameter",
},
"FFMpegLayerx8664TEST3F325D39": {
"Properties": {
"CompatibleArchitectures": [
"x86_64",
],
"CompatibleRuntimes": [
"nodejs18.x",
"nodejs22.x",
"nodejs20.x",
"nodejs18.x",
],
"Content": {
"S3Bucket": {
"Fn::Select": [
0,
{
"Fn::Split": [
"/",
{
"Fn::Select": [
5,
{
"Fn::Split": [
":",
{
"Ref": "LayerBucketArn",
},
],
},
],
},
],
},
],
},
"S3Key": "ffmpeg_x86_64-1998ca7c6bd4313262b49cd9a66bc690.zip",
},
"Description": "FFMpeg Layer",
"LayerName": "FFMpegLayer",
},
"Type": "AWS::Lambda::LayerVersion",
},
"GPUTaskQueueUrlParameter64941E19": {
"Properties": {
"Name": "/TEST/investigations/transcription-service/gpuTaskQueueUrl",
@@ -2442,7 +2489,15 @@ service transcription-service-worker start",
"STAGE": "TEST",
},
},
"EphemeralStorage": {
"Size": 10240,
},
"Handler": "index.api",
"Layers": [
{
"Ref": "FFMpegLayerx8664TEST3F325D39",
},
],
"LoggingConfig": {
"LogFormat": "JSON",
},
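The nested `Fn::Split` / `Fn::Select` expression in the snapshot derives the layer's `S3Bucket` name from the `LayerBucketArn` parameter: split the ARN on `:`, take element 5, split that on `/`, take element 0. The sketch below performs the same steps on a plain string to make the derivation concrete. `bucketNameFromArn` is a hypothetical helper for illustration and is not something CDK or this commit exposes.

```typescript
// Hypothetical helper mirroring the CloudFormation intrinsics in the snapshot.
export const bucketNameFromArn = (arn: string): string => {
	// Fn::Select [5, Fn::Split [":", arn]] -> the resource part of the ARN
	const resource = arn.split(':')[5];
	if (resource === undefined) {
		throw new Error(`Not a six-part ARN: ${arn}`);
	}
	// Fn::Select [0, Fn::Split ["/", resource]] -> drop any object key suffix
	return resource.split('/')[0] ?? resource;
};
```

For the bucket created in universal-infra.ts, whose name is `transcription-service-lambda-layers`, an ARN of the form `arn:aws:s3:::transcription-service-lambda-layers` resolves back to that bucket name.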
45 changes: 43 additions & 2 deletions packages/cdk/lib/transcription-service.ts
@@ -67,11 +67,16 @@ import {
Role,
ServicePrincipal,
} from 'aws-cdk-lib/aws-iam';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
import {
Architecture,
Code,
LayerVersion,
Runtime,
} from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { LogGroup } from 'aws-cdk-lib/aws-logs';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { HttpMethods } from 'aws-cdk-lib/aws-s3';
import { Bucket, HttpMethods } from 'aws-cdk-lib/aws-s3';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
@@ -205,6 +210,40 @@ export class TranscriptionService extends GuStack {
});
}

const layerBucket = new GuStringParameter(this, 'LayerBucketArn', {
fromSSM: true,
default: '/investigations/transcription-service/lambdaLayerBucketArn',
});

const ffmpegHash = new GuStringParameter(this, 'FFMpegLayerZipKey', {
description:
"Key for the ffmpeg layer's zip file (pushed to layerBucket by publish-ffmpeg-layer.sh script)",
});

const ffmpegLayer = new LayerVersion(
this,
`FFMpegLayer_x86_64-${this.stage}`,
{
code: Code.fromBucket(
Bucket.fromBucketArn(
this,
'LambdaLayerBucket',
layerBucket.valueAsString,
),
ffmpegHash.valueAsString,
),
description: 'FFMpeg Layer',
layerVersionName: 'FFMpegLayer',
compatibleArchitectures: [Architecture.X86_64],
compatibleRuntimes: [
Runtime.NODEJS_LATEST,
Runtime.NODEJS_22_X,
Runtime.NODEJS_20_X,
Runtime.NODEJS_18_X,
],
},
);

const apiLambda = new GuApiLambda(this, 'transcription-service-api', {
fileName: 'api.zip',
handler: 'index.api',
@@ -213,6 +252,8 @@ export class TranscriptionService extends GuStack {
noMonitoring: true,
},
app: `${APP_NAME}-api`,
layers: [ffmpegLayer],
ephemeralStorageSize: Size.gibibytes(10), // needed so api can download source files to get the duration
api: {
id: apiId,
description: 'API for transcription service frontend',
26 changes: 26 additions & 0 deletions packages/cdk/lib/universal-infra.ts
@@ -0,0 +1,26 @@
import type { GuStackProps } from '@guardian/cdk/lib/constructs/core';
import { GuStack } from '@guardian/cdk/lib/constructs/core';
import { GuS3Bucket } from '@guardian/cdk/lib/constructs/s3';
import type { App } from 'aws-cdk-lib';
import { CfnOutput } from 'aws-cdk-lib';
import { StringParameter } from 'aws-cdk-lib/aws-ssm';

export class TranscriptionServiceUniversalInfra extends GuStack {
constructor(scope: App, id: string, props: GuStackProps) {
super(scope, id, props);

const layerBucket = new GuS3Bucket(this, 'LayerBucket', {
bucketName: 'transcription-service-lambda-layers',
app: 'transcription-service-universal-infra',
});

new StringParameter(this, 'ExportFunctionName', {
parameterName: `/investigations/transcription-service/lambdaLayerBucketArn`,
stringValue: layerBucket.bucketArn,
});

new CfnOutput(this, 'LayerBucket', {
value: layerBucket.bucketArn,
});
}
}
1 change: 1 addition & 0 deletions packages/common/src/types.ts
@@ -89,6 +89,7 @@ export const TranscriptionOutputSuccess = TranscriptionOutputBase.extend({
outputBucketKeys: OutputBucketKeys,
// we can get rid of this when we switch to using a zip
translationOutputBucketKeys: z.optional(OutputBucketKeys),
duration: z.optional(z.number()),
});

export const MediaDownloadFailure = OutputBase.extend({
1 change: 1 addition & 0 deletions packages/media-download/src/index.ts
@@ -101,6 +101,7 @@ const requestTranscription = async (
job.languageCode,
job.translationRequested,
job.diarizationRequested,
metadata.duration,
);
if (isSqsFailure(sendResult)) {
throw new Error('Failed to send transcription job');