Skip to content

Commit

Permalink
adds timeouts to ODIS signer (#7597)
Browse files Browse the repository at this point in the history
* adds timeouts to signer

* moves timeout function to base package, adds signer timeout metric

* udpates dependency graph

* fixes typo

* fixes error

* adds backup timeout at express level

* addresses feedback

* import timeout through common package

* moves backup timeout to better spot

* removes commented out code

* parallelizes db ops

* adds TODOS

* adds timeout helper to sdk/base

* adds retryAsyncWithTimeout helper to sdk/base

* refactors retryAsyncWithBackOffAndTimeout() to use timeout()

* bumps package version and updates changelog

* adds docs

* Revert "bumps package version and updates changelog"

This reverts commit ed9040e.

* Adds timeout functions to sdk/base package (#7617)

### Description

In our efforts to improve ODIS scalability, the CAP team came across the need for these timeout async functions and they seemed like something that should be added to the base package. 

We've added retryAsyncWithBackOffWithTimeout() and timeout(). The former retries an async function up to a given number of times with a backoff between each retry and caps the attempt to call the function with a timeout. The timeout() function simply calls an async function and wraps the promise in a timeout.

Authorship credit goes to @codyborn who wrote the timeout logic in a separate PR 

### Tested

Unit tests added to async.test.ts

### Related issues

None

### Backwards compatibility

Yes

### Documentation

None

* add full node timeouts to signer

* adds db timeouts to signer

* adds dependency graph

* ODIS Combiner Reliability (#7594)

### Description

- Adding timeouts to full-node requests
- “Failing open” when timeouts are hit
- Forcing outstanding request termination when k of m signatures have been collected
- Forcing outstanding request termination and returning early when k of m signatures are not possible

### Tested

- Test cases for new async method wrapper
- TODO: Deploy to alfajores and perform load tests to measure impact

### Backwards compatibility

Yes

### Documentation

N/A

* ODIS load test (#7602)

Integrates load test logic into monitor

* update dependency graph

* adds sdk/base to signer dockerfile (temporary)

Co-authored-by: Cody Born <codyborn@outlook.com>
  • Loading branch information
alecps and codyborn authored Apr 6, 2021
1 parent 0a5353f commit 7a0f2a2
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 66 deletions.
1 change: 1 addition & 0 deletions dockerfiles/phone-number-privacy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ FROM node:10

WORKDIR /celo-phone-number-privacy/

COPY packages/sdk/base packages
COPY packages/phone-number-privacy/signer signer

WORKDIR /celo-phone-number-privacy/signer
Expand Down
3 changes: 2 additions & 1 deletion packages/phone-number-privacy/monitor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"config:set:mainnet": "firebase functions:config:set --project celo-pgpnp-mainnet",
"clean": "tsc -b . --clean",
"build": "tsc -b .",
"lint": "tslint --project ."
"lint": "tslint --project .",
"loadTest": "ts-node src/scripts/runLoadTest.ts"
},
"dependencies": {
"@celo/contractkit": "1.0.0-beta3",
Expand Down
30 changes: 8 additions & 22 deletions packages/phone-number-privacy/monitor/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
import { PhoneNumberHashDetails } from '@celo/identity/lib/odis/phone-number-identifier'
import { ErrorMessages } from '@celo/identity/lib/odis/query'
import { rootLogger as logger } from '@celo/phone-number-privacy-common'
import * as functions from 'firebase-functions'
import { queryOdisForSalt } from './query'
import { testQuery } from './test'

const haveConfig = !!functions.config().blockchain
export const network = haveConfig ? functions.config().blockchain.network : process.env.NETWORK
export const blockchainProvider: string = haveConfig
? functions.config().blockchain.provider
: process.env.BLOCKCHAIN_PROVIDER

export const odisMonitorScheduleFunction = functions
.region('us-central1', 'europe-west3')
.pubsub.schedule('every 5 minutes')
.onRun(async () => {
logger.info('Performing test query')
try {
const odisResponse: PhoneNumberHashDetails = await queryOdisForSalt()
logger.info({ odisResponse }, 'ODIS salt request successful. System is healthy.')
} catch (err) {
if ((err as Error).message === ErrorMessages.ODIS_QUOTA_ERROR) {
logger.info(
{ error: err },
'ODIS salt request out of quota. This is expected. System is healthy.'
)
} else {
logger.error('ODIS salt request failed.')
logger.error({ err })
throw err
}
}
})
.onRun(testQuery)
22 changes: 13 additions & 9 deletions packages/phone-number-privacy/monitor/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,33 @@ import { newKit } from '@celo/contractkit'
import { OdisUtils } from '@celo/identity'
import { AuthSigner } from '@celo/identity/lib/odis/query'
import { fetchEnv } from '@celo/phone-number-privacy-common'
import { generateKeys, generateMnemonic, MnemonicStrength } from '@celo/utils/lib/account'
import { normalizeAddressWith0x, privateKeyToAddress } from '@celo/utils/lib/address'
import { LocalWallet } from '@celo/wallet-local'
import * as functions from 'firebase-functions'
import { blockchainProvider, network } from './index'

const privateKey = fetchEnv('PRIVATE_KEY')
const phoneNumber = fetchEnv('PHONE_NUMBER')
const accountAddress = normalizeAddressWith0x(privateKeyToAddress(privateKey)) // 0x1be31a94361a391bbafb2a4ccd704f57dc04d4bb
const contractKit = newKit(blockchainProvider, new LocalWallet())

const contractKit = newKit(functions.config().blockchain.provider, new LocalWallet())
contractKit.connection.addAccount(privateKey)
contractKit.defaultAccount = accountAddress
const newPrivateKey = async () => {
const mnemonic = await generateMnemonic(MnemonicStrength.s256_24words)
return (await generateKeys(mnemonic)).privateKey
}

export const queryOdisForSalt = () => {
export const queryOdisForSalt = async () => {
const privateKey = await newPrivateKey()
const accountAddress = normalizeAddressWith0x(privateKeyToAddress(privateKey))
contractKit.connection.addAccount(privateKey)
contractKit.defaultAccount = accountAddress
const authSigner: AuthSigner = {
authenticationMethod: OdisUtils.Query.AuthenticationMethod.WALLET_KEY,
contractKit,
}

return OdisUtils.PhoneNumberIdentifier.getPhoneNumberIdentifier(
phoneNumber,
accountAddress,
authSigner,
OdisUtils.Query.getServiceContext(functions.config().blockchain.name),
OdisUtils.Query.getServiceContext(network),
undefined,
'monitor:1.0.0'
)
Expand Down
29 changes: 29 additions & 0 deletions packages/phone-number-privacy/monitor/src/scripts/runLoadTest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { concurrentLoadTest } from '../test'

/* tslint:disable:no-console */

const args = process.argv.slice(2)

const printHelpAndExit = () => {
console.log('Usage: yarn loadTest <network> <numWorkers>')
process.exit(1)
}

if (args[0] === '--help' || args.length !== 2) {
printHelpAndExit()
}

switch (args[0]) {
case 'alfajores':
process.env.BLOCKCHAIN_PROVIDER = 'https://alfajores-forno.celo-testnet.org'
break
case 'mainnet':
process.env.BLOCKCHAIN_PROVIDER = 'https://forno.celo.org'
break
default:
printHelpAndExit()
break
}
process.env.NETWORK = args[0]

concurrentLoadTest(Number(args[1])) // tslint:disable-line:no-floating-promises
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { PhoneNumberHashDetails } from '@celo/identity/lib/odis/phone-number-ide
import { ErrorMessages } from '@celo/identity/lib/odis/query'
import { rootLogger as logger } from '@celo/phone-number-privacy-common'
import { queryOdisForSalt } from './query'
export const runTest = async () => {

export async function testQuery() {
logger.info('Performing test query')
try {
const odisResponse: PhoneNumberHashDetails = await queryOdisForSalt()
Expand All @@ -22,22 +23,27 @@ export const runTest = async () => {
}
}

export const loop = async () => {
export async function serialLoadTest(n: number) {
for (let i = 0; i < n; i++) {
try {
await testQuery()
} catch {} // tslint:disable-line:no-empty
}
}

export async function concurrentLoadTest(workers: number) {
while (true) {
const reqs = []
for (let i = 0; i < 100; i++) {
for (let i = 0; i < workers; i++) {
reqs.push(i)
}

await concurrentMap(100, reqs, async (i) => {
await concurrentMap(workers, reqs, async (i) => {
await sleep(i * 10)
try {
while (true) {
await runTest()
}
} catch {} // tslint:disable-line:no-empty
while (true) {
try {
await testQuery()
} catch {} // tslint:disable-line:no-empty
}
})
}
}

loop() // tslint:disable-line:no-floating-promises
4 changes: 4 additions & 0 deletions packages/phone-number-privacy/signer/src/common/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ export const Counters = {
name: 'whitelisted_requests',
help: 'Counter for the number of whitelisted requests not requiring quota (testing only)',
}),
timeouts: new Counter({
name: 'timeouts',
help: 'Counter for the number of signer timeouts as measured by the signer',
}),
}
const buckets = [
0.001,
Expand Down
2 changes: 2 additions & 0 deletions packages/phone-number-privacy/signer/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ interface Config {
secretKey: string
}
}
timeout: number
whitelist_percentage: number
}

Expand Down Expand Up @@ -132,6 +133,7 @@ const config: Config = {
secretKey: env.KEYSTORE_AWS_SECRET_KEY,
},
},
timeout: env.ODIS_SIGNER_TIMEOUT || 5000,
whitelist_percentage: Number(env.WHITELIST_PERCENTAGE) || 0,
}
export default config
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export async function getPerformedQueryCount(account: string, logger: Logger): P
.select(ACCOUNTS_COLUMNS.numLookups)
.where(ACCOUNTS_COLUMNS.address, account)
.first()
.timeout(DB_TIMEOUT)
getPerformedQueryCountMeter()
return queryCounts === undefined ? 0 : queryCounts[ACCOUNTS_COLUMNS.numLookups]
} catch (err) {
Expand Down Expand Up @@ -51,6 +52,7 @@ async function _incrementQueryCount(account: string, logger: Logger) {
await accounts()
.where(ACCOUNTS_COLUMNS.address, account)
.increment(ACCOUNTS_COLUMNS.numLookups, 1)
.timeout(DB_TIMEOUT)
return true
} else {
const newAccount = new Account(account)
Expand Down Expand Up @@ -81,6 +83,7 @@ async function _getDidMatchmaking(account: string, logger: Logger): Promise<bool
.where(ACCOUNTS_COLUMNS.address, account)
.select(ACCOUNTS_COLUMNS.didMatchmaking)
.first()
.timeout(DB_TIMEOUT)
if (!didMatchmaking) {
return false
}
Expand Down Expand Up @@ -109,6 +112,8 @@ async function _setDidMatchmaking(account: string, logger: Logger) {
if (await getAccountExists(account)) {
return accounts()
.where(ACCOUNTS_COLUMNS.address, account)
.update(ACCOUNTS_COLUMNS.didMatchmaking, new Date())
.timeout(DB_TIMEOUT)
.update(ACCOUNTS_COLUMNS.didMatchmaking, new Date()) // TODO(Alec): add timeouts here?
} else {
const newAccount = new Account(account)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export async function getRequestExists(
[REQUESTS_COLUMNS.blindedQuery]: request.blindedQueryPhoneNumber,
})
.first()
.timeout(DB_TIMEOUT)
getRequestExistsMeter()
return !!existingRequest
} catch (err) {
Expand Down
13 changes: 10 additions & 3 deletions packages/phone-number-privacy/signer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { rootLogger as logger } from '@celo/phone-number-privacy-common'
import { Counters } from './common/metrics'
import config, { DEV_MODE } from './config'
import { initDatabase } from './database/database'
import { initKeyProvider } from './key-management/key-provider'
Expand All @@ -12,9 +13,15 @@ async function start() {
const server = createServer()
logger.info('Starting server')
const port = config.server.port
server.listen(port, () => {
logger.info(`Server is listening on port ${port}`)
})
const backupTimeout = config.timeout * 1.2
server
.listen(port, () => {
logger.info(`Server is listening on port ${port}`)
})
.setTimeout(backupTimeout, () => {
Counters.timeouts.inc()
logger.warn(`Timed out after ${backupTimeout}ms`)
})
}

start().catch((err) => {
Expand Down
19 changes: 15 additions & 4 deletions packages/phone-number-privacy/signer/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { timeout } from '@celo/base'
import { loggerMiddleware, rootLogger as logger } from '@celo/phone-number-privacy-common'
import Logger from 'bunyan'
import express, { Request, Response } from 'express'
import fs from 'fs'
import https from 'https'
import * as PromClient from 'prom-client'
import { Histograms } from './common/metrics'
import { Counters, Histograms } from './common/metrics'
import config, { getVersion } from './config'
import { handleGetBlindedMessagePartialSig } from './signing/get-partial-signature'
import { handleGetQuota } from './signing/query-quota'
Expand Down Expand Up @@ -37,7 +39,7 @@ export function createServer() {
handler: (req: Request, res: Response) => Promise<void>
) =>
app.post(endpoint, async (req, res) => {
await callAndMeasureLatency(endpoint, handler, req, res)
await callAndMeterLatency(endpoint, handler, req, res)
})

// EG. curl -v "http://localhost:8080/getBlindedMessagePartialSig" -H "Authorization: 0xdaf63ea42a092e69b2001db3826bc81dc859bffa4d51ce8943fddc8ccfcf6b2b1f55d64e4612e7c028791528796f5a62c1d2865b184b664589696a08c83fc62a00" -d '{"hashedPhoneNumber":"0x5f6e88c3f724b3a09d3194c0514426494955eff7127c29654e48a361a19b4b96","blindedQueryPhoneNumber":"n/I9srniwEHm5o6t3y0tTUB5fn7xjxRrLP1F/i8ORCdqV++WWiaAzUo3GA2UNHiB","account":"0x588e4b68193001e4d10928660aB4165b813717C0"}' -H 'Content-Type: application/json'
Expand All @@ -52,14 +54,23 @@ export function createServer() {
}
}

async function callAndMeasureLatency(
async function callAndMeterLatency(
endpoint: Endpoints,
handler: (req: Request, res: Response) => Promise<void>,
req: Request,
res: Response
) {
const childLogger: Logger = res.locals.logger
const end = Histograms.responseLatency.labels(endpoint).startTimer()
await handler(req, res).finally(end)
const timeoutRes = Symbol()
await timeout(handler, [req, res], config.timeout, timeoutRes)
.catch((error: any) => {
if (error === timeoutRes) {
Counters.timeouts.inc()
childLogger.warn(`Timed out after ${config.timeout}ms`)
}
})
.finally(end)
}

function getSslOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ export async function handleGetBlindedMessagePartialSig(
const meterDbWriteOps = Histograms.getBlindedSigInstrumentation
.labels('dbWriteOps')
.startTimer()
// TODO(Alec): Parallelize these requests
if (!(await storeRequest(request.body, logger))) {
const [requestStored, queryCountIncremented] = await Promise.all([
storeRequest(request.body, logger),
incrementQueryCount(account, logger),
])
if (!requestStored) {
logger.debug('Did not store request.')
errorMsgs.push(ErrorMessage.FAILURE_TO_STORE_REQUEST)
}
if (!(await incrementQueryCount(account, logger))) {
if (!queryCountIncremented) {
logger.debug('Did not increment query count.')
errorMsgs.push(ErrorMessage.FAILURE_TO_INCREMENT_QUERY_COUNT)
} else {
Expand Down
Loading

0 comments on commit 7a0f2a2

Please sign in to comment.