Skip to content

Commit

Permalink
Companion server upload events (transloadit#3544)
Browse files Browse the repository at this point in the history
* modernise code

* pull out config related functions and middleware

* make test more readable

* Expose companion emitter

to allow the consumer to subscribe to upload success/failure events

fixes transloadit#3435

* disable client socket timeout for tests

or jest will wait for them to time out
also fix broken test 'uploader respects maxFileSize with unknown size'

* document the event emitter usage
  • Loading branch information
mifi authored Mar 24, 2022
1 parent 73b613e commit 4e94dd0
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 41 deletions.
7 changes: 5 additions & 2 deletions src/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const providerManager = require('./server/provider')
const controllers = require('./server/controllers')
const s3 = require('./server/controllers/s3')
const url = require('./server/controllers/url')
const emitter = require('./server/emitter')
const createEmitter = require('./server/emitter')
const redis = require('./server/redis')
const { getURLBuilder } = require('./server/helpers/utils')
const jobs = require('./server/jobs')
Expand Down Expand Up @@ -79,7 +79,7 @@ module.exports.app = (optionsArg = {}) => {
if (options.redisUrl) {
redis.client(merge({ url: options.redisUrl }, options.redisOptions || {}))
}
emitter(options.redisUrl, options.redisPubSubScope)
const emitter = createEmitter(options.redisUrl, options.redisPubSubScope)

const app = express()

Expand Down Expand Up @@ -152,5 +152,8 @@ module.exports.app = (optionsArg = {}) => {
processId,
})

// todo split emitter from app in next major
// @ts-ignore
app.companionEmitter = emitter
return app
}
1 change: 1 addition & 0 deletions src/config/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const defaultOptions = {
logClientVersion: true,
periodicPingUrls: [],
streamingUpload: false,
clientSocketConnectTimeout: 60000,
}

/**
Expand Down
35 changes: 32 additions & 3 deletions src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ class Uploader {
*/
async tryUploadStream (stream) {
try {
emitter().emit('upload-start', { token: this.token })

const ret = await this.uploadStream(stream)
if (!ret) return
const { url, extraData } = ret
Expand Down Expand Up @@ -358,10 +360,37 @@ class Uploader {
return Uploader.shortenToken(this.token)
}

async awaitReady () {
// TODO timeout after a while? Else we could leak emitters
async awaitReady (timeout) {
logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken)
await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve))

// TODO: replace the Promise constructor call when dropping support for Node.js <16 with
// await once(emitter, eventName, timeout && { signal: AbortSignal.timeout(timeout) })
await new Promise((resolve, reject) => {
const eventName = `connection:${this.token}`
let timer
let onEvent

function cleanup () {
emitter().removeListener(eventName, onEvent)
clearTimeout(timer)
}

if (timeout) {
// Need to timeout after a while, or we could leak emitters
timer = setTimeout(() => {
cleanup()
reject(new Error('Timed out waiting for socket connection'))
}, timeout)
}

onEvent = () => {
cleanup()
resolve()
}

emitter().once(eventName, onEvent)
})

logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken)
}

Expand Down
4 changes: 3 additions & 1 deletion src/server/helpers/upload.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ const { ValidationError } = Uploader
async function startDownUpload ({ req, res, getSize, download, onUnhandledError }) {
try {
const size = await getSize()
const { clientSocketConnectTimeout } = req.companion.options

logger.debug('Instantiating uploader.', null, req.id)
const uploader = new Uploader(Uploader.reqToOptions(req, size))

logger.debug('Starting download stream.', null, req.id)
const stream = await download()

// "Forking" off the upload operation to background, so we can return the http request:
;(async () => {
// wait till the client has connected to the socket, before starting
// the download, so that the client can receive all download/upload progress.
logger.debug('Waiting for socket connection before beginning remote download/upload.', null, req.id)
await uploader.awaitReady()
await uploader.awaitReady(clientSocketConnectTimeout)
logger.debug('Socket connection received. Starting remote download/upload.', null, req.id)

await uploader.tryUploadStream(stream)
Expand Down
5 changes: 3 additions & 2 deletions src/server/s3-client.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const S3 = require('aws-sdk/clients/s3')
const AWS = require('aws-sdk')

/**
* instantiates the aws-sdk s3 client that will be used for s3 uploads.
*
Expand All @@ -6,8 +9,6 @@
module.exports = (companionOptions) => {
let s3Client = null
if (companionOptions.providerOptions.s3) {
const S3 = require('aws-sdk/clients/s3')
const AWS = require('aws-sdk')
const s3ProviderOptions = companionOptions.providerOptions.s3

if (s3ProviderOptions.accessKeyId || s3ProviderOptions.secretAccessKey) {
Expand Down
2 changes: 2 additions & 0 deletions src/standalone/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const getConfigFromEnv = () => {
streamingUpload: process.env.COMPANION_STREAMING_UPLOAD === 'true',
maxFileSize: process.env.COMPANION_MAX_FILE_SIZE ? parseInt(process.env.COMPANION_MAX_FILE_SIZE, 10) : undefined,
chunkSize: process.env.COMPANION_CHUNK_SIZE ? parseInt(process.env.COMPANION_CHUNK_SIZE, 10) : undefined,
clientSocketConnectTimeout: process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT
? parseInt(process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT, 10) : undefined,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/standalone/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const companion = require('../companion')
const helper = require('./helper')
const middlewares = require('../server/middlewares')
const { getURLBuilder } = require('../server/helpers/utils')
const connectRedis = require('connect-redis')

/**
* Configures an Express app for running Companion standalone
Expand Down Expand Up @@ -139,7 +140,7 @@ module.exports = function server (inputCompanionOptions = {}) {
}

if (companionOptions.redisUrl) {
const RedisStore = require('connect-redis')(session)
const RedisStore = connectRedis(session)
const redisClient = redis.client(
merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions),
)
Expand Down
3 changes: 2 additions & 1 deletion test/__tests__/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ const request = require('supertest')
const tokenService = require('../../src/server/helpers/jwt')
const { getServer } = require('../mockserver')

const authServer = getServer()
// todo don't share server between tests. rewrite to not use env variables
const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })
const authData = {
dropbox: 'token value',
box: 'token value',
Expand Down
3 changes: 2 additions & 1 deletion test/__tests__/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const tokenService = require('../../src/server/helpers/jwt')
const { getServer } = require('../mockserver')
const defaults = require('../fixtures/constants')

const authServer = getServer()
// todo don't share server between tests. rewrite to not use env variables
const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })
const OAUTH_STATE = 'some-cool-nice-encrytpion'
const providers = require('../../src/server/provider').getDefaultProviders()

Expand Down
67 changes: 40 additions & 27 deletions test/__tests__/uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const nock = require('nock')
const Uploader = require('../../src/server/Uploader')
const socketClient = require('../mocksocket')
const standalone = require('../../src/standalone')
const Emitter = require('../../src/server/emitter')

afterAll(() => {
nock.cleanAll()
Expand Down Expand Up @@ -64,33 +65,45 @@ describe('uploader with tus protocol', () => {
const uploadToken = uploader.token
expect(uploadToken).toBeTruthy()

return new Promise((resolve, reject) => {
// validate that the test is resolved on socket connection
uploader.awaitReady().then(() => {
uploader.tryUploadStream(stream).then(() => resolve())
})
let progressReceived = 0

let progressReceived = 0
// emulate socket connection
socketClient.connect(uploadToken)
socketClient.onProgress(uploadToken, (message) => {
progressReceived = message.payload.bytesUploaded
try {
expect(message.payload.bytesTotal).toBe(fileContent.length)
} catch (err) {
reject(err)
}
})
socketClient.onUploadSuccess(uploadToken, (message) => {
try {
expect(progressReceived).toBe(fileContent.length)
// see __mocks__/tus-js-client.js
expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar')
} catch (err) {
reject(err)
}
})
const onProgress = jest.fn()
const onUploadSuccess = jest.fn()
const onBeginUploadEvent = jest.fn()
const onUploadEvent = jest.fn()

const emitter = Emitter()
emitter.on('upload-start', onBeginUploadEvent)
emitter.on(uploadToken, onUploadEvent)

const promise = uploader.awaitReady(60000)
// emulate socket connection
socketClient.connect(uploadToken)
socketClient.onProgress(uploadToken, (message) => {
progressReceived = message.payload.bytesUploaded
onProgress(message)
})
socketClient.onUploadSuccess(uploadToken, onUploadSuccess)
await promise
await uploader.tryUploadStream(stream)

expect(progressReceived).toBe(fileContent.length)

expect(onProgress).toHaveBeenLastCalledWith(expect.objectContaining({
payload: expect.objectContaining({
bytesTotal: fileContent.length,
}),
}))
const expectedPayload = expect.objectContaining({
// see __mocks__/tus-js-client.js
url: 'https://tus.endpoint/files/foo-bar',
})
expect(onUploadSuccess).toHaveBeenCalledWith(expect.objectContaining({
payload: expectedPayload,
}))

expect(onBeginUploadEvent).toHaveBeenCalledWith({ token: uploadToken })
expect(onUploadEvent).toHaveBeenLastCalledWith({ action: 'success', payload: expectedPayload })
})

test('upload functions with tus protocol without size', async () => {
Expand All @@ -110,7 +123,7 @@ describe('uploader with tus protocol', () => {

return new Promise((resolve, reject) => {
// validate that the test is resolved on socket connection
uploader.awaitReady().then(() => {
uploader.awaitReady(60000).then(() => {
uploader.tryUploadStream(stream).then(() => {
try {
expect(fs.existsSync(uploader.path)).toBe(false)
Expand Down Expand Up @@ -257,7 +270,7 @@ describe('uploader with tus protocol', () => {
const uploadToken = uploader.token

// validate that the test is resolved on socket connection
uploader.awaitReady().then(uploader.tryUploadStream(stream))
uploader.awaitReady(60000).then(() => uploader.tryUploadStream(stream))
socketClient.connect(uploadToken)

return new Promise((resolve, reject) => {
Expand Down
2 changes: 1 addition & 1 deletion test/__tests__/url.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jest.mock('../../src/server/helpers/request', () => {
})
const { getServer } = require('../mockserver')

const mockServer = getServer()
const mockServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })

beforeAll(() => {
nock('http://url.myendpoint.com').get('/files').reply(200, () => '')
Expand Down
7 changes: 5 additions & 2 deletions test/mockserver.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* global jest:false */
const express = require('express')
const session = require('express-session')

Expand Down Expand Up @@ -36,6 +35,8 @@ const defaultEnv = {
COMPANION_PATH: '',

COMPANION_PERIODIC_PING_URLS: '',

COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '',
}

function updateEnv (env) {
Expand All @@ -54,7 +55,9 @@ module.exports.getServer = (extraEnv) => {

updateEnv(env)

// delete from cache to force the server to reload companionOptions from the new env vars
// companion stores certain global state like emitter, metrics, logger (frozen object), so we need to reset modules
// todo rewrite companion to not use global state
// https://github.com/transloadit/uppy/issues/3284
jest.resetModules()
const standalone = require('../src/standalone')
const authServer = express()
Expand Down

0 comments on commit 4e94dd0

Please sign in to comment.