Skip to content

Commit

Permalink
Merge pull request pelias#435 from severo/download_bottleneck
Browse files Browse the repository at this point in the history
Rate limiting, using bottleneck
  • Loading branch information
orangejulius authored Sep 3, 2019
2 parents ed96f34 + bd5032b commit 744c315
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 25 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"main": "import.js",
"dependencies": {
"async": "^2.5.0",
"bottleneck": "^2.19.5",
"combined-stream": "^1.0.7",
"csv-parse": "^4.0.0",
"fs-extra": "^7.0.0",
Expand Down
78 changes: 53 additions & 25 deletions utils/download_filtered.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,90 @@ const async = require('async');
const fs = require('fs-extra');
const tmp = require('tmp');
const logger = require('pelias-logger').get('openaddresses-download');
const Bottleneck = require('bottleneck/es5');

function downloadFiltered(config, callback) {
const targetDir = config.imports.openaddresses.datapath;
const files = config.imports.openaddresses.files;

logger.info(`Attempting to download selected data files: ${files}`);

fs.ensureDir(targetDir, (err) => {
if (err) {
logger.error(`error making directory ${targetDir}`, err);
return callback(err);
}

async.eachLimit(files, 5, downloadSource.bind(null, targetDir), callback);
const files = getFiles(config, targetDir, callback);
logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`);

// limit requests to avoid being banned by openaddresses.io
// current policy is 10 requests per minute
// https://github.com/pelias/openaddresses/issues/433#issuecomment-527383976
const options = {
maxConcurrent: 1,
minTime: 6000
};
const limiter = new Bottleneck(options);
const callbackOnLastOne = () => {
if (limiter.empty()) {
callback();
}
};
files.map(file => {
limiter.submit(downloadSource, targetDir, file, callbackOnLastOne);
});
process.on('SIGINT', () => {
limiter.stop({dropWaitingJobs: true});
process.exit();
});
});

}

function downloadSource(targetDir, file, main_callback) {
function getFiles(config, targetDir, main_callback){
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
logger.info(`Downloading ${file}`);

const source = file.replace('.csv', '.zip');
const sourceUrl = `https://results.openaddresses.io/latest/run/${source}`;

// sources MUST end with '.csv'
if( !file.endsWith('.csv') ){
const msg = `invalid source '${file}': MUST end with '.csv'`;
logger.warn(msg);
const files = config.imports.openaddresses.files;
files.forEach(file => {
// sources MUST end with '.csv'
if( !file.endsWith('.csv') ){
const msg = `invalid source '${file}': MUST end with '.csv'`;
logger.warn(msg);

// respect 'imports.openaddresses.missingFilesAreFatal' setting
return main_callback(errorsFatal ? msg : null);
}
// respect 'imports.openaddresses.missingFilesAreFatal' setting
return main_callback(errorsFatal ? msg : null);
}
});
return files.map(file => {
const source = file.replace('.csv', '.zip');
const name = file.replace('.csv', '').replace(/\//g,'-');
return {
csv: file,
url: `https://results.openaddresses.io/latest/run/${source}`,
zip: tmp.tmpNameSync({prefix: name, dir: targetDir, postfix: '.zip'})
};
});
}

const name = file.replace('.csv', '').replace(/\//g,'-');
const tmpZipFile = tmp.tmpNameSync({prefix: name, dir: targetDir, postfix: '.zip'});
function downloadSource(targetDir, file, main_callback) {
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
logger.info(`Downloading ${file.csv}`);

async.series(
[
// download the zip file into the temp directory
(callback) => {
logger.debug(`downloading ${sourceUrl}`);
child_process.exec(`curl -L -X GET -o ${tmpZipFile} ${sourceUrl}`, callback);
logger.debug(`downloading ${file.url}`);
child_process.exec(`curl -L -X GET -o ${file.zip} ${file.url}`, callback);
},
// unzip file into target directory
(callback) => {
logger.debug(`unzipping ${tmpZipFile} to ${targetDir}`);
child_process.exec(`unzip -o -qq -d ${targetDir} ${tmpZipFile}`, callback);
logger.debug(`unzipping ${file.zip} to ${targetDir}`);
child_process.exec(`unzip -o -qq -d ${targetDir} ${file.zip}`, callback);
},
// delete the temp downloaded zip file
fs.remove.bind(null, tmpZipFile)
fs.remove.bind(null, file.zip)
],
function(err) {
if (err) {
logger.warn(`failed to download ${sourceUrl}: ${err}`);
logger.warn(`failed to download ${file.url}: ${err}`);
}

// honour 'imports.openaddresses.missingFilesAreFatal' setting
Expand Down

0 comments on commit 744c315

Please sign in to comment.