This repository has been archived by the owner on Oct 2, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathjetstreamPublish.js
51 lines (50 loc) · 2 KB
/
jetstreamPublish.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import { realizeChannelName, getMessageType, realizeParametersForChannelWrapper, messageHasNullPayload, renderJSDocParameters } from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
/**
* Component which returns a function which publishes to the given channel
*
* @param {string} channelName to publish to
* @param {Message} message which is being published
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPublish(channelName, message, channelParameters) {
const messageType = getMessageType(message);
const hasNullPayload = messageHasNullPayload(message.payload());
//Determine the publish operation based on whether the message type is null
let publishOperation = `await js.publish(${realizeChannelName(channelParameters, channelName)}, Nats.Empty);`;
if (!hasNullPayload) {
publishOperation = `
let dataToSend : any = message.marshal();
dataToSend = codec.encode(dataToSend);
js.publish(${realizeChannelName(channelParameters, channelName)}, dataToSend, options);`;
}
return `
/**
* Internal functionality to publish message to jetstream channel
* ${channelName}
*
* @param message to publish
* @param js to publish with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
* @param options to publish with
*/
export function jetStreamPublish(
message: ${messageType},
js: Nats.JetStreamClient,
codec: Nats.Codec<any>
${realizeParametersForChannelWrapper(channelParameters)},
options?: Nats.PublishOptions
): Promise<void> {
return new Promise<void>(async (resolve, reject) => {
try{
${publishOperation}
resolve();
}catch(e: any){
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
});
};
`;
}