Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step Collector - read exchange message body content only once #256

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public void notify(CamelEvent event) throws Exception {
if(stepId!= null && !isBlackListed(stepId)){

if (filters == null || EventUtil.isFilteredEquals(filters, stepId)) {
// set custom properties
setCustomProperties(exchange, stepId, stepTimestamp);
//process and store the exchange
processEvent(exchange, stepId, stepTimestamp);
}
Expand All @@ -95,9 +93,16 @@ public void notify(CamelEvent event) throws Exception {

private void processEvent(Exchange exchange, String stepId, long stepTimestamp){

// read body only once
byte[] body = exchange.getMessage().getBody(byte[].class);
int bodyLength = body != null ? body.length : 0;

// set custom properties
setCustomProperties(exchange, bodyLength, stepId, stepTimestamp);

//set fields
Message message = exchange.getMessage();
String body = getBody(exchange);
String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body);
Map<String, Object> headers = message.getHeaders();
Map<String, Object> properties = exchange.getProperties();
String messageId = message.getMessageId();
Expand All @@ -111,18 +116,17 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp){

//create json
MessageEvent messageEvent = new MessageEvent(
timestamp, messageId, flowId, flowVersion, stepId, headers, properties, body, expiryDate
timestamp, messageId, flowId, flowVersion, stepId, headers, properties, bodyToStoreOnEvent, expiryDate
);
String json = messageEvent.toJson();

//store the event
storeManager.storeEvent(json);
}

public String getBody(Exchange exchange) {
public String getBodyToStoreOnEvent(Exchange exchange, byte[] body) {

try {
byte[] body = exchange.getMessage().getBody(byte[].class);
int limitBodyLength = getLimitBodyLength();

if (body == null || body.length == 0) {
Expand Down Expand Up @@ -172,7 +176,7 @@ private int getLimitBodyLength() {
}
}

private void setCustomProperties(Exchange exchange, String stepId, long stepTimestamp) {
private void setCustomProperties(Exchange exchange, int bodyLength, String stepId, long stepTimestamp) {
if (EventUtil.isFilteredEquals(filters, stepId)) {
// set response time property
setResponseTimeProperty(exchange, stepTimestamp);
Expand All @@ -184,8 +188,7 @@ private void setCustomProperties(Exchange exchange, String stepId, long stepTime
exchange.setProperty(TIMESTAMP_PROPERTY, sdf.format(calNow.getTime()));

// set BodyLength property
byte[] body = exchange.getMessage().getBody(byte[].class);
exchange.setProperty(MESSAGE_BODY_SIZE_PROPERTY, body != null ? body.length : 0);
exchange.setProperty(MESSAGE_BODY_SIZE_PROPERTY, bodyLength);

// set HeadersLength property
Map<String, Object> headersMap = MessageEvent.filterHeaders(exchange.getMessage().getHeaders());
Expand Down