From 3692320aa8df95d454814909faf074627a237b5e Mon Sep 17 00:00:00 2001 From: Kurru Date: Tue, 26 Jul 2022 20:33:38 -0700 Subject: [PATCH] Send subsegments separately from segments when serialized size is larger than the UDP packet size limit. This is similar to how the DefaultStreamingStrategy behaves. (#344) Segments occasionally can be too large and contain hundreds subsegments, despite the behavior of the DefaultStreamingStrategy class, when running with multiple threads. It is suspected that race conditions can lead to subsegments building up and not getting pushed into xray daemon by the time the segment is ended. --- .../amazonaws/xray/emitters/UDPEmitter.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/emitters/UDPEmitter.java b/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/emitters/UDPEmitter.java index 542411fc..4e9244a1 100644 --- a/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/emitters/UDPEmitter.java +++ b/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/emitters/UDPEmitter.java @@ -23,6 +23,7 @@ import java.net.DatagramSocket; import java.net.SocketException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +34,7 @@ @Deprecated public class UDPEmitter extends Emitter { private static final Log logger = LogFactory.getLog(UDPEmitter.class); + private static final int UDP_PACKET_LIMIT = 63 * 1024; private DatagramSocket daemonSocket; private DaemonConfiguration config; @@ -82,8 +84,20 @@ public boolean sendSegment(Segment segment) { logger.debug(segment.prettySerialize()); } if (segment.compareAndSetEmitted(false, true)) { - return sendData((PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8), - segment); + byte[] bytes = (PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8); + + if (bytes.length > UDP_PACKET_LIMIT) { + List subsegments = segment.getSubsegmentsCopy(); + logger.debug("Segment too large, sending subsegments to daemon first. bytes " + bytes.length + " subsegemnts " + + subsegments.size()); + for (Subsegment subsegment : subsegments) { + sendSubsegment(subsegment); + segment.removeSubsegment(subsegment); + } + bytes = (PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8); + logger.debug("New segment size. bytes " + bytes.length); + } + return sendData(bytes, segment); } else { return false; }