Skip to content

Commit

Permalink
lib: add nsolid JS API for heap sampling
Browse files Browse the repository at this point in the history
It involves two different set of API's.

The first one, is the classic `N|Solid` api which will create a profile
and send it to the Console. It consists of the following methods:
- `heapSampling()` and `heapSamplingEnd()`.

The second allows to retrieve the heap profile via a `Readable` using
the following method:

```
const stream = heapSamplingStream(threadId, duration, options);
```
  • Loading branch information
santigimeno committed Apr 1, 2024
1 parent 48562cb commit b1609c8
Show file tree
Hide file tree
Showing 9 changed files with 596 additions and 1 deletion.
121 changes: 120 additions & 1 deletion agents/zmq/lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ const {
ERR_NSOLID_CPU_PROFILE_STOP,
ERR_NSOLID_HEAP_PROFILE_START,
ERR_NSOLID_HEAP_PROFILE_STOP,
ERR_NSOLID_HEAP_SAMPLING_START,
ERR_NSOLID_HEAP_SAMPLING_STOP,
ERR_NSOLID_HEAP_SNAPSHOT,
} = require('internal/errors').codes;
const {
validateBoolean,
validateFunction,
validateNumber,
validateObject,
} = require('internal/validators');

module.exports = ({ heapProfile: _heapProfile,
heapProfileEnd: _heapProfileEnd,
heapSampling: _heapSampling,
heapSamplingEnd: _heapSamplingEnd,
profile: _profile,
profileEnd: _profileEnd,
snapshot: _snapshot,
Expand Down Expand Up @@ -121,7 +126,7 @@ module.exports = ({ heapProfile: _heapProfile,
* sends the data to the console once it's done. It does not return until the
* profiling has started or it has failed.
* @param {number} [timeout=600000]
* @param {trackAllocations} [trackAllocations=false]
* @param {boolean} [trackAllocations=false]
* @param {heapProfileCallback} [cb] called when the profile has started or there
* has been an error.
* @example
Expand Down Expand Up @@ -218,6 +223,118 @@ module.exports = ({ heapProfile: _heapProfile,
}
};

const defaultHeapSamplingOptions = { sampleInterval: 512 * 1024, stackDepth: 16, flags: 0 };
/**
* Executes Heap sampling of the current JS thread for `timeout` seconds and
* sends the data to the console once it's done. It does not return until the
* sampling has started or it has failed.
* @param {number} [duration=600000] duration of the sampling in milliseconds
* @param {object} [options={ sampleInterval: 512 * 1024, stackDepth: 16, flags: 0 }]
* @param {number} options.sampleInterval every allocation will be allocated every `sampleInterval` bytes
* @param {string} options.stackDepth stack depth to capture
* @param {string} options.flags flags to pass to the profiler. See:
* https://v8docs.nodesource.com/node-20.3/d7/d76/classv8_1_1_heap_profiler.html#a785d454e7866f222e199d667be567392
* @param {heapSamplingCallback} [cb] called when the sampling has started or there
* has been an error.
* @example
* const nsolid = require('nsolid');
* // async version. Starts sampling for 600ms with default options
* nsolid.heapSampling(600, (err) => {
* if(!err) {
* console.log('Sampling started!');
* }
* });
* // sync version
* try {
* // starts sampling for 600000 ms with sampleInterval of 256kB
* const opts = { sampleInterval: 256 * 1024 };
* nsolid.heapProfile(opts);
* } catch (err) {
* // In case an error happens
* }
* @throws Will throw an error if an error happens and no callback was passed.
* @function heapSampling
* @memberof module:nsolid
*/
const heapSampling = (duration, options, cb) => {
if (typeof duration === 'function') {
cb = duration;
duration = null;
options = null;
} else if (typeof options === 'function') {
cb = options;
options = null;
}

duration = duration || 600000;
options = options || {};
validateObject(options, 'options');
options = { ...defaultHeapSamplingOptions, ...options };
validateNumber(duration, 'duration');
validateNumber(options.sampleInterval, 'options.sampleInterval');
validateNumber(options.stackDepth, 'options.stackDepth');
validateNumber(options.flags, 'options.flags');

if (cb !== undefined) {
validateFunction(cb, 'heapSamplingCallback');
}

const status = _heapSampling(options.sampleInterval, options.stackDepth, options.flags, duration);
let err;
if (status !== 0) {
err = new ERR_NSOLID_HEAP_SAMPLING_START();
err.code = status;
}

if (cb) {
process.nextTick(() => cb(err));
} else if (err) {
throw err;
}
};

/**
* Stops the running heap sampling in the current JS thread (if any)
* @param {heapSamplingEndCallback} [cb] called when the sampling has started or
* there has been an error.
* @example
* const nsolid = require('nsolid');
* // async version
* nsolid.heapSampling(600, (err) => {
* if(!err) {
* console.log('Sampling started!');
* setTimeout(() => {
* nsolid.heapSamplingEnd((err) => {
* if (!err) {
* console.log('Sampling ended!');
* }}
* });
* }, 1000);
* }
* });
* @throws Will throw an error if an error happens and no callback was passed.
* @function heapSamplingEnd
* @memberof module:nsolid
*/
const heapSamplingEnd = (cb) => {
if (cb !== undefined) {
validateFunction(cb, 'heapSamplingEndCallback');
}

const status = _heapSamplingEnd();
let err;
if (status !== 0) {
err = new ERR_NSOLID_HEAP_SAMPLING_STOP();
err.code = status;
}

if (cb) {
process.nextTick(() => cb(err));
} else if (err) {
throw err;
}
};

/**
* It generates a heap snapshot of the current JS thread and sends it to the
* console. It does not return until the snapshot has been generated or an
Expand Down Expand Up @@ -264,6 +381,8 @@ module.exports = ({ heapProfile: _heapProfile,
return {
heapProfile,
heapProfileEnd,
heapSampling,
heapSamplingEnd,
profile,
profileEnd,
snapshot,
Expand Down
42 changes: 42 additions & 0 deletions agents/zmq/src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ using v8::NewStringType;
using v8::Number;
using v8::Object;
using v8::String;
using v8::Uint32;
using v8::Value;

namespace node {
Expand Down Expand Up @@ -112,6 +113,43 @@ static void EndHeapProfile(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(ZmqAgent::Inst()->stop_heap_profiling(thread_id));
}

static void StartHeapSampling(const FunctionCallbackInfo<Value>& args) {
ASSERT_EQ(4, args.Length());
ASSERT(args[0]->IsNumber());
ASSERT(args[1]->IsUint32());
ASSERT(args[2]->IsUint32());
ASSERT(args[3]->IsNumber());
Isolate* isolate = args.GetIsolate();
Local<Context> context = isolate->GetCurrentContext();
uint64_t thread_id = ThreadId(context);
uint64_t sample_interval = args[0].As<Number>()->Value();
uint32_t stack_depth = args[1].As<Uint32>()->Value();
uint32_t flags = args[2].As<Uint32>()->Value();
uint64_t duration = args[3].As<Number>()->Value();
json message = {
{ "args", {
{ "metadata", {
{ "reason", "Agent API" }
}},
{ "duration", duration },
{ "sampleInterval", sample_interval },
{ "stackDepth", stack_depth },
{ "flags", flags },
{ "duration", duration },
{ "threadId", thread_id }
}}
};

args.GetReturnValue().Set(ZmqAgent::Inst()->start_heap_sampling(message));
}

static void EndHeapSampling(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
Local<Context> context = isolate->GetCurrentContext();
uint64_t thread_id = ThreadId(context);
args.GetReturnValue().Set(ZmqAgent::Inst()->stop_heap_sampling(thread_id));
}

static void Start(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(ZmqAgent::Inst()->start());
}
Expand All @@ -127,6 +165,8 @@ static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(EndCPUProfile);
registry->Register(StartHeapProfile);
registry->Register(EndHeapProfile);
registry->Register(StartHeapSampling);
registry->Register(EndHeapSampling);
registry->Register(Config);
registry->Register(Start);
registry->Register(Stop);
Expand All @@ -142,6 +182,8 @@ void InitZmqAgent(Local<Object> exports,
NODE_SET_METHOD(exports, "profileEnd", EndCPUProfile);
NODE_SET_METHOD(exports, "heapProfile", StartHeapProfile);
NODE_SET_METHOD(exports, "heapProfileEnd", EndHeapProfile);
NODE_SET_METHOD(exports, "heapSampling", StartHeapSampling);
NODE_SET_METHOD(exports, "heapSamplingEnd", EndHeapSampling);
if (!node::nsolid::IsMainThread(node::nsolid::GetLocalEnvInst(context))) {
return;
}
Expand Down
7 changes: 7 additions & 0 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2366,6 +2366,13 @@ int ZmqAgent::stop_heap_profiling(uint64_t thread_id) {
return Snapshot::StopTrackingHeapObjectsSync(GetEnvInst(thread_id));
}

int ZmqAgent::stop_heap_sampling(uint64_t thread_id) {
// This method is only called from the JS thread the profile it's stopping is
// running so the sync StopSamplingSync method can be safely
// called.
return Snapshot::StopSamplingSync(GetEnvInst(thread_id));
}

void ZmqAgent::status_command_cb(SharedEnvInst, ZmqAgent* agent) {
agent->status_cb_(agent->status());
}
Expand Down
12 changes: 12 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3661,6 +3661,18 @@ Error when trying to start taking a heap profile with the JS API.

Error when trying to stop a heap profile being taken with the JS API.

<a id="ERR_NSOLID_HEAP_SAMPLING_START"></a>

### `ERR_NSOLID_HEAP_SAMPLING_START`

Error when trying to start taking a heap sampling with the JS API.

<a id="ERR_NSOLID_HEAP_SAMPLING_STOP"></a>

### `ERR_NSOLID_HEAP_SAMPLING_STOP`

Error when trying to stop a heap sampling being taken with the JS API.

<a id="ERR_NSOLID_HEAP_SNAPSHOT"></a>

### `ERR_NSOLID_HEAP_SNAPSHOT`
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,10 @@ E('ERR_NSOLID_HEAP_PROFILE_START',
'Heap profile could not be started', Error);
E('ERR_NSOLID_HEAP_PROFILE_STOP',
'Heap profile could not be stopped', Error);
E('ERR_NSOLID_HEAP_SAMPLING_START',
'Heap sampling could not be started', Error);
E('ERR_NSOLID_HEAP_SAMPLING_STOP',
'Heap sampling could not be stopped', Error);
E('ERR_NSOLID_HEAP_SNAPSHOT',
'Heap snapshot could not be generated', Error);
E('ERR_NSOLID_OTEL_API_ALREADY_REGISTERED',
Expand Down
74 changes: 74 additions & 0 deletions lib/nsolid.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const { Readable } = require('stream');
const {
validateBoolean,
validateNumber,
validateObject,
} = require('internal/validators');

const {
Expand All @@ -39,6 +40,7 @@ const zmq = require('internal/agents/zmq/lib/nsolid');
const {
codes: {
ERR_NSOLID_HEAP_PROFILE_START,
ERR_NSOLID_HEAP_SAMPLING_START,
ERR_INVALID_ARG_TYPE,
},
} = require('internal/errors');
Expand Down Expand Up @@ -124,6 +126,9 @@ ObjectAssign(start, {
heapProfileStream,
heapProfile: zmq.heapProfile,
heapProfileEnd: zmq.heapProfileEnd,
heapSamplingStream,
heapSampling: zmq.heapSampling,
heapSamplingEnd: zmq.heapSamplingEnd,
profile: zmq.profile,
profileEnd: zmq.profileEnd,
on,
Expand Down Expand Up @@ -395,6 +400,75 @@ function heapProfileStream(threadId, duration, trackAllocations) {
return readable;
}

const defaultHeapSamplingOptions = { sampleInterval: 512 * 1024, stackDepth: 16, flags: 0 };

/**
* Starts a heap sampling in a specific JS thread and returns a readable stream
* of the profile data.
* @param {number} threadId - The ID of the thread for which to start the heap sampling.
* @param {number} duration - The duration in milliseconds for which to run the heap sampling.
* @param {object} [options={ sampleInterval: 512 * 1024, stackDepth: 16, flags: 0 }]
* @param {number} options.sampleInterval every allocation will be allocated every `sampleInterval` bytes
* @param {string} options.stackDepth stack depth to capture
* @param {string} options.flags flags to pass to the profiler. See:
* https://v8docs.nodesource.com/node-20.3/d7/d76/classv8_1_1_heap_profiler.html#a785d454e7866f222e199d667be567392
* @example
* const nsolid = require('nsolid');
* const threadId = 1;
* const duration = 5000; // 5 seconds
* const trackAllocations = true;
* const heapSamplingStream = nsolid.heapSamplingStream(threadId, duration, {});
* heapSamplingStream.on('data', (data) => {
* console.log('Heap Sampling Data:', data);
* });
* heapSamplingStream.on('error', (err) => {
* console.error('Error:', err);
* });
* @returns {Readable} A readable stream of the heap sampling data. It emits an
* {ERR_NSOLID_HEAP_SAMPLING_START} If there is an error starting the heap profile.
* @alias module:nsolid.heapSamplingStream
*/
function heapSamplingStream(threadId, duration, options) {
validateNumber(threadId, 'threadId');
validateNumber(duration, 'duration');
options = options || {};
validateObject(options, 'options');
options = { ...defaultHeapSamplingOptions, ...options };
validateNumber(options.sampleInterval, 'options.sampleInterval');
validateNumber(options.stackDepth, 'options.stackDepth');
validateNumber(options.flags, 'options.flags');
const readable = new Readable({
read() {
},
destroy(err, cb) {
binding.heapSamplingEnd(threadId);
if (typeof cb === 'function')
cb(err);
},
});

const { sampleInterval, stackDepth, flags } = options;
const ret = binding.heapSampling(threadId, duration, sampleInterval, stackDepth, flags, (status, data) => {
if (status !== 0) {
const err = new ERR_NSOLID_HEAP_SAMPLING_START();
err.code = status;
readable.destroy(err);
} else if (data === null) {
readable.push(null);
} else {
readable.push(data);
}
});

if (ret !== 0) {
const err = new ERR_NSOLID_HEAP_SAMPLING_START();
err.code = ret;
readable.destroy(err);
}

return readable;
}


/**
* It retrieves the startup times of the process.
Expand Down
Loading

0 comments on commit b1609c8

Please sign in to comment.