-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
Copy pathcluster.js
115 lines (90 loc) · 3.02 KB
/
cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import elasticsearch from 'elasticsearch';
import { get, set, isEmpty, cloneDeep, pick } from 'lodash';
import toPath from 'lodash/internal/toPath';
import Boom from 'boom';
import filterHeaders from './filter_headers';
import { parseConfig } from './parse_config';
/**
 * Holds the configuration for one Elasticsearch cluster together with two
 * clients built from it:
 *
 *  - `_client`: built from the selected configuration keys as they are.
 *  - `_noAuthClient`: built from the same keys with `auth: false` merged in.
 *    How `auth: false` is interpreted is decided by `parseConfig`.
 */
export class Cluster {
  /**
   * @param {Object} config - Cluster settings. Copied shallowly, so replacing
   *   top-level keys on the caller's object later does not affect this instance.
   */
  constructor(config) {
    this._config = { ...config };
    this.errors = elasticsearch.errors;

    this._client = this.createClient();
    this._noAuthClient = this.createClient({ auth: false });
  }

  /**
   * Calls an API endpoint through the `auth: false` client, forwarding the
   * whitelisted headers of the incoming request.
   *
   * @param {Object} [req={}] - Incoming request; only `req.headers` is read.
   * @param {string} endpoint - Property path of the client method to call.
   * @param {Object} [clientParams={}] - Parameters for the client method. Never
   *   modified: when headers have to be attached they go onto a shallow copy.
   * @param {Object} [options={}] - Passed through to `callAPI`.
   * @returns {Promise<*>} Result of the client call.
   */
  callWithRequest = (req = {}, endpoint, clientParams = {}, options = {}) => {
    if (!req.headers) {
      return callAPI(this._noAuthClient, endpoint, clientParams, options);
    }

    const filteredHeaders = filterHeaders(req.headers, this.getRequestHeadersWhitelist());

    // Attach the headers to a copy: a params object that the caller reuses
    // must not carry one request's headers over into a later call.
    const requestParams = { ...clientParams };
    set(requestParams, 'headers', filteredHeaders);

    return callAPI(this._noAuthClient, endpoint, requestParams, options);
  };

  /**
   * Calls an API endpoint through the client built from the configuration
   * as-is (no request headers are forwarded).
   *
   * @param {string} endpoint - Property path of the client method to call.
   * @param {Object} [clientParams={}] - Parameters for the client method.
   * @param {Object} [options={}] - Passed through to `callAPI`.
   * @returns {Promise<*>} Result of the client call.
   */
  callWithInternalUser = (endpoint, clientParams = {}, options = {}) => {
    return callAPI(this._client, endpoint, clientParams, options);
  };

  // Each getter below returns a deep clone, so callers cannot alter the
  // stored configuration through the returned value.
  getRequestHeadersWhitelist = () => getClonedProperty(this._config, 'requestHeadersWhitelist');

  getCustomHeaders = () => getClonedProperty(this._config, 'customHeaders');

  getRequestTimeout = () => getClonedProperty(this._config, 'requestTimeout');

  getUrl = () => getClonedProperty(this._config, 'url');

  getSsl = () => getClonedProperty(this._config, 'ssl');

  /** @returns {Object} The client built from the configuration as-is. */
  getClient = () => this._client;

  /** Closes both clients, skipping any that is not set. */
  close() {
    if (this._client) {
      this._client.close();
    }

    if (this._noAuthClient) {
      this._noAuthClient.close();
    }
  }

  /**
   * Builds a new client from this cluster's client configuration.
   *
   * @param {Object} [configOverrides] - Keys merged over the cluster's client
   *   configuration; on conflict the override wins.
   * @returns {Object} A new `elasticsearch.Client`.
   */
  createClient = (configOverrides) => {
    const config = {
      ...this._getClientConfig(),
      ...configOverrides,
    };
    return new elasticsearch.Client(parseConfig(config));
  };

  /**
   * @returns {Object} Deep clone of the configuration keys that are handed to
   *   `parseConfig` when a client is created.
   */
  _getClientConfig = () => {
    return getClonedProperties(this._config, [
      'url',
      'ssl',
      'username',
      'password',
      'customHeaders',
      'plugins',
      'apiVersion',
      'keepAlive',
      'pingTimeout',
      'requestTimeout',
      'log',
    ]);
  };
}
/**
 * Resolves `endpoint` to a method on `client` and invokes it with
 * `clientParams`, optionally decorating 401 failures.
 *
 * @param {Object} client - Object exposing the API methods, either directly or
 *   nested inside namespace objects.
 * @param {string} endpoint - Property path of the method to call (a plain name
 *   or a dotted `namespace.method` path).
 * @param {Object} [clientParams={}] - Passed to the method as its only argument.
 * @param {Object} [options={}]
 * @param {boolean} [options.wrap401Errors=true] - Unless explicitly `false`, a
 *   rejection whose `statusCode` is 401 is converted with `Boom.boomify` and
 *   given a `WWW-Authenticate` response header.
 * @returns {Promise<*>} The promise returned by the API method; rejections
 *   other than wrapped 401s are passed through unchanged.
 * @throws {Error} Synchronously, when `endpoint` does not resolve to a function
 *   on `client`.
 */
function callAPI(client, endpoint, clientParams = {}, options = {}) {
  const wrap401Errors = options.wrap401Errors !== false;
  const clientPath = toPath(endpoint);
  const api = get(client, clientPath);

  // A path that stops at a namespace object (or any other non-function) is
  // just as invalid as a missing one; report it with the same clear message
  // instead of failing later with "api.call is not a function".
  if (typeof api !== 'function') {
    throw new Error(`called with an invalid endpoint: ${endpoint}`);
  }

  // Invoke the method on the object that owns it; for a top-level method the
  // parent lookup comes back empty and the client itself is the owner.
  let apiContext = get(client, clientPath.slice(0, -1));
  if (isEmpty(apiContext)) {
    apiContext = client;
  }

  return api.call(apiContext, clientParams).catch((err) => {
    if (!wrap401Errors || err.statusCode !== 401) {
      throw err;
    }

    const boomError = Boom.boomify(err, { statusCode: err.statusCode });
    // Prefer the challenge carried in the error body, otherwise fall back to
    // a Basic challenge.
    const wwwAuthHeader = get(err, 'body.error.header[WWW-Authenticate]');
    boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"';

    throw boomError;
  });
}
/**
 * Deep-clones a subset of `config`.
 *
 * @param {Object} config - Source object; never modified.
 * @param {string[]} [paths] - Keys to keep. When falsy, the whole `config` is
 *   cloned instead.
 * @returns {Object} A deep clone that shares no references with `config`.
 */
function getClonedProperties(config, paths) {
  if (!paths) {
    return cloneDeep(config);
  }

  const selected = pick(config, paths);
  return cloneDeep(selected);
}
/**
 * Deep-clones a single value read from `config`.
 *
 * @param {Object} config - Source object; never modified.
 * @param {string} [path] - Property path to read. When falsy (including the
 *   empty string), the whole `config` is cloned instead.
 * @returns {*} A deep clone of the value found at `path`, or of `config`.
 */
function getClonedProperty(config, path) {
  let source = config;
  if (path) {
    source = get(config, path);
  }
  return cloneDeep(source);
}