Skip to content

Commit

Permalink
Send subsegments separately from segments when serialized size is lar…
Browse files Browse the repository at this point in the history
…ger than the UDP packet size limit. This is similar to how the DefaultStreamingStrategy behaves. (aws#344)

Segments occasionally can be too large and contain hundreds subsegments, despite the behavior of the DefaultStreamingStrategy class, when running with multiple threads. It is suspected that race conditions can lead to subsegments building up and not getting pushed into xray daemon by the time the segment is ended.
  • Loading branch information
Kurru authored Jul 27, 2022
1 parent 6f29232 commit 3692320
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -33,6 +34,7 @@
@Deprecated
public class UDPEmitter extends Emitter {
private static final Log logger = LogFactory.getLog(UDPEmitter.class);
private static final int UDP_PACKET_LIMIT = 63 * 1024;

private DatagramSocket daemonSocket;
private DaemonConfiguration config;
Expand Down Expand Up @@ -82,8 +84,20 @@ public boolean sendSegment(Segment segment) {
logger.debug(segment.prettySerialize());
}
if (segment.compareAndSetEmitted(false, true)) {
return sendData((PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8),
segment);
byte[] bytes = (PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8);

if (bytes.length > UDP_PACKET_LIMIT) {
List<Subsegment> subsegments = segment.getSubsegmentsCopy();
logger.debug("Segment too large, sending subsegments to daemon first. bytes " + bytes.length + " subsegemnts "
+ subsegments.size());
for (Subsegment subsegment : subsegments) {
sendSubsegment(subsegment);
segment.removeSubsegment(subsegment);
}
bytes = (PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.serialize()).getBytes(StandardCharsets.UTF_8);
logger.debug("New segment size. bytes " + bytes.length);
}
return sendData(bytes, segment);
} else {
return false;
}
Expand Down

0 comments on commit 3692320

Please sign in to comment.