-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathminio-helper.ts
142 lines (134 loc) · 5.07 KB
/
minio-helper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import { Stream } from 'stream';
// Copyright 2019-2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Transform, PassThrough } from 'stream';
import * as tar from 'tar-stream';
import peek from 'peek-stream';
import gunzip from 'gunzip-maybe';
import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio';
import { awsInstanceProfileCredentials } from './aws-helper';
/** MinioRequestConfig describes the info required to retrieve an artifact. */
export interface MinioRequestConfig {
bucket: string;
key: string;
client: MinioClient;
tryExtract?: boolean;
}
/** MinioClientOptionsWithOptionalSecrets wraps around MinioClientOptions where only endPoint is required (accesskey and secretkey are optional). */
export interface MinioClientOptionsWithOptionalSecrets extends Partial<MinioClientOptions> {
endPoint: string;
}
/**
* Create minio client with aws instance profile credentials if needed.
* @param config minio client options where `accessKey` and `secretKey` are optional.
*/
export async function createMinioClient(config: MinioClientOptionsWithOptionalSecrets) {
if (!config.accessKey || !config.secretKey) {
try {
if (await awsInstanceProfileCredentials.ok()) {
const credentials = await awsInstanceProfileCredentials.getCredentials();
if (credentials) {
const {
AccessKeyId: accessKey,
SecretAccessKey: secretKey,
Token: sessionToken,
} = credentials;
return new MinioClient({ ...config, accessKey, secretKey, sessionToken });
}
console.error('unable to get credentials from AWS metadata store.');
}
} catch (err) {
console.error('Unable to get aws instance profile credentials: ', err);
}
}
return new MinioClient(config as MinioClientOptions);
}
/**
* Checks the magic number of a buffer to see if the mime type is a uncompressed
* tarball. The buffer must be of length 264 bytes or more.
*
* See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html
*
* @param buf Buffer
*/
export function isTarball(buf: Buffer) {
if (!buf || buf.length < 264) {
return false;
}
const offset = 257;
const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30];
const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00];
return (
v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) ||
v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean)
);
}
/**
* Returns a stream that extracts the first record of a tarball if the source
* stream is a tarball, otherwise just pipe the content as is.
*/
export function maybeTarball(): Transform {
return peek(
{ newline: false, maxBuffer: 264 },
(data: Buffer, swap: (error?: Error, parser?: Transform) => void) => {
if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream());
else swap(undefined, new PassThrough());
},
);
}
/**
* Returns a transform stream where the first record inside a tarball will be
* pushed - i.e. all other contents will be dropped.
*/
function extractFirstTarRecordAsStream() {
const extract = tar.extract();
const transformStream = new Transform({
write: (chunk: any, encoding: string, callback: (error?: Error | null) => void) => {
extract.write(chunk, encoding, callback);
},
});
extract.once('entry', function(_header, stream, next) {
stream.on('data', (buffer: any) => transformStream.push(buffer));
stream.on('end', () => {
transformStream.emit('end');
next();
});
stream.resume(); // just auto drain the stream
});
extract.on('error', error => transformStream.emit('error', error));
return transformStream;
}
/**
* Returns a stream from an object in a s3 compatible object store (e.g. minio).
* The actual content of the stream depends on the object.
*
* Any gzipped or deflated objects will be ungzipped or inflated. If the object
* is a tarball, only the content of the first record in the tarball will be
* returned. For any other objects, the raw content will be returned.
*
* @param param.bucket Bucket name to retrieve the object from.
* @param param.key Key of the object to retrieve.
* @param param.client Minio client.
* @param param.tryExtract Whether we try to extract *.tar.gz, default to true.
*
*/
export async function getObjectStream({
bucket,
key,
client,
tryExtract = true,
}: MinioRequestConfig): Promise<Transform> {
const stream = await client.getObject(bucket, key);
return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough());
}