Skip to content

Commit

Permalink
Fix CVE-2022-34917 about kafka-client dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahimSharafudeen committed Nov 13, 2024
1 parent 0c5444c commit 0330021
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
<dep.slf4j.version>1.7.32</dep.slf4j.version>
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.kafka.version>2.8.2</dep.kafka.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.druid.version>0.19.0</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
Expand Down
10 changes: 10 additions & 0 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns>
<ignoredResourcePattern>kafka/kafka-version.properties</ignoredResourcePattern>
<ignoredResourcePattern>about.html</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.kafka.util.TestUtils.findUnusedPort;
Expand All @@ -48,7 +52,8 @@ public class EmbeddedKafka
private final EmbeddedZookeeper zookeeper;
private final int port;
private final File kafkaDataDir;
private final KafkaServerStartable kafka;
private final KafkaServer kafka;
private final AdminClient adminClient;

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();
Expand Down Expand Up @@ -91,7 +96,11 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
.build();

KafkaConfig config = new KafkaConfig(toProperties(properties));
this.kafka = new KafkaServerStartable(config);
Time time = new SystemTime();
this.kafka = new KafkaServer(config, time, scala.Option.empty(), false);
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getConnectString());
this.adminClient = AdminClient.create(adminProps);
}

public void start()
Expand All @@ -112,6 +121,7 @@ public void close()
kafka.awaitShutdown();
zookeeper.close();
deleteRecursively(kafkaDataDir.toPath(), ALLOW_INSECURE);
adminClient.close();
}
}

Expand All @@ -124,14 +134,15 @@ public void createTopics(int partitions, int replication, Properties topicProper
{
checkState(started.get() && !stopped.get(), "not started!");

ZkUtils zkUtils = ZkUtils.apply(getZookeeperConnectString(), 30_000, 30_000, false);
try {
for (String topic : topics) {
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicProperties, RackAwareMode.Disabled$.MODULE$);
NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
newTopic.configs(Maps.fromProperties(topicProperties));
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
}
}
finally {
zkUtils.close();
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to create topics", e);
}
}

Expand Down
6 changes: 6 additions & 0 deletions presto-product-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@
<dependency>
<groupId>io.prestodb.tempto</groupId>
<artifactId>tempto-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.prestodb.tempto</groupId>
Expand Down

0 comments on commit 0330021

Please sign in to comment.