[PD-257044] databus changes rebase (#848)
* PD-257044-changes-for-databus

* duplicated kafka and ssm util packages in emo sor module

* removed redundant dependencies
anurag0510 authored Nov 5, 2024
1 parent 62dc31e commit 72ed737
Showing 22 changed files with 523 additions and 79 deletions.
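In short, this commit drops the BaseEventStore parameter from the DefaultDataStore and InMemoryDataStore constructors and duplicates the Kafka and SSM utilities into the SoR module (com.bazaarvoice.emodb.sor.kafka and com.bazaarvoice.emodb.sor.ssm) so that emodb-sor no longer depends on emodb-queue. A minimal sketch of the updated test wiring, using only classes visible in this diff (the wrapper class name is illustrative):

import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.codahale.metrics.MetricRegistry;

class DataStoreWiringSketch {
    // Before this commit the in-memory store also required a mock(BaseEventStore.class) argument;
    // after it, a metric registry and a Kafka producer service are enough.
    DataStore newTestDataStore() {
        return new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
    }
}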
@@ -3,13 +3,12 @@
import com.bazaarvoice.emodb.auth.apikey.ApiKey;
import com.bazaarvoice.emodb.auth.apikey.ApiKeyModification;
import com.bazaarvoice.emodb.auth.identity.TableAuthIdentityManagerDAO;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.AuditBuilder;
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
@@ -41,7 +40,7 @@ public class TableAuthIdentityManagerDAOTest {
*/
@Test
public void testRebuildIdIndex() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
@@ -79,7 +78,7 @@ public void testRebuildIdIndex() {

@Test
public void testGrandfatheredInId() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
@@ -131,7 +130,7 @@ public void testGrandfatheredInId() {

@Test
public void testIdAttributeCompatibility() {
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
DataStore dataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
Supplier<String> idSupplier = () -> "id0";
TableAuthIdentityManagerDAO<ApiKey> tableAuthIdentityManagerDAO = new TableAuthIdentityManagerDAO<>(
ApiKey.class, dataStore, "__auth:keys", "__auth:internal_ids", "app_global:sys",
@@ -9,8 +9,6 @@
import com.bazaarvoice.emodb.auth.role.RoleModification;
import com.bazaarvoice.emodb.auth.role.RoleNotFoundException;
import com.bazaarvoice.emodb.auth.role.TableRoleManagerDAO;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.DataStore;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
@@ -20,6 +18,7 @@
import com.bazaarvoice.emodb.sor.api.WriteConsistency;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.web.auth.EmoPermissionResolver;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
@@ -62,7 +61,7 @@ public class TableRoleManagerDAOTest {
@BeforeMethod
public void setUp() {
// DataStore and PermissionManager are fairly heavy to fully mock. Use spies on in-memory implementations instead
_backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
_backendDataStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
_dataStore = spy(_backendDataStore);
_permissionResolver = new EmoPermissionResolver(null, null);
_backendPermissionManager = new InMemoryPermissionManager(_permissionResolver);
@@ -10,15 +10,14 @@
import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.AuditBuilder;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.db.astyanax.DeltaPlacementFactory;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxTableDAO;
import com.bazaarvoice.emodb.table.db.astyanax.CQLStashTableDAO;
@@ -99,7 +98,7 @@ public void setup() throws Exception {
_astyanaxTableDAO.setCQLStashTableDAO(cqlStashTableDAO);
// Don't store table definitions in the actual backing store so as not to interrupt other tests. Use a
// private in-memory implementation.
_tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService(), mock(BaseEventStore.class));
_tableBackingStore = new InMemoryDataStore(new MetricRegistry(), new KafkaProducerService());
_astyanaxTableDAO.setBackingStore(_tableBackingStore);

_lifeCycleRegistry.start();
10 changes: 6 additions & 4 deletions sor/pom.xml
@@ -349,10 +349,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-queue</artifactId>
<version>6.5.205-SNAPSHOT</version>
<scope>compile</scope>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ssm</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>
@@ -5,10 +5,6 @@
import com.bazaarvoice.emodb.common.json.deferred.LazyJsonMap;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.common.zookeeper.store.MapStore;
import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
import com.bazaarvoice.emodb.sor.api.*;
import com.bazaarvoice.emodb.sor.audit.AuditWriter;
import com.bazaarvoice.emodb.sor.compactioncontrol.LocalCompactionControl;
@@ -24,7 +20,10 @@
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.kafka.KafkaConfig;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.SlowQueryLog;
import com.bazaarvoice.emodb.sor.ssm.ParameterStoreUtil;
import com.bazaarvoice.emodb.table.db.DroppedTableException;
import com.bazaarvoice.emodb.table.db.StashBlackListTableCondition;
import com.bazaarvoice.emodb.table.db.StashTableDAO;
@@ -108,7 +107,6 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab
private final Clock _clock;
private final KafkaProducerService _kafkaProducerService;
private ParameterStoreUtil parameterStoreUtil;
private final BaseEventStore _eventStore;
private final Cache<String, Boolean> dataThrottlerCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
@@ -120,10 +118,10 @@ public DefaultDataStore(LifeCycleRegistry lifeCycle, MetricRegistry metricRegist
DataReaderDAO dataReaderDao, DataWriterDAO dataWriterDao, SlowQueryLog slowQueryLog, HistoryStore historyStore,
@StashRoot Optional<URI> stashRootDirectory, @LocalCompactionControl CompactionControlSource compactionControlSource,
@StashBlackListTableCondition Condition stashBlackListTableCondition, AuditWriter auditWriter,
@MinSplitSizeMap MapStore<DataStoreMinSplitSize> minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
@MinSplitSizeMap MapStore<DataStoreMinSplitSize> minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService) {
this(eventWriterRegistry, tableDao, dataReaderDao, dataWriterDao, slowQueryLog, defaultCompactionExecutor(lifeCycle),
historyStore, stashRootDirectory, compactionControlSource, stashBlackListTableCondition, auditWriter,
minSplitSizeMap, metricRegistry, clock, kafkaProducerService, eventStore);
minSplitSizeMap, metricRegistry, clock, kafkaProducerService);
}

@VisibleForTesting
@@ -132,7 +130,7 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
SlowQueryLog slowQueryLog, ExecutorService compactionExecutor, HistoryStore historyStore,
Optional<URI> stashRootDirectory, CompactionControlSource compactionControlSource,
Condition stashBlackListTableCondition, AuditWriter auditWriter,
MapStore<DataStoreMinSplitSize> minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
MapStore<DataStoreMinSplitSize> minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService) {
_eventWriterRegistry = requireNonNull(eventWriterRegistry, "eventWriterRegistry");
_tableDao = requireNonNull(tableDao, "tableDao");
_dataReaderDao = requireNonNull(dataReaderDao, "dataReaderDao");
@@ -154,7 +152,6 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
_clock = requireNonNull(clock, "clock");
_kafkaProducerService = requireNonNull(kafkaProducerService, "kafkaProducerService");
this.parameterStoreUtil = new ParameterStoreUtil();
_eventStore = eventStore;
}

/**
@@ -1,13 +1,12 @@
package com.bazaarvoice.emodb.sor.core.test;

import com.bazaarvoice.emodb.event.api.BaseEventStore;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.audit.DiscardingAuditWriter;
import com.bazaarvoice.emodb.sor.compactioncontrol.InMemoryCompactionControlSource;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import com.bazaarvoice.emodb.sor.core.DatabusEventWriterRegistry;
import com.bazaarvoice.emodb.sor.core.DefaultDataStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
import com.codahale.metrics.MetricRegistry;
@@ -21,19 +20,19 @@
*/
public class InMemoryDataStore extends DefaultDataStore {

public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService, eventStore);
public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService);
}


public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService, eventStore);
public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService);
}

public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService, BaseEventStore eventStore) {
public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
super(eventWriterRegistry, new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService, eventStore);
new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService);
}
}
@@ -0,0 +1,118 @@
package com.bazaarvoice.emodb.sor.kafka;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class KafkaAdminService {
private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class);
private final AdminClient adminClient;
// Cache for the list of all topics; entries expire one minute after write
private final Cache<String, Set<String>> topicListCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();

private static final String TOPIC_LIST_KEY = "allTopics";


public KafkaAdminService() {
this.adminClient = AdminClient.create(KafkaConfig.getAdminProps());
}

/**
* Creates a new Kafka topic with the specified configuration if it does not already exist.
*
* @param topic The name of the topic.
* @param numPartitions Number of partitions.
* @param replicationFactor Replication factor.
* @param queueType Queue type associated with the topic.
* @return {@code true} if the topic already existed, {@code false} if this call created it.
*/
public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) {
Boolean isExisting = isTopicExists(topic);
if (!isExisting) {
// Create the topic now
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
addToCache(topic);
_log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor);
} catch (Exception e) {
_log.error("Error creating topic {}: ", topic, e);
throw new RuntimeException(e);
}
}
return isExisting;
}
public void addToCache(String topic){
Set<String> topics = topicListCache.getIfPresent(TOPIC_LIST_KEY);
if (topics == null) {
topics = new HashSet<>();
} else {
// Create a new mutable Set if the existing one is unmodifiable
topics = new HashSet<>(topics);
}
topics.add(topic);
topicListCache.put(TOPIC_LIST_KEY, topics);
_log.info("Added newly created topic to cache: {}", topic);
}


/**
* Checks if a Kafka topic exists by using a cache to store the list of all topics.
* If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list.
* <p>
* The cached list expires one minute after it is written, after which it will be refreshed
* from Kafka on the next access.
* </p>
*
* @param topic the name of the Kafka topic to check
* @return {@code true} if the topic exists, otherwise {@code false}.
* @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists.
*/
public boolean isTopicExists(String topic) {
try {
// Retrieve the list of topics from the cache
Set<String> topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka);

// Check if the given topic is in the cached list
return topics.contains(topic);
} catch (ExecutionException e) {
_log.error("Failed to check if topic exists: {}", topic, e);
throw new RuntimeException("Error checking if topic exists", e);
}
}

/**
* Fetches the list of all topic names from Kafka AdminClient.
* This method is called only when the cache is expired or empty.
*
* @return a Set containing all topic names.
* @throws ExecutionException if there is an error fetching the topic list from Kafka.
*/
private Set<String> fetchTopicListFromKafka() throws ExecutionException {
try {
_log.info("Fetching topic list from Kafka");
return adminClient.listTopics().names().get();
} catch (Exception e) {
_log.error("Error fetching topic list from Kafka", e);
throw new ExecutionException(e);
}
}

/**
* Closes the AdminClient to release resources.
*/
public void close() {
if (adminClient != null) {
adminClient.close();
}
}
}
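
Since KafkaAdminService is new in this commit, a short usage sketch may help; the caller, topic name, partition count, replication factor, and queue type below are illustrative placeholders rather than values taken from this change:

// Hypothetical caller (not part of the commit); connection settings come from KafkaConfig.getAdminProps().
public static boolean provisionTopic() {
    KafkaAdminService adminService = new KafkaAdminService();
    try {
        // Returns true if the topic already existed, false if this call created it.
        return adminService.createTopicIfNotExists("example-topic", 1, (short) 1, "queue");
    } finally {
        adminService.close(); // releases the underlying AdminClient
    }
}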
(Diff truncated: the remaining changed files are not shown.)
