diff --git a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java
index a2e164ec3..ff026216f 100644
--- a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java
+++ b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.jetbrains.annotations.NotNull;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -314,7 +315,12 @@ private boolean send(AggTaskKey key, AggProtos.AggTaskValue value) {
       if (value.hasDataTable()) {
         count += value.getDataTable().getRowCount();
       }
-      if (exception != null) {
+      if (exception instanceof RecordTooLargeException) {
+        log.warn("[agg] [{}] write kafka error for {}, metric=[{}] size=[{}]", //
+            key, exception.getMessage(), value.getMetric(), count);
+        StatUtils.KAFKA_SEND.add(StringsKey.of(topic, "DISCARD", "unknownPartition"),
+            new long[] {1, count, 0});
+      } else if (exception != null) {
         log.error("[agg] [{}] write kafka error, metric=[{}] size=[{}]", //
             key, value.getMetric(), count, exception);
         StatUtils.KAFKA_SEND.add(StringsKey.of(topic, "ERROR", "unknownPartition"),
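
For reference, a minimal standalone sketch of how RecordTooLargeException reaches the Kafka producer callback and can be branched on before the generic error path, mirroring the change above. It assumes a broker reachable at localhost:9092; the topic name, serializers, payload, and max.request.size value are illustrative assumptions, not values from this repository.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class RecordTooLargeDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // Records whose serialized size exceeds max.request.size fail client-side
    // with RecordTooLargeException instead of ever reaching the broker.
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024);

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      byte[] oversized = new byte[4096]; // larger than max.request.size above
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("demo-topic", oversized);

      producer.send(record, (metadata, exception) -> {
        if (exception instanceof RecordTooLargeException) {
          // Oversized record: retrying can never succeed, so treat it as discarded
          // and log at a lower severity, as the patch does.
          System.out.println("discarded oversized record: " + exception.getMessage());
        } else if (exception != null) {
          // Other failures (broker unavailable, timeouts, ...) keep the generic error path.
          exception.printStackTrace();
        }
      });
      producer.flush();
    }
  }
}

The design point the patch relies on: an oversized record is a permanent, data-dependent failure, so it is counted as DISCARD and logged at warn, while transient or unexpected failures still go through the existing ERROR path.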