Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API] Add CSV bulk indexing support to Kibana API #6844

Merged
merged 18 commits into from
May 11, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"clipboard": "1.5.5",
"commander": "2.8.1",
"css-loader": "0.17.0",
"csv": "0.4.6",
"csv-parse": "1.1.0",
"d3": "3.5.6",
"elasticsearch": "10.1.2",
"elasticsearch-browser": "10.1.2",
Expand Down
3 changes: 2 additions & 1 deletion src/cli/cluster/base_path_proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ export default class BasePathProxy {
config.set('server.basePath', this.basePath);
}

config.set('server.maxPayloadBytes', 1024 * 1024 * 1024);
const ONE_GIGABYTE = 1024 * 1024 * 1024;
config.set('server.maxPayloadBytes', ONE_GIGABYTE);

setupLogging(null, this.server, config);
setupConnection(null, this.server, config);
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/kibana/server/routes/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { registerPost } from './register_post';
import { registerDelete } from './register_delete';
import { registerProcessors } from './register_processors';
import { registerSimulate } from './register_simulate';
import { registerBulk } from './register_bulk';
import { registerData } from './register_data';

export default function (server) {
registerPost(server);
registerDelete(server);
registerProcessors(server);
registerSimulate(server);
registerBulk(server);
registerData(server);
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { Promise } from 'bluebird';
import { parse, transform } from 'csv';
import parse from 'csv-parse';
import _ from 'lodash';
import hi from 'highland';
import { patternToIngest } from '../../../../common/lib/convert_pattern_and_ingest_name';
import { PassThrough } from 'stream';
import JSONStream from 'JSONStream';

export function registerBulk(server) {
const ONE_GIGABYTE = 1024 * 1024 * 1024;

export function registerData(server) {
server.route({
path: '/api/kibana/{id}/_bulk',
path: '/api/kibana/{id}/_data',
method: 'POST',
config: {
payload: {
output: 'stream',
maxBytes: 1024 * 1024 * 1024
maxBytes: ONE_GIGABYTE
}
},
handler: function (req, reply) {
Expand Down Expand Up @@ -46,7 +48,7 @@ export function registerBulk(server) {
push(null, doc);
}
else {
push(null, {index: _.isEmpty(fileName) ? {} : {_id: `L${currentLine} - ${fileName}`}});
push(null, {index: _.isEmpty(fileName) ? {} : {_id: `${fileName}:${currentLine}`}});
push(null, doc);
currentLine++;
next();
Expand All @@ -71,7 +73,8 @@ export function registerBulk(server) {
return _.reduce(response.items, (memo, docResponse) => {
const indexResult = docResponse.index;
if (indexResult.error) {
if (_.isUndefined(_.get(memo, 'errors.index'))) {
const hasIndexingErrors = _.isUndefined(_.get(memo, 'errors.index'));
if (hasIndexingErrors) {
_.set(memo, 'errors.index', []);
}
memo.errors.index.push(_.pick(indexResult, ['_id', 'error']));
Expand Down
68 changes: 34 additions & 34 deletions test/unit/api/ingest/_bulk.js → test/unit/api/ingest/_data.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ define(function (require) {

return function (bdd, scenarioManager, request) {
const es = scenarioManager.client;
bdd.describe('_bulk', function () {
bdd.describe('_data', function () {

bdd.beforeEach(function () {
return es.indices.putTemplate({
Expand All @@ -27,31 +27,31 @@ define(function (require) {
});

bdd.it('should accept a multipart/form-data request with a csv file attached', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect(200);
});

bdd.it('should also accept the raw csv data in the payload body', function () {
var csvData = fs.readFileSync('test/unit/data/fake_names_big.csv', {encoding: 'utf8'});

return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.send(csvData)
.expect(200);
});

bdd.it('should return JSON results', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect('Content-Type', /json/)
.expect(200);
});

bdd.it('should index one document per row in the csv', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect(200)
.then((bulkResponse) => {
.then(() => {
return es.indices.refresh()
.then(() => {
return es.count({ index: 'names' })
Expand All @@ -63,72 +63,72 @@ define(function (require) {
});

bdd.it('should stream a chunked response', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect('Transfer-Encoding', 'chunked')
.expect(200);
});

bdd.it('should respond with an array of one or more "result objects"', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names_big.csv')
.expect(200)
.then((bulkResponse) => {
expect(bulkResponse.body.length).to.be(14);
.then((dataResponse) => {
expect(dataResponse.body.length).to.be(14);
});
});

bdd.describe('result objects', function () {

bdd.it('should include a count of created documents', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect(200)
.then((bulkResponse) => {
expect(bulkResponse.body[0]).to.have.property('created');
expect(bulkResponse.body[0].created).to.be(100);
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(100);
});
});

bdd.it('should report any indexing errors per document under an "errors.index" key', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names_with_mapping_errors.csv')
.expect(200)
.then((bulkResponse) => {
expect(bulkResponse.body[0]).to.have.property('created');
expect(bulkResponse.body[0].created).to.be(98);
expect(bulkResponse.body[0]).to.have.property('errors');
expect(bulkResponse.body[0].errors).to.have.property('index');
expect(bulkResponse.body[0].errors.index.length).to.be(2);
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(98);
expect(dataResponse.body[0]).to.have.property('errors');
expect(dataResponse.body[0].errors).to.have.property('index');
expect(dataResponse.body[0].errors.index.length).to.be(2);
});
});

bdd.it('should report any csv parsing errors under an "errors.other" key', function () {
return request.post('/kibana/names/_bulk')
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/data/fake_names_with_parse_errors.csv')
.expect(200)
.then((bulkResponse) => {
.then((dataResponse) => {
// parse errors immediately abort indexing
expect(bulkResponse.body[0]).to.have.property('created');
expect(bulkResponse.body[0].created).to.be(0);
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(0);

expect(bulkResponse.body[0]).to.have.property('errors');
expect(bulkResponse.body[0].errors).to.have.property('other');
expect(bulkResponse.body[0].errors.other.length).to.be(1);
expect(dataResponse.body[0]).to.have.property('errors');
expect(dataResponse.body[0].errors).to.have.property('other');
expect(dataResponse.body[0].errors.other.length).to.be(1);
});
});

});

bdd.describe('optional parameters', function () {
bdd.it('should accept a custom delimiter query string param for parsing the CSV', function () {
return request.post('/kibana/names/_bulk?delimiter=|')
return request.post('/kibana/names/_data?delimiter=|')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this. The | could be passed in as %7C because of URL encoding. Mind adding a test to make sure that works?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the API handle the delimiter correctly if it is URL-encoded? For example, in this case it would be delimiter=%7C. Mind writing a separate unit test for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect supertest (the request lib being used here) to handle the url encoding, so this should actually be testing delimiter=%7C. If that's the case, passing %7C explicitly will cause it to be double encoded. I'll confirm that's the case though.

Copy link
Contributor

@ycombinator ycombinator May 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think URL-encoding is working quite right (or I could be doing something wrong too).

It works if I URL-encode | as %7C:

$ curl -k -H'kbn-version: 5.0.0-snapshot' -XPOST https://localhost:5601/dun/api/kibana/foo/_data?delimiter=%7C -d 'foo,bar
1|2
3|4
17|18'
[
{"created":3}
]

But it doesn't work if I use & as my delimiter and URL-encode it as %26:

$ curl -k -H'kbn-version: 5.0.0-snapshot' -XPOST https://localhost:5601/dun/api/kibana/foo/_data?delimiter=%26 -d 'foo,bar
1&2
3&4
17&18'
[
{"created":0,"errors":{"other":["Number of columns on line 2 does not match header"]}}
]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that you still have a comma instead of an ampersand in your header line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, you're right. Sorry about that. I can confirm that URL-encoding the delimiter works as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, no worries. URL encoding on the API side should really "just work" since Hapi handles all that.

.attach('csv', 'test/unit/data/fake_names_pipe_delimited.csv')
.expect(200)
.then((bulkResponse) => {
expect(bulkResponse.body[0]).to.have.property('created');
expect(bulkResponse.body[0].created).to.be(2);
expect(bulkResponse.body[0]).to.not.have.property('errors');
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(2);
expect(dataResponse.body[0]).to.not.have.property('errors');

return es.indices.refresh();
})
Expand Down Expand Up @@ -160,7 +160,7 @@ define(function (require) {
}
})
.then((res) => {
return request.post('/kibana/names/_bulk?pipeline=true')
return request.post('/kibana/names/_data?pipeline=true')
.attach('csv', 'test/unit/data/fake_names.csv')
.expect(200);
})
Expand Down
4 changes: 2 additions & 2 deletions test/unit/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ define(function (require) {
var expect = require('intern/dojo/node!expect.js');
var post = require('./_post');
var del = require('./_del');
var bulk = require('./_bulk');
var data = require('./_data');
var simulate = require('./_simulate');
var processors = require('./_processors');
var processorTypes = require('./processors/index');
Expand All @@ -27,7 +27,7 @@ define(function (require) {

post(bdd, scenarioManager, request);
del(bdd, scenarioManager, request);
bulk(bdd, scenarioManager, request);
data(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
processorTypes(bdd, scenarioManager, request);
Expand Down