From a4c67129ae9061feb8f42c3f2497e23373472d5b Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 3 Sep 2019 12:42:05 +0200 Subject: [PATCH 1/5] feat: add HTTP requests limiting on utils/ to avoid ban --- package.json | 1 + utils/download_filtered.js | 72 +++++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/package.json b/package.json index 9df31d1d..d919db7a 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "fs-extra": "^7.0.0", "glob": "^7.0.0", "joi": "^14.0.0", + "limited-request-queue": "^5.0.0", "lodash": "^4.16.0", "minimist": "^1.2.0", "pelias-blacklist-stream": "^1.0.0", diff --git a/utils/download_filtered.js b/utils/download_filtered.js index d1bb8340..3ce5dd62 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -4,12 +4,11 @@ const async = require('async'); const fs = require('fs-extra'); const tmp = require('tmp'); const logger = require('pelias-logger').get('openaddresses-download'); +const RequestQueue = require('limited-request-queue'); +const URL = require('url').URL; function downloadFiltered(config, callback) { const targetDir = config.imports.openaddresses.datapath; - const files = config.imports.openaddresses.files; - - logger.info(`Attempting to download selected data files: ${files}`); fs.ensureDir(targetDir, (err) => { if (err) { @@ -17,49 +16,72 @@ function downloadFiltered(config, callback) { return callback(err); } - async.eachLimit(files, 5, downloadSource.bind(null, targetDir), callback); + const files = getFiles(config, targetDir, callback); + logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`); + // wait to avoid being banned by openaddresses.io + const options = { + maxSockets: 1, + rateLimit: 3000 + }; + const queue = new RequestQueue.default(options) + .on(RequestQueue.ITEM_EVENT, (url, data, done) => { + downloadSource(targetDir, data.file, () => done()); + }) + .on(RequestQueue.END_EVENT, callback); + files.map(file => { + queue.enqueue(new URL(file.url), {file: file}); + }); }); } -function downloadSource(targetDir, file, main_callback) { +function getFiles(config, targetDir, main_callback){ const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal'); - logger.info(`Downloading ${file}`); - - const source = file.replace('.csv', '.zip'); - const sourceUrl = `https://results.openaddresses.io/latest/run/${source}`; - - // sources MUST end with '.csv' - if( !file.endsWith('.csv') ){ - const msg = `invalid source '${file}': MUST end with '.csv'`; - logger.warn(msg); + const files = config.imports.openaddresses.files; + files.forEach(file => { + // sources MUST end with '.csv' + if( !file.endsWith('.csv') ){ + const msg = `invalid source '${file}': MUST end with '.csv'`; + logger.warn(msg); - // respect 'imports.openaddresses.missingFilesAreFatal' setting - return main_callback(errorsFatal ? msg : null); - } + // respect 'imports.openaddresses.missingFilesAreFatal' setting + return main_callback(errorsFatal ? msg : null); + } + }); + return files.map(file => { + const source = file.replace('.csv', '.zip'); + const name = file.replace('.csv', '').replace(/\//g,'-'); + return { + csv: file, + url: `https://results.openaddresses.io/latest/run/${source}`, + zip: tmp.tmpNameSync({prefix: name, dir: targetDir, postfix: '.zip'}) + }; + }); +} - const name = file.replace('.csv', '').replace(/\//g,'-'); - const tmpZipFile = tmp.tmpNameSync({prefix: name, dir: targetDir, postfix: '.zip'}); +function downloadSource(targetDir, file, main_callback) { + const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal'); + logger.info(`Downloading ${file.csv}`); async.series( [ // download the zip file into the temp directory (callback) => { - logger.debug(`downloading ${sourceUrl}`); - child_process.exec(`curl -L -X GET -o ${tmpZipFile} ${sourceUrl}`, callback); + logger.debug(`downloading ${file.url}`); + child_process.exec(`curl -L -X GET -o ${file.zip} ${file.url}`, callback); }, // unzip file into target directory (callback) => { - logger.debug(`unzipping ${tmpZipFile} to ${targetDir}`); - child_process.exec(`unzip -o -qq -d ${targetDir} ${tmpZipFile}`, callback); + logger.debug(`unzipping ${file.zip} to ${targetDir}`); + child_process.exec(`unzip -o -qq -d ${targetDir} ${file.zip}`, callback); }, // delete the temp downloaded zip file - fs.remove.bind(null, tmpZipFile) + fs.remove.bind(null, file.zip) ], function(err) { if (err) { - logger.warn(`failed to download ${sourceUrl}: ${err}`); + logger.warn(`failed to download ${file.url}: ${err}`); } // honour 'imports.openaddresses.missingFilesAreFatal' setting From d552c4f645a3c1a1d3a1897fbd63bffe9d9cf0a9 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 3 Sep 2019 15:58:30 +0200 Subject: [PATCH 2/5] refactor: simplify code + support node6 using bottleneck module --- package.json | 2 +- utils/download_filtered.js | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/package.json b/package.json index d919db7a..5f571a5c 100644 --- a/package.json +++ b/package.json @@ -8,12 +8,12 @@ "main": "import.js", "dependencies": { "async": "^2.5.0", + "bottleneck": "^2.19.5", "combined-stream": "^1.0.7", "csv-parse": "^4.0.0", "fs-extra": "^7.0.0", "glob": "^7.0.0", "joi": "^14.0.0", - "limited-request-queue": "^5.0.0", "lodash": "^4.16.0", "minimist": "^1.2.0", "pelias-blacklist-stream": "^1.0.0", diff --git a/utils/download_filtered.js b/utils/download_filtered.js index 3ce5dd62..62a1faad 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -4,8 +4,7 @@ const async = require('async'); const fs = require('fs-extra'); const tmp = require('tmp'); const logger = require('pelias-logger').get('openaddresses-download'); -const RequestQueue = require('limited-request-queue'); -const URL = require('url').URL; +const Bottleneck = require('bottleneck/es5'); function downloadFiltered(config, callback) { const targetDir = config.imports.openaddresses.datapath; @@ -19,18 +18,15 @@ function downloadFiltered(config, callback) { const files = getFiles(config, targetDir, callback); logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`); - // wait to avoid being banned by openaddresses.io + // limit requests to avoid being banned by openaddresses.io const options = { - maxSockets: 1, - rateLimit: 3000 + maxConcurrent: 1, + minTime: 3000 }; - const queue = new RequestQueue.default(options) - .on(RequestQueue.ITEM_EVENT, (url, data, done) => { - downloadSource(targetDir, data.file, () => done()); - }) - .on(RequestQueue.END_EVENT, callback); + const limiter = new Bottleneck(options) + .on('empty', callback); // This will be called when `limiter.empty()` becomes true. files.map(file => { - queue.enqueue(new URL(file.url), {file: file}); + limiter.submit(downloadSource, targetDir, file, null); }); }); From 43bca0cec49c4f11de09e8d9dd037f9b3c98b0c8 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 3 Sep 2019 15:59:02 +0200 Subject: [PATCH 3/5] feat: allow interruption with Ctrl+C --- utils/download_filtered.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/utils/download_filtered.js b/utils/download_filtered.js index 62a1faad..d70a271a 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -28,6 +28,10 @@ function downloadFiltered(config, callback) { files.map(file => { limiter.submit(downloadSource, targetDir, file, null); }); + process.on('SIGINT', () => { + limiter.stop({dropWaitingJobs: true}); + process.exit(); + }); }); } From bbcc9c17b0065605d3064bb357fbb0a2a650706e Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 3 Sep 2019 16:14:15 +0200 Subject: [PATCH 4/5] fix: callback on last request return Before, it was called on last request call, before the last request had been processed. --- utils/download_filtered.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/utils/download_filtered.js b/utils/download_filtered.js index d70a271a..e8473db8 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -23,10 +23,14 @@ function downloadFiltered(config, callback) { maxConcurrent: 1, minTime: 3000 }; - const limiter = new Bottleneck(options) - .on('empty', callback); // This will be called when `limiter.empty()` becomes true. + const limiter = new Bottleneck(options); + const callbackOnLastOne = () => { + if (limiter.empty()) { + callback(); + } + }; files.map(file => { - limiter.submit(downloadSource, targetDir, file, null); + limiter.submit(downloadSource, targetDir, file, callbackOnLastOne); }); process.on('SIGINT', () => { limiter.stop({dropWaitingJobs: true}); From bd5032bf5db2bcedead336c409919e2a290dc116 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 3 Sep 2019 16:19:25 +0200 Subject: [PATCH 5/5] fix: set the rate limit to 10/min as documented --- utils/download_filtered.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/utils/download_filtered.js b/utils/download_filtered.js index e8473db8..67c26b8c 100644 --- a/utils/download_filtered.js +++ b/utils/download_filtered.js @@ -19,9 +19,11 @@ function downloadFiltered(config, callback) { logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`); // limit requests to avoid being banned by openaddresses.io + // current policy is 10 request per minute + // https://github.com/pelias/openaddresses/issues/433#issuecomment-527383976 const options = { maxConcurrent: 1, - minTime: 3000 + minTime: 6000 }; const limiter = new Bottleneck(options); const callbackOnLastOne = () => {