From 2d4cd8b55006cf4f44a3b2a7d665d198c7346327 Mon Sep 17 00:00:00 2001 From: nscuro Date: Mon, 5 Feb 2024 13:50:38 +0100 Subject: [PATCH] Migrate `MirrorVulnerabilityProcessor` from Kafka Streams to Parallel Consumer Depends on https://github.com/DependencyTrack/hyades-apiserver/pull/552 Relates to https://github.com/DependencyTrack/hyades/issues/346 Relates to https://github.com/DependencyTrack/hyades/issues/901 Relates to https://github.com/DependencyTrack/hyades/issues/907 Signed-off-by: nscuro --- .../kafka/processor/ProcessorInitializer.java | 4 +- .../VulnerabilityMirrorProcessor.java} | 19 ++-- .../streams/KafkaStreamsTopologyFactory.java | 7 -- src/main/resources/application.properties | 9 ++ .../VulnerabilityMirrorProcessorTest.java} | 88 +++++++++---------- 5 files changed, 62 insertions(+), 65 deletions(-) rename src/main/java/org/dependencytrack/event/kafka/{streams/processor/MirrorVulnerabilityProcessor.java => processor/VulnerabilityMirrorProcessor.java} (96%) rename src/test/java/org/dependencytrack/event/kafka/{streams/processor/MirrorVulnerabilityProcessorTest.java => processor/VulnerabilityMirrorProcessorTest.java} (95%) diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java index 81aa599cf..fc47ab994 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java @@ -1,6 +1,7 @@ package org.dependencytrack.event.kafka.processor; import alpine.common.logging.Logger; +import org.dependencytrack.event.kafka.KafkaTopics; import org.dependencytrack.event.kafka.processor.api.ProcessorManager; import javax.servlet.ServletContextEvent; @@ -16,7 +17,8 @@ public class ProcessorInitializer implements ServletContextListener { public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Initializing processors"); - // TODO: Register processor here! + PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME, + KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor()); PROCESSOR_MANAGER.startAll(); } diff --git a/src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java similarity index 96% rename from src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java rename to src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java index 0b99c2420..6eaf46c5b 100644 --- a/src/main/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessor.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessor.java @@ -1,4 +1,4 @@ -package org.dependencytrack.event.kafka.streams.processor; +package org.dependencytrack.event.kafka.processor; import alpine.common.logging.Logger; import alpine.common.metrics.Metrics; @@ -6,11 +6,11 @@ import com.github.packageurl.PackageURL; import io.micrometer.core.instrument.Timer; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.cyclonedx.proto.v1_4.Bom; import org.cyclonedx.proto.v1_4.Component; import org.cyclonedx.proto.v1_4.VulnerabilityAffects; +import org.dependencytrack.event.kafka.processor.api.Processor; import org.dependencytrack.model.Vulnerability; import org.dependencytrack.model.VulnerableSoftware; import org.dependencytrack.parser.dependencytrack.ModelConverterCdxToVuln; @@ -27,16 +27,20 @@ import java.util.List; import java.util.Optional; +/** + * A {@link Processor} that ingests vulnerability data from CycloneDX Bill of Vulnerabilities. + */ +public class VulnerabilityMirrorProcessor implements Processor { -public class MirrorVulnerabilityProcessor implements Processor { + static final String PROCESSOR_NAME = "vuln.mirror"; - private static final Logger LOGGER = Logger.getLogger(MirrorVulnerabilityProcessor.class); + private static final Logger LOGGER = Logger.getLogger(VulnerabilityMirrorProcessor.class); private static final Timer TIMER = Timer.builder("vuln_mirror_processing") .description("Time taken to process mirrored vulnerabilities") .register(Metrics.getRegistry()); @Override - public void process(final Record record) { + public void process(final ConsumerRecord record) { final Timer.Sample timerSample = Timer.start(); try (QueryManager qm = new QueryManager().withL2CacheDisabled()) { @@ -112,9 +116,6 @@ public void process(final Record record) { synchronizedVulnerability.setVulnerableSoftware(reconciledVsList); } qm.persist(synchronizedVulnerability); - } catch (Exception e) { - // TODO: Send record to a dead letter topic. - LOGGER.error("Synchronizing vulnerability %s failed".formatted(record.key()), e); } finally { timerSample.stop(TIMER); } diff --git a/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java b/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java index 96f446392..f971841df 100644 --- a/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java +++ b/src/main/java/org/dependencytrack/event/kafka/streams/KafkaStreamsTopologyFactory.java @@ -23,7 +23,6 @@ import org.dependencytrack.event.ProjectPolicyEvaluationEvent; import org.dependencytrack.event.kafka.KafkaTopics; import org.dependencytrack.event.kafka.streams.processor.DelayedBomProcessedNotificationProcessor; -import org.dependencytrack.event.kafka.streams.processor.MirrorVulnerabilityProcessor; import org.dependencytrack.event.kafka.streams.processor.RepositoryMetaResultProcessor; import org.dependencytrack.event.kafka.streams.processor.VulnerabilityScanResultProcessor; import org.dependencytrack.model.VulnerabilityScan; @@ -224,12 +223,6 @@ Topology createTopology() { .withName("consume_from_%s_topic".formatted(KafkaTopics.REPO_META_ANALYSIS_RESULT.name()))) .process(RepositoryMetaResultProcessor::new, Named.as("process_repo_meta_analysis_result")); - streamsBuilder - .stream(KafkaTopics.NEW_VULNERABILITY.name(), - Consumed.with(KafkaTopics.NEW_VULNERABILITY.keySerde(), KafkaTopics.NEW_VULNERABILITY.valueSerde()) - .withName("consume_from_%s_topic".formatted(KafkaTopics.NEW_VULNERABILITY.name()))) - .process(MirrorVulnerabilityProcessor::new, Named.as("process_mirror_vulnerability")); - return streamsBuilder.build(streamsProperties); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ac06834f6..a2a0b4917 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -444,6 +444,15 @@ kafka.streams.transient.processing.exception.threshold.interval=PT30M # alpine.kafka.processor..retry.max.delay.ms=60000 # alpine.kafka.processor..consumer.= +alpine.kafka.processor.vuln.mirror.max.concurrency=3 +alpine.kafka.processor.vuln.mirror.processing.order=partition +alpine.kafka.processor.vuln.mirror.retry.initial.delay.ms=3000 +alpine.kafka.processor.vuln.mirror.retry.multiplier=2 +alpine.kafka.processor.vuln.mirror.retry.randomization.factor=0.3 +alpine.kafka.processor.vuln.mirror.retry.max.delay.ms=180000 +alpine.kafka.processor.vuln.mirror.consumer.group.id=dtrack-apiserver-processor +alpine.kafka.processor.vuln.mirror.consumer.auto.offset.reset=latest + # Scheduling tasks after 3 minutes (3*60*1000) of starting application task.scheduler.initial.delay=180000 diff --git a/src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java similarity index 95% rename from src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java rename to src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java index b8a57778c..f9bdf4a6e 100644 --- a/src/test/java/org/dependencytrack/event/kafka/streams/processor/MirrorVulnerabilityProcessorTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityMirrorProcessorTest.java @@ -1,55 +1,26 @@ -package org.dependencytrack.event.kafka.streams.processor; +package org.dependencytrack.event.kafka.processor; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.cyclonedx.proto.v1_4.Bom; -import org.dependencytrack.PersistenceCapableTest; -import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerde; -import org.dependencytrack.event.kafka.serialization.KafkaProtobufSerializer; import org.dependencytrack.model.Severity; import org.dependencytrack.model.Vulnerability; import org.dependencytrack.persistence.CweImporter; -import org.dependencytrack.util.KafkaTestUtil; -import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.dependencytrack.util.KafkaTestUtil.generateBomFromJson; -public class MirrorVulnerabilityProcessorTest extends PersistenceCapableTest { - - private TopologyTestDriver testDriver; - private TestInputTopic inputTopic; +public class VulnerabilityMirrorProcessorTest extends AbstractProcessorTest { @Before public void setUp() throws Exception { - final var streamsBuilder = new StreamsBuilder(); - streamsBuilder - .stream("input-topic", Consumed - .with(Serdes.String(), new KafkaProtobufSerde<>(Bom.parser()))) - .process(MirrorVulnerabilityProcessor::new); - - testDriver = new TopologyTestDriver(streamsBuilder.build()); - inputTopic = testDriver.createInputTopic("input-topic", - new StringSerializer(), new KafkaProtobufSerializer<>()); + super.setUp(); new CweImporter().processCweDefinitions(); // Required for CWE mapping } - @After - public void tearDown() { - if (testDriver != null) { - testDriver.close(); - } - } - @Test public void testProcessNvdVuln() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -91,7 +62,10 @@ public void testProcessNvdVuln() throws Exception { { "url": "https://github.com/thinkcmf/thinkcmf/issues/736" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -160,7 +134,7 @@ public void testProcessNvdVuln() throws Exception { @Test public void testProcessGitHubVuln() throws Exception { - inputTopic.pipeInput("GITHUB/GHSA-fxwm-579q-49qq", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -223,7 +197,10 @@ public void testProcessGitHubVuln() throws Exception { { "url": "https://github.com/advisories/GHSA-fxwm-579q-49qq" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("GITHUB/GHSA-fxwm-579q-49qq", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-fxwm-579q-49qq"); assertThat(vuln).isNotNull(); @@ -375,7 +352,7 @@ public void testProcessGitHubVuln() throws Exception { @Test public void testProcessOsvVuln() throws Exception { - inputTopic.pipeInput("OSV/GHSA-2cc5-23r7-vc4v", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -427,7 +404,10 @@ public void testProcessOsvVuln() throws Exception { { "url": "https://github.com/ratpack/ratpack/blob/29434f7ac6fd4b36a4495429b70f4c8163100332/ratpack-session/src/main/java/ratpack/session/clientside/ClientSideSessionConfig.java#L29" } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("OSV/GHSA-2cc5-23r7-vc4v", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("GITHUB", "GHSA-2cc5-23r7-vc4v"); assertThat(vuln).isNotNull(); @@ -555,7 +535,7 @@ public void testProcessOsvVuln() throws Exception { @Test public void testProcessVulnWithoutAffects() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -573,7 +553,10 @@ public void testProcessVulnWithoutAffects() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -613,7 +596,7 @@ public void testProcessVulnWithoutAffects() throws Exception { @Test public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -639,7 +622,10 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -679,7 +665,7 @@ public void testProcessVulnWithUnmatchedAffectsBomRef() throws Exception { @Test public void testProcessVulnWithVersConstraints() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -731,7 +717,10 @@ public void testProcessVulnWithVersConstraints() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -935,7 +924,7 @@ public void testProcessVulnWithVersConstraints() throws Exception { @Test public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { - inputTopic.pipeInput("NVD/CVE-2022-40489", KafkaTestUtil.generateBomFromJson(""" + final var bovJson = """ { "components": [ { @@ -997,7 +986,10 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { } ] } - """)); + """; + + final var processor = new VulnerabilityMirrorProcessor(); + processor.process(aConsumerRecord("NVD/CVE-2022-40489", generateBomFromJson(bovJson)).build()); final Vulnerability vuln = qm.getVulnerabilityByVulnId("NVD", "CVE-2022-40489"); assertThat(vuln).isNotNull(); @@ -1035,4 +1027,4 @@ public void testProcessVulnWithInvalidCpeOrPurl() throws Exception { assertThat(vuln.getVulnerableSoftware()).isEmpty(); } -} +} \ No newline at end of file